Source code for tests.system.providers.cncf.kubernetes.example_kubernetes_async

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""
This is an example DAG for using the KubernetesPodOperator in deferrable (asynchronous) mode.
"""

from __future__ import annotations

import os
from datetime import datetime

from kubernetes.client import models as k8s

from airflow import DAG
from airflow.kubernetes.secret import Secret
from airflow.operators.bash import BashOperator
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

# [START howto_operator_k8s_cluster_resources]
# Secrets handed to the pod via the operator's ``secrets`` argument: one deployed
# as a volume, one as a single environment variable, and one (no deploy target,
# no key) deployed through the "env" type for the whole ``airflow-secrets-2`` secret.
secret_file = Secret("volume", "/etc/sql_conn", "airflow-secrets", "sql_alchemy_conn")
secret_env = Secret("env", "SQL_CONN", "airflow-secrets", "sql_alchemy_conn")
secret_all_keys = Secret("env", None, "airflow-secrets-2")

# Read-only mount of the ``test-volume`` volume (declared below) for the main container.
volume_mount = k8s.V1VolumeMount(
    name="test-volume", mount_path="/root/mount_file", sub_path=None, read_only=True
)

# ConfigMaps referenced through ``env_from`` on the pod.
configmaps = [
    k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name="test-configmap-1")),
    k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name="test-configmap-2")),
]

# Volume backed by the ``test-volume`` persistent volume claim.
volume = k8s.V1Volume(
    name="test-volume",
    persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name="test-volume"),
)

port = k8s.V1ContainerPort(name="http", container_port=80)

# Init container: its own read-only mount of ``test-volume`` plus two env vars.
init_container_volume_mounts = [
    k8s.V1VolumeMount(mount_path="/etc/foo", name="test-volume", sub_path=None, read_only=True)
]

init_environments = [k8s.V1EnvVar(name="key1", value="value1"), k8s.V1EnvVar(name="key2", value="value2")]

init_container = k8s.V1Container(
    name="init-container",
    image="ubuntu:16.04",
    env=init_environments,
    volume_mounts=init_container_volume_mounts,
    command=["bash", "-cx"],
    args=["echo 10"],
)

# Scheduling constraints: a node-affinity preference on ``disktype`` and a
# pod-affinity rule on the ``security`` label.
affinity = k8s.V1Affinity(
    node_affinity=k8s.V1NodeAffinity(
        preferred_during_scheduling_ignored_during_execution=[
            k8s.V1PreferredSchedulingTerm(
                weight=1,
                preference=k8s.V1NodeSelectorTerm(
                    match_expressions=[
                        k8s.V1NodeSelectorRequirement(key="disktype", operator="In", values=["ssd"])
                    ]
                ),
            )
        ]
    ),
    pod_affinity=k8s.V1PodAffinity(
        # NOTE(review): the Kubernetes API documents the "required" list as plain
        # pod affinity terms; weighted terms belong to the "preferred" list.
        # Left unchanged here - confirm which of the two was intended.
        required_during_scheduling_ignored_during_execution=[
            k8s.V1WeightedPodAffinityTerm(
                weight=1,
                pod_affinity_term=k8s.V1PodAffinityTerm(
                    label_selector=k8s.V1LabelSelector(
                        match_expressions=[
                            # ``values`` must be a list of strings, matching the
                            # node selector requirement above (was the bare string "S1").
                            k8s.V1LabelSelectorRequirement(key="security", operator="In", values=["S1"])
                        ]
                    ),
                    topology_key="failure-domain.beta.kubernetes.io/zone",
                ),
            )
        ]
    ),
)

tolerations = [k8s.V1Toleration(key="key", operator="Equal", value="value")]
# [END howto_operator_k8s_cluster_resources]
# Environment identifier read from SYSTEM_TESTS_ENV_ID; None when the variable is unset.
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
# Identifier passed as ``dag_id`` to the example DAG.
DAG_ID = "example_kubernetes_operator_async"
# Example DAG: every KubernetesPodOperator below is created with ``deferrable=True``.
with DAG(
    dag_id=DAG_ID,
    schedule=None,
    start_date=datetime(2021, 1, 1),
    tags=["example"],
) as dag:
    # Pod wired up with all of the cluster resources defined at module level.
    k = KubernetesPodOperator(
        task_id="kubernetes_task_async",
        namespace="default",
        image="ubuntu:16.04",
        cmds=["bash", "-cx"],
        # NOTE(review): with ``bash -c`` only the first argument is the command
        # string ("echo"); "10" would become $0 - confirm this is intended
        # (the init container uses the single string "echo 10").
        arguments=["echo", "10"],
        labels={"foo": "bar"},
        secrets=[secret_file, secret_env, secret_all_keys],
        ports=[port],
        volumes=[volume],
        volume_mounts=[volume_mount],
        env_from=configmaps,
        name="airflow-test-pod",
        affinity=affinity,
        is_delete_operator_pod=True,
        hostnetwork=False,
        tolerations=tolerations,
        init_containers=[init_container],
        priority_class_name="medium",
        deferrable=True,
    )

    # [START howto_operator_k8s_private_image_async]
    quay_k8s_async = KubernetesPodOperator(
        task_id="kubernetes_private_img_task_async",
        namespace="default",
        image="quay.io/apache/bash",
        image_pull_secrets=[k8s.V1LocalObjectReference("testquay")],
        cmds=["bash", "-cx"],
        arguments=["echo", "10", "echo pwd"],
        labels={"foo": "bar"},
        name="airflow-private-image-pod",
        is_delete_operator_pod=True,
        in_cluster=True,
        get_logs=True,
        deferrable=True,
    )
    # [END howto_operator_k8s_private_image_async]

    # [START howto_operator_k8s_write_xcom_async]
    write_xcom_async = KubernetesPodOperator(
        task_id="kubernetes_write_xcom_task_async",
        namespace="default",
        image="alpine",
        cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
        name="write-xcom",
        do_xcom_push=True,
        is_delete_operator_pod=True,
        in_cluster=True,
        get_logs=True,
        deferrable=True,
    )

    pod_task_xcom_result_async = BashOperator(
        task_id="pod_task_xcom_result_async",
        # Pull by the producing task's task_id; "write-xcom" is only the pod
        # name and does not identify any task in this DAG.
        bash_command="echo \"{{ task_instance.xcom_pull('kubernetes_write_xcom_task_async')[0] }}\"",
    )

    write_xcom_async >> pod_task_xcom_result_async
    # [END howto_operator_k8s_write_xcom_async]

    from tests.system.utils.watcher import watcher

    # This test needs watcher in order to properly mark success/failure
    # when "tearDown" task with trigger rule is part of the DAG
    list(dag.tasks) >> watcher()

from tests.system.utils import get_test_run  # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)

Was this entry helpful?