KubernetesPodOperator

The KubernetesPodOperator allows you to create and run Pods on a Kubernetes cluster.

Note

If you use Google Kubernetes Engine, consider using the GKEStartPodOperator operator as it simplifies the Kubernetes authorization process.

Note

The Kubernetes executor is not required to use this operator.

How does this operator work?

The KubernetesPodOperator uses the Kubernetes API to launch a pod in a Kubernetes cluster. By supplying an image URL and a command with optional arguments, the operator uses the Kube Python Client to generate a Kubernetes API request that dynamically launches those individual pods. Users can specify a kubeconfig file using the config_file parameter, otherwise the operator will default to ~/.kube/config.

The KubernetesPodOperator enables task-level resource configuration and is optimal for custom Python dependencies that are not available through the public PyPI repository. It also allows users to supply a template YAML file using the pod_template_file parameter. Ultimately, it allows Airflow to act a job orchestrator - no matter the language those jobs are written in.

Debugging KubernetesPodOperator

You can print out the Kubernetes manifest for the pod that would be created at runtime by calling dry_run() on an instance of the operator.

from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

k = KubernetesPodOperator(
    name="hello-dry-run",
    image="debian",
    cmds=["bash", "-cx"],
    arguments=["echo", "10"],
    labels={"foo": "bar"},
    task_id="dry_run_demo",
    do_xcom_push=True,
)

k.dry_run()

Argument precedence

When building the pod object, there may be overlap between KPO params, pod spec, template and airflow connection. In general, the order of precedence is KPO argument > full pod spec > pod template file > airflow connection.

For namespace, if namespace is not provided via any of these methods, then we’ll first try to get the current namespace (if the task is already running in kubernetes) and failing that we’ll use the default namespace.

For pod name, if not provided explicitly, we’ll use the task_id. A random suffix is added by default so the pod name is not generally of great consequence.

How to use cluster ConfigMaps, Secrets, and Volumes with Pod?

To add ConfigMaps, Volumes, and other Kubernetes native objects, we recommend that you import the Kubernetes model API like this:

from kubernetes.client import models as k8s

With this API object, you can have access to all Kubernetes API objects in the form of python classes. Using this method will ensure correctness and type safety. While we have removed almost all Kubernetes convenience classes, we have kept the Secret class to simplify the process of generating secret volumes/env variables.

tests/system/providers/cncf/kubernetes/example_kubernetes.py[source]

secret_file = Secret("volume", "/etc/sql_conn", "airflow-secrets", "sql_alchemy_conn")
secret_env = Secret("env", "SQL_CONN", "airflow-secrets", "sql_alchemy_conn")
secret_all_keys = Secret("env", None, "airflow-secrets-2")
volume_mount = k8s.V1VolumeMount(
    name="test-volume", mount_path="/root/mount_file", sub_path=None, read_only=True
)

configmaps = [
    k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name="test-configmap-1")),
    k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name="test-configmap-2")),
]

volume = k8s.V1Volume(
    name="test-volume",
    persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name="test-volume"),
)

port = k8s.V1ContainerPort(name="http", container_port=80)

init_container_volume_mounts = [
    k8s.V1VolumeMount(mount_path="/etc/foo", name="test-volume", sub_path=None, read_only=True)
]

init_environments = [k8s.V1EnvVar(name="key1", value="value1"), k8s.V1EnvVar(name="key2", value="value2")]

init_container = k8s.V1Container(
    name="init-container",
    image="ubuntu:16.04",
    env=init_environments,
    volume_mounts=init_container_volume_mounts,
    command=["bash", "-cx"],
    args=["echo 10"],
)

affinity = k8s.V1Affinity(
    node_affinity=k8s.V1NodeAffinity(
        preferred_during_scheduling_ignored_during_execution=[
            k8s.V1PreferredSchedulingTerm(
                weight=1,
                preference=k8s.V1NodeSelectorTerm(
                    match_expressions=[
                        k8s.V1NodeSelectorRequirement(key="disktype", operator="In", values=["ssd"])
                    ]
                ),
            )
        ]
    ),
    pod_affinity=k8s.V1PodAffinity(
        required_during_scheduling_ignored_during_execution=[
            k8s.V1WeightedPodAffinityTerm(
                weight=1,
                pod_affinity_term=k8s.V1PodAffinityTerm(
                    label_selector=k8s.V1LabelSelector(
                        match_expressions=[
                            k8s.V1LabelSelectorRequirement(key="security", operator="In", values="S1")
                        ]
                    ),
                    topology_key="failure-domain.beta.kubernetes.io/zone",
                ),
            )
        ]
    ),
)

tolerations = [k8s.V1Toleration(key="key", operator="Equal", value="value")]

Difference between KubernetesPodOperator and Kubernetes object spec

The KubernetesPodOperator can be considered a substitute for a Kubernetes object spec definition that is able to be run in the Airflow scheduler in the DAG context. If using the operator, there is no need to create the equivalent YAML/JSON object spec for the Pod you would like to run. The YAML file can still be provided with the pod_template_file or even the Pod Spec constructed in Python via the full_pod_spec parameter which requires a Kubernetes V1Pod.

How to use private images (container registry)?

By default, the KubernetesPodOperator will look for images hosted publicly on Dockerhub. To pull images from a private registry (such as ECR, GCR, Quay, or others), you must create a Kubernetes Secret that represents the credentials for accessing images from the private registry that is ultimately specified in the image_pull_secrets parameter.

Create the Secret using kubectl:

kubectl create secret docker-registry testquay \
    --docker-server=quay.io \
    --docker-username=<Profile name> \
    --docker-password=<password>

Then use it in your pod like so:

tests/system/providers/cncf/kubernetes/example_kubernetes.py[source]

    quay_k8s = KubernetesPodOperator(
        namespace="default",
        image="quay.io/apache/bash",
        image_pull_secrets=[k8s.V1LocalObjectReference("testquay")],
        cmds=["bash", "-cx"],
        arguments=["echo", "10", "echo pwd"],
        labels={"foo": "bar"},
        name="airflow-private-image-pod",
        is_delete_operator_pod=True,
        in_cluster=True,
        task_id="task-two",
        get_logs=True,
    )

Also for this action you can use operator in the deferrable mode:

tests/system/providers/cncf/kubernetes/example_kubernetes_async.py[source]

    quay_k8s_async = KubernetesPodOperator(
        task_id="kubernetes_private_img_task_async",
        namespace="default",
        image="quay.io/apache/bash",
        image_pull_secrets=[k8s.V1LocalObjectReference("testquay")],
        cmds=["bash", "-cx"],
        arguments=["echo", "10", "echo pwd"],
        labels={"foo": "bar"},
        name="airflow-private-image-pod",
        is_delete_operator_pod=True,
        in_cluster=True,
        get_logs=True,
        deferrable=True,
    )

How does XCom work?

The KubernetesPodOperator handles XCom values differently than other operators. In order to pass a XCom value from your Pod you must specify the do_xcom_push as True. This will create a sidecar container that runs alongside the Pod. The Pod must write the XCom value into this location at the /airflow/xcom/return.json path.

See the following example on how this occurs:

tests/system/providers/cncf/kubernetes/example_kubernetes.py[source]

    write_xcom = KubernetesPodOperator(
        namespace="default",
        image="alpine",
        cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
        name="write-xcom",
        do_xcom_push=True,
        is_delete_operator_pod=True,
        in_cluster=True,
        task_id="write-xcom",
        get_logs=True,
    )

    pod_task_xcom_result = BashOperator(
        bash_command="echo \"{{ task_instance.xcom_pull('write-xcom')[0] }}\"",
        task_id="pod_task_xcom_result",
    )

    write_xcom >> pod_task_xcom_result

Note

XCOMs will be pushed only for tasks marked as State.SUCCESS.

Also for this action you can use operator in the deferrable mode:

tests/system/providers/cncf/kubernetes/example_kubernetes_async.py[source]

    write_xcom_async = KubernetesPodOperator(
        task_id="kubernetes_write_xcom_task_async",
        namespace="default",
        image="alpine",
        cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
        name="write-xcom",
        do_xcom_push=True,
        is_delete_operator_pod=True,
        in_cluster=True,
        get_logs=True,
        deferrable=True,
    )

    pod_task_xcom_result_async = BashOperator(
        task_id="pod_task_xcom_result_async",
        bash_command="echo \"{{ task_instance.xcom_pull('write-xcom')[0] }}\"",
    )

    write_xcom_async >> pod_task_xcom_result_async

Include error message in email alert

Any content written to /dev/termination-log will be retrieved by Kubernetes and included in the exception message if the task fails.

k = KubernetesPodOperator(
    task_id="test_error_message",
    image="alpine",
    cmds=["/bin/sh"],
    arguments=["-c", "echo hello world; echo Custom error > /dev/termination-log; exit 1;"],
    name="test-error-message",
    email="airflow@example.com",
    email_on_failure=True,
)

Read more on termination-log here.

Reference

For further information, look at:

Was this entry helpful?