#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
This is an example dag for using the KubernetesPodOperator.
"""
from __future__ import annotations

import os
from datetime import datetime

from kubernetes.client import models as k8s

from airflow import DAG
from airflow.kubernetes.secret import Secret
from airflow.operators.bash import BashOperator
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
# [START howto_operator_k8s_cluster_resources]
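# Three ways to expose a Kubernetes secret to the pod: mounted as a volume at a
# given path, injected as a single environment variable, or injected as one
# environment variable per key of the secret.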
secret_file = Secret('volume', '/etc/sql_conn', 'airflow-secrets', 'sql_alchemy_conn')
secret_env = Secret('env', 'SQL_CONN', 'airflow-secrets', 'sql_alchemy_conn')
secret_all_keys = Secret('env', None, 'airflow-secrets-2')

volume_mount = k8s.V1VolumeMount(
    name='test-volume', mount_path='/root/mount_file', sub_path=None, read_only=True
)
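
# env_from injects every key of the referenced ConfigMaps as an environment variable.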
configmaps = [
    k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name='test-configmap-1')),
    k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name='test-configmap-2')),
]
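
# The volume is backed by a PersistentVolumeClaim, which must already exist in
# the namespace the pod is launched into.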
volume = k8s.V1Volume(
    name='test-volume',
    persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name='test-volume'),
)

port = k8s.V1ContainerPort(name='http', container_port=80)

init_container_volume_mounts = [
    k8s.V1VolumeMount(mount_path='/etc/foo', name='test-volume', sub_path=None, read_only=True)
]

init_environments = [k8s.V1EnvVar(name='key1', value='value1'), k8s.V1EnvVar(name='key2', value='value2')]
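
# Init containers run to completion, one at a time, before the main container
# starts; this one just echoes and exits.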
init_container = k8s.V1Container(
    name="init-container",
    image="ubuntu:16.04",
    env=init_environments,
    volume_mounts=init_container_volume_mounts,
    command=["bash", "-cx"],
    args=["echo 10"],
)
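
# Both affinity terms below are soft (preferred) scheduling constraints: the
# scheduler favours nodes labelled disktype=ssd, and zones already running pods
# labelled security=S1. failure-domain.beta.kubernetes.io/zone is the legacy
# zone label (topology.kubernetes.io/zone on current clusters).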
affinity = k8s.V1Affinity(
    node_affinity=k8s.V1NodeAffinity(
        preferred_during_scheduling_ignored_during_execution=[
            k8s.V1PreferredSchedulingTerm(
                weight=1,
                preference=k8s.V1NodeSelectorTerm(
                    match_expressions=[
                        k8s.V1NodeSelectorRequirement(key="disktype", operator="In", values=["ssd"])
                    ]
                ),
            )
        ]
    ),
    pod_affinity=k8s.V1PodAffinity(
        preferred_during_scheduling_ignored_during_execution=[
            k8s.V1WeightedPodAffinityTerm(
                weight=1,
                pod_affinity_term=k8s.V1PodAffinityTerm(
                    label_selector=k8s.V1LabelSelector(
                        match_expressions=[
                            k8s.V1LabelSelectorRequirement(key="security", operator="In", values=["S1"])
                        ]
                    ),
                    topology_key="failure-domain.beta.kubernetes.io/zone",
                ),
            )
        ]
    ),
)
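
# The toleration allows this pod to be scheduled onto nodes tainted with key=value.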
tolerations = [k8s.V1Toleration(key="key", operator="Equal", value="value")]
# [END howto_operator_k8s_cluster_resources]
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "example_kubernetes_operator"
with DAG(
    dag_id=DAG_ID,
    schedule=None,
    start_date=datetime(2021, 1, 1),
    tags=['example'],
) as dag:
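    # A pod that wires together all of the resources defined above;
    # is_delete_operator_pod=True removes the pod after it finishes, so
    # completed pods do not pile up in the namespace.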
    k = KubernetesPodOperator(
        namespace='default',
        image="ubuntu:16.04",
        cmds=["bash", "-cx"],
        arguments=["echo", "10"],
        labels={"foo": "bar"},
        secrets=[secret_file, secret_env, secret_all_keys],
        ports=[port],
        volumes=[volume],
        volume_mounts=[volume_mount],
        env_from=configmaps,
        name="airflow-test-pod",
        task_id="task",
        affinity=affinity,
        is_delete_operator_pod=True,
        hostnetwork=False,
        tolerations=tolerations,
        init_containers=[init_container],
        priority_class_name="medium",
    )
    # [START howto_operator_k8s_private_image]
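    # Pulling from a private registry: image_pull_secrets references a
    # docker-registry secret ('testquay' here) that must already exist in the
    # namespace so the kubelet can authenticate against quay.io.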
    quay_k8s = KubernetesPodOperator(
        namespace='default',
        image='quay.io/apache/bash',
        image_pull_secrets=[k8s.V1LocalObjectReference('testquay')],
        cmds=["bash", "-cx"],
        arguments=["echo", "10", "echo pwd"],
        labels={"foo": "bar"},
        name="airflow-private-image-pod",
        is_delete_operator_pod=True,
        in_cluster=True,
        task_id="task-two",
        get_logs=True,
    )
    # [END howto_operator_k8s_private_image]
    # [START howto_operator_k8s_write_xcom]
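    # With do_xcom_push=True the operator runs an XCom sidecar container that
    # reads /airflow/xcom/return.json from the pod; whatever JSON the main
    # container writes there becomes the task's XCom value.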
    write_xcom = KubernetesPodOperator(
        namespace='default',
        image='alpine',
        cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
        name="write-xcom",
        do_xcom_push=True,
        is_delete_operator_pod=True,
        in_cluster=True,
        task_id="write-xcom",
        get_logs=True,
    )
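
    # xcom_pull returns the parsed JSON list [1, 2, 3, 4], so the command below echoes 1.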
    pod_task_xcom_result = BashOperator(
        bash_command="echo \"{{ task_instance.xcom_pull('write-xcom')[0] }}\"",
        task_id="pod_task_xcom_result",
    )

    write_xcom >> pod_task_xcom_result
    # [END howto_operator_k8s_write_xcom]
    from tests.system.utils.watcher import watcher

    # This test needs watcher in order to properly mark success/failure
    # when "tearDown" task with trigger rule is part of the DAG
    list(dag.tasks) >> watcher()

from tests.system.utils import get_test_run  # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)