Source code for tests.system.providers.cncf.kubernetes.example_kubernetes

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""
This is an example dag for using the KubernetesPodOperator.
"""
from __future__ import annotations

import os
from datetime import datetime

from kubernetes.client import models as k8s

from airflow import DAG
from airflow.kubernetes.secret import Secret
from airflow.operators.bash import BashOperator
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

# [START howto_operator_k8s_cluster_resources]
[docs]secret_file = Secret('volume', '/etc/sql_conn', 'airflow-secrets', 'sql_alchemy_conn')
[docs]secret_env = Secret('env', 'SQL_CONN', 'airflow-secrets', 'sql_alchemy_conn')
[docs]secret_all_keys = Secret('env', None, 'airflow-secrets-2')
[docs]volume_mount = k8s.V1VolumeMount( name='test-volume', mount_path='/root/mount_file', sub_path=None, read_only=True
)
[docs]configmaps = [ k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name='test-configmap-1')), k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name='test-configmap-2')),
]
[docs]volume = k8s.V1Volume( name='test-volume', persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name='test-volume'),
)
[docs]port = k8s.V1ContainerPort(name='http', container_port=80)
[docs]init_container_volume_mounts = [ k8s.V1VolumeMount(mount_path='/etc/foo', name='test-volume', sub_path=None, read_only=True)
]
[docs]init_environments = [k8s.V1EnvVar(name='key1', value='value1'), k8s.V1EnvVar(name='key2', value='value2')]
[docs]init_container = k8s.V1Container( name="init-container", image="ubuntu:16.04", env=init_environments, volume_mounts=init_container_volume_mounts, command=["bash", "-cx"], args=["echo 10"],
)
[docs]affinity = k8s.V1Affinity( node_affinity=k8s.V1NodeAffinity( preferred_during_scheduling_ignored_during_execution=[ k8s.V1PreferredSchedulingTerm( weight=1, preference=k8s.V1NodeSelectorTerm( match_expressions=[ k8s.V1NodeSelectorRequirement(key="disktype", operator="In", values=["ssd"]) ] ), ) ] ), pod_affinity=k8s.V1PodAffinity( required_during_scheduling_ignored_during_execution=[ k8s.V1WeightedPodAffinityTerm( weight=1, pod_affinity_term=k8s.V1PodAffinityTerm( label_selector=k8s.V1LabelSelector( match_expressions=[ k8s.V1LabelSelectorRequirement(key="security", operator="In", values="S1") ] ), topology_key="failure-domain.beta.kubernetes.io/zone",
), ) ] ), )
[docs]tolerations = [k8s.V1Toleration(key="key", operator="Equal", value="value")]
# [END howto_operator_k8s_cluster_resources]
[docs]ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
[docs]DAG_ID = "example_kubernetes_operator"
with DAG( dag_id='example_kubernetes_operator', schedule=None, start_date=datetime(2021, 1, 1), tags=['example'], ) as dag:
[docs] k = KubernetesPodOperator( namespace='default', image="ubuntu:16.04", cmds=["bash", "-cx"], arguments=["echo", "10"], labels={"foo": "bar"}, secrets=[secret_file, secret_env, secret_all_keys], ports=[port], volumes=[volume], volume_mounts=[volume_mount], env_from=configmaps, name="airflow-test-pod", task_id="task", affinity=affinity, is_delete_operator_pod=True, hostnetwork=False, tolerations=tolerations, init_containers=[init_container], priority_class_name="medium",
) # [START howto_operator_k8s_private_image] quay_k8s = KubernetesPodOperator( namespace='default', image='quay.io/apache/bash', image_pull_secrets=[k8s.V1LocalObjectReference('testquay')], cmds=["bash", "-cx"], arguments=["echo", "10", "echo pwd"], labels={"foo": "bar"}, name="airflow-private-image-pod", is_delete_operator_pod=True, in_cluster=True, task_id="task-two", get_logs=True, ) # [END howto_operator_k8s_private_image] # [START howto_operator_k8s_write_xcom] write_xcom = KubernetesPodOperator( namespace='default', image='alpine', cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"], name="write-xcom", do_xcom_push=True, is_delete_operator_pod=True, in_cluster=True, task_id="write-xcom", get_logs=True, ) pod_task_xcom_result = BashOperator( bash_command="echo \"{{ task_instance.xcom_pull('write-xcom')[0] }}\"", task_id="pod_task_xcom_result", ) write_xcom >> pod_task_xcom_result # [END howto_operator_k8s_write_xcom] from tests.system.utils.watcher import watcher # This test needs watcher in order to properly mark success/failure # when "tearDown" task with trigger rule is part of the DAG list(dag.tasks) >> watcher() from tests.system.utils import get_test_run # noqa: E402 # Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
[docs]test_run = get_test_run(dag)

Was this entry helpful?