airflow.providers.cncf.kubernetes.operators.kubernetes_pod

Executes a task in a Kubernetes Pod.

Module Contents

Classes

KubernetesPodOperator

Execute a task in a Kubernetes Pod

exception airflow.providers.cncf.kubernetes.operators.kubernetes_pod.PodReattachFailure[source]

Bases: airflow.exceptions.AirflowException

When we expect to be able to find a pod but cannot.

class airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator(*, namespace: Optional[str] = None, image: Optional[str] = None, name: Optional[str] = None, random_name_suffix: Optional[bool] = True, cmds: Optional[List[str]] = None, arguments: Optional[List[str]] = None, ports: Optional[List[kubernetes.client.models.V1ContainerPort]] = None, volume_mounts: Optional[List[kubernetes.client.models.V1VolumeMount]] = None, volumes: Optional[List[kubernetes.client.models.V1Volume]] = None, env_vars: Optional[List[kubernetes.client.models.V1EnvVar]] = None, env_from: Optional[List[kubernetes.client.models.V1EnvFromSource]] = None, secrets: Optional[List[airflow.kubernetes.secret.Secret]] = None, in_cluster: Optional[bool] = None, cluster_context: Optional[str] = None, labels: Optional[Dict] = None, reattach_on_restart: bool = True, startup_timeout_seconds: int = 120, get_logs: bool = True, image_pull_policy: Optional[str] = None, annotations: Optional[Dict] = None, resources: Optional[kubernetes.client.models.V1ResourceRequirements] = None, affinity: Optional[kubernetes.client.models.V1Affinity] = None, config_file: Optional[str] = None, node_selectors: Optional[dict] = None, node_selector: Optional[dict] = None, image_pull_secrets: Optional[List[kubernetes.client.models.V1LocalObjectReference]] = None, service_account_name: Optional[str] = None, is_delete_operator_pod: bool = True, hostnetwork: bool = False, tolerations: Optional[List[kubernetes.client.models.V1Toleration]] = None, security_context: Optional[Dict] = None, dnspolicy: Optional[str] = None, schedulername: Optional[str] = None, full_pod_spec: Optional[kubernetes.client.models.V1Pod] = None, init_containers: Optional[List[kubernetes.client.models.V1Container]] = None, log_events_on_failure: bool = False, do_xcom_push: bool = False, pod_template_file: Optional[str] = None, priority_class_name: Optional[str] = None, pod_runtime_info_envs: List[airflow.providers.cncf.kubernetes.backcompat.pod_runtime_info_env.PodRuntimeInfoEnv] = None, termination_grace_period: Optional[int] = None, configmaps: Optional[List[str]] = None, **kwargs)[source]

Bases: airflow.models.BaseOperator

Execute a task in a Kubernetes Pod

See also

For more information on how to use this operator, take a look at the guide: KubernetesPodOperator

Note

If you use Google Kubernetes Engine and Airflow is not running in the same cluster, consider using GKEStartPodOperator, which simplifies the authorization process.
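As a rough orientation for the signature above, the following sketch collects a handful of the documented keyword arguments and passes them to the operator. The task id, pod name, namespace, and image are placeholder values chosen for illustration, and `make_pod_task` is a hypothetical helper, not part of the provider.

```python
# Keyword arguments named after parameters documented on this page.
# All values (task id, pod name, namespace, image) are placeholders.
POD_TASK_KWARGS = {
    "task_id": "hello_pod",
    "name": "hello-pod",
    "namespace": "default",
    "image": "python:3.10-slim",
    "cmds": ["python", "-c"],
    "arguments": ["print('hello from the pod')"],
    "get_logs": True,
    "is_delete_operator_pod": True,
    "startup_timeout_seconds": 120,
}


def make_pod_task(dag=None):
    """Hypothetical helper: build the operator from the kwargs above.

    Requires the apache-airflow-providers-cncf-kubernetes package.
    """
    # Imported lazily so this module can be loaded without Airflow installed.
    from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
        KubernetesPodOperator,
    )

    return KubernetesPodOperator(dag=dag, **POD_TASK_KWARGS)
```

Inside a DAG definition, calling `make_pod_task(dag=my_dag)` would register the task on that DAG; `my_dag` is assumed to be an existing `DAG` object.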

Parameters
  • namespace (str) -- the namespace to run within kubernetes.

  • image (str) -- Docker image you wish to launch. Defaults to hub.docker.com, but fully qualified URLs will point to custom repositories. (templated)

  • name (str) -- name of the pod in which the task will run. It is used (plus a random suffix if random_name_suffix is True) to generate a pod id (DNS-1123 subdomain, containing only [a-z0-9.-]).

  • random_name_suffix (bool) -- if True, append a random suffix to the pod name.

  • cmds (list[str]) -- entrypoint of the container. (templated) The Docker image's ENTRYPOINT is used if this is not provided.

  • arguments (list[str]) -- arguments of the entrypoint. (templated) The docker image's CMD is used if this is not provided.

  • ports (list[k8s.V1ContainerPort]) -- ports for launched pod.

  • volume_mounts (list[k8s.V1VolumeMount]) -- volumeMounts for launched pod.

  • volumes (list[k8s.V1Volume]) -- volumes for launched pod. Includes ConfigMaps and PersistentVolumes.

  • env_vars (list[k8s.V1EnvVar]) -- Environment variables initialized in the container. (templated)

  • secrets (list[airflow.kubernetes.secret.Secret]) -- Kubernetes secrets to inject in the container. They can be exposed as environment vars or files in a volume.

  • in_cluster (bool) -- run kubernetes client with in_cluster configuration.

  • cluster_context (str) -- context that points to kubernetes cluster. Ignored when in_cluster is True. If None, current-context is used.

  • reattach_on_restart (bool) -- if the scheduler dies while the pod is running, reattach to the pod and continue monitoring it.

  • labels (dict) -- labels to apply to the Pod. (templated)

  • startup_timeout_seconds (int) -- timeout in seconds to start up the pod.

  • get_logs (bool) -- get the stdout of the container as logs of the tasks.

  • image_pull_policy (str) -- Specify a policy to cache or always pull an image.

  • annotations (dict) -- non-identifying metadata you can attach to the Pod. Can be a large range of data, and can include characters that are not permitted by labels.

  • resources (k8s.V1ResourceRequirements) -- A dict containing resources requests and limits. Possible keys are request_memory, request_cpu, limit_memory, limit_cpu, and limit_gpu, which will be used to generate airflow.kubernetes.pod.Resources. See also kubernetes.io/docs/concepts/configuration/manage-compute-resources-container

  • affinity (k8s.V1Affinity) -- A dict containing a group of affinity scheduling rules.

  • config_file (str) -- The path to the Kubernetes config file. (templated) If not specified, default value is ~/.kube/config

  • node_selector (dict) -- A dict containing a group of scheduling rules.

  • image_pull_secrets (List[k8s.V1LocalObjectReference]) -- Any image pull secrets to be given to the pod. If more than one secret is required, provide a comma-separated list: secret_a,secret_b

  • service_account_name (str) -- Name of the service account

  • is_delete_operator_pod (bool) -- What to do when the pod reaches its final state, or the execution is interrupted. If True (default), delete the pod; if False, leave the pod.

  • hostnetwork (bool) -- If True enable host networking on the pod.

  • tolerations (List[k8s.V1Toleration]) -- A list of kubernetes tolerations.

  • security_context (dict) -- security options the pod should run with (PodSecurityContext).

  • dnspolicy (str) -- dnspolicy for the pod.

  • schedulername (str) -- Specify a schedulername for the pod

  • full_pod_spec (kubernetes.client.models.V1Pod) -- The complete podSpec

  • init_containers (list[kubernetes.client.models.V1Container]) -- init containers for the launched Pod

  • log_events_on_failure (bool) -- Log the pod's events if a failure occurs

  • do_xcom_push (bool) -- If True, the content of the file /airflow/xcom/return.json in the container will also be pushed to an XCom when the container completes.

  • pod_template_file (str) -- path to pod template file (templated)

  • priority_class_name (str) -- priority class name for the launched Pod

  • termination_grace_period (int) -- Termination grace period to apply if the task is killed in the UI. Defaults to the Kubernetes default.
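The name and random_name_suffix parameters above combine into a pod id that must be a valid DNS-1123 subdomain. The sketch below shows one way such an id could be built and checked. `make_pod_id` is a hypothetical helper, and the suffix length and truncation rule are assumptions made for illustration, not the provider's actual implementation.

```python
import re
import secrets

# DNS-1123 subdomain as defined by Kubernetes: dot-separated labels made of
# lowercase alphanumerics and '-', each starting and ending alphanumeric.
_LABEL = r"[a-z0-9]([-a-z0-9]*[a-z0-9])?"
DNS_1123_SUBDOMAIN = re.compile(rf"{_LABEL}(\.{_LABEL})*")
MAX_LEN = 253  # maximum length of a DNS-1123 subdomain


def make_pod_id(name: str, random_name_suffix: bool = True) -> str:
    """Hypothetical helper: optionally append a random suffix, then validate."""
    pod_id = name
    if random_name_suffix:
        suffix = secrets.token_hex(4)  # 8 lowercase hex characters (assumed length)
        # Truncate the base name so the result still fits within MAX_LEN.
        pod_id = f"{name[: MAX_LEN - len(suffix) - 1]}-{suffix}"
    if len(pod_id) > MAX_LEN or not DNS_1123_SUBDOMAIN.fullmatch(pod_id):
        raise ValueError(f"{pod_id!r} is not a valid DNS-1123 subdomain")
    return pod_id
```

A name such as `My_Task` is rejected because uppercase letters and underscores fall outside [a-z0-9.-].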

BASE_CONTAINER_NAME = base[source]
POD_CHECKED_KEY = already_checked[source]
template_fields :Sequence[str] = ['image', 'cmds', 'arguments', 'env_vars', 'labels', 'config_file', 'pod_template_file', 'namespace'][source]
pod_manager(self) -> airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager[source]
client(self) -> kubernetes.client.CoreV1Api[source]
find_pod(self, namespace, context) -> Optional[kubernetes.client.models.V1Pod][source]

Returns an already-running pod for this task instance if one exists.

get_or_create_pod(self, pod_request_obj: kubernetes.client.models.V1Pod, context)[source]
await_pod_start(self, pod)[source]
extract_xcom(self, pod)[source]

Retrieves xcom value and kills xcom sidecar container
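For extract_xcom to have something to retrieve, the process running in the container has to write its result to the file named in the do_xcom_push parameter description, /airflow/xcom/return.json. A minimal sketch of that container-side step follows. `write_xcom_result` is a hypothetical helper, and the optional `path` argument exists only so the function can be tried outside a pod.

```python
import json
from pathlib import Path

# Location documented for do_xcom_push; the container is expected to
# create this file before it exits.
XCOM_RETURN_PATH = Path("/airflow/xcom/return.json")


def write_xcom_result(result, path: Path = XCOM_RETURN_PATH) -> Path:
    """Hypothetical helper: serialize `result` as JSON for XCom pickup."""
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(result))
    return path
```

The value must be JSON-serializable, so plain dicts, lists, strings, and numbers are the safest choices.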

execute(self, context: airflow.utils.context.Context)[source]

This is the main method to derive when creating an operator. Context is the same dictionary used as when rendering jinja templates.

Refer to get_template_context for more context.

cleanup(self, pod: kubernetes.client.models.V1Pod, remote_pod: kubernetes.client.models.V1Pod)[source]
process_pod_deletion(self, pod)[source]
patch_already_checked(self, pod: kubernetes.client.models.V1Pod)[source]

Add an "already checked" annotation to ensure we don't reattach on retries

on_kill(self) -> None[source]

Override this method to clean up subprocesses when a task instance gets killed. Any use of the threading, subprocess or multiprocessing module within an operator needs to be cleaned up or it will leave ghost processes behind.

build_pod_request_obj(self, context=None)[source]

Returns V1Pod object based on pod template file, full pod spec, and other operator parameters.

The V1Pod attributes are derived (in order of precedence) from operator params, full pod spec, pod template file.
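The precedence order described here can be illustrated with plain dictionaries standing in for pod definitions. This is only a sketch of the ordering: the operator itself works with V1Pod objects, and `merge_by_precedence` together with the sample values is hypothetical.

```python
from typing import Any, Dict


def merge_by_precedence(*layers: Dict[str, Any]) -> Dict[str, Any]:
    """Merge dicts given from lowest to highest precedence.

    Later layers win. None is treated as "not set" and never overrides
    a value from a lower layer. Nested dicts are merged recursively.
    """
    merged: Dict[str, Any] = {}
    for layer in layers:
        for key, value in layer.items():
            if value is None:
                continue
            if isinstance(value, dict) and isinstance(merged.get(key), dict):
                merged[key] = merge_by_precedence(merged[key], value)
            else:
                merged[key] = value
    return merged


# Lowest precedence first: pod template file, full pod spec, operator params.
template_file = {"image": "busybox", "namespace": "batch", "labels": {"team": "data"}}
full_pod_spec = {"image": "python:3.10-slim", "labels": {"tier": "etl"}}
operator_params = {"namespace": "default", "image": None}

pod = merge_by_precedence(template_file, full_pod_spec, operator_params)
```

Here the namespace comes from the operator parameters, the image from the full pod spec (the operator left it unset), and the labels are the union of both lower layers.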

dry_run(self) -> None[source]

Prints out the pod definition that would be created by this operator. Does not include labels specific to the task instance (since there isn't one in a dry_run) and excludes all empty elements.
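To give a sense of what "excludes all empty elements" can mean for a nested pod definition, the sketch below prunes a dictionary recursively. `prune_empty` is a hypothetical helper. Treating None, empty strings, empty lists, and empty dicts as "empty" is an assumption, and the operator's own pruning rules may differ.

```python
_EMPTY = (None, "", [], {})


def prune_empty(value):
    """Recursively drop None, empty strings, and empty containers.

    Falsy scalars such as 0 and False are kept, since they carry meaning.
    """
    if isinstance(value, dict):
        pruned = {k: prune_empty(v) for k, v in value.items()}
        return {k: v for k, v in pruned.items() if v not in _EMPTY}
    if isinstance(value, list):
        pruned = [prune_empty(v) for v in value]
        return [v for v in pruned if v not in _EMPTY]
    return value


# Placeholder pod definition with several unset fields.
pod_dict = {
    "metadata": {"name": "hello-pod", "labels": {}, "annotations": None},
    "spec": {
        "containers": [{"name": "base", "image": "busybox", "args": [], "env": None}],
        "host_network": False,
        "volumes": [],
    },
}

compact = prune_empty(pod_dict)
```

After pruning, only the pod name, the container's name and image, and the explicit `host_network: False` remain.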
