airflow.providers.cncf.kubernetes.executors.kubernetes_executor

KubernetesExecutor.

See also

For more information on how the KubernetesExecutor works, take a look at the guide: Kubernetes Executor

Module Contents

Classes

KubernetesExecutor

Executor for Kubernetes.

Attributes

base_version

ARG_NAMESPACE

ARG_MIN_PENDING_MINUTES

KUBERNETES_COMMANDS

airflow.providers.cncf.kubernetes.executors.kubernetes_executor.base_version[source]
airflow.providers.cncf.kubernetes.executors.kubernetes_executor.ARG_NAMESPACE[source]
airflow.providers.cncf.kubernetes.executors.kubernetes_executor.ARG_MIN_PENDING_MINUTES[source]
airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KUBERNETES_COMMANDS = ()[source]
class airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor[source]

Bases: airflow.executors.base_executor.BaseExecutor

Executor for Kubernetes.

RUNNING_POD_LOG_LINES = 100[source]
supports_ad_hoc_ti_run: bool = True[source]
clear_not_launched_queued_tasks(session=NEW_SESSION)[source]

Clear tasks that were not yet launched, but were previously queued.

Tasks can end up in a “Queued” state when a rescheduled or deferred operator comes back up for execution (with the same try_number) before the pod of its previous incarnation has been fully removed. This is the suspected cause rather than a confirmed one.

It’s also possible when an executor abruptly shuts down (leaving a non-empty task_queue on that executor), but that scenario is handled via normal adoption.

This method checks each of our queued tasks to see if the corresponding pod is around, and if not, and there’s no matching entry in our own task_queue, marks it for re-execution.
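The check described above can be sketched in plain Python. This is a minimal illustration under stated assumptions, not the executor's actual code: `QueuedTask`, `find_tasks_to_requeue`, and the two key sets are hypothetical stand-ins for the database query, the Kubernetes pod lookup, and the executor's task_queue.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class QueuedTask:
    """Hypothetical stand-in for the key of a queued task instance."""
    dag_id: str
    task_id: str
    run_id: str
    try_number: int


def find_tasks_to_requeue(queued, live_pod_keys, task_queue_keys):
    """Return queued tasks that have neither a pod nor a pending task_queue entry."""
    to_requeue = []
    for task in queued:
        if task in live_pod_keys:
            continue  # a matching pod is still around, so leave the task alone
        if task in task_queue_keys:
            continue  # the executor still intends to launch this task
        to_requeue.append(task)  # nothing will ever run it: mark for re-execution
    return to_requeue


a = QueuedTask("dag", "a", "run1", 1)
b = QueuedTask("dag", "b", "run1", 1)
c = QueuedTask("dag", "c", "run1", 1)
requeue = find_tasks_to_requeue([a, b, c], live_pod_keys={a}, task_queue_keys={b})
```

Here only `c` is selected, because `a` still has a pod and `b` is still waiting in the executor's own queue.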

start()[source]

Start the executor.

execute_async(key, command, queue=None, executor_config=None)[source]

Execute task asynchronously.

sync()[source]

Synchronize task state.

get_task_log(ti, try_number)[source]

Return the task logs.

Parameters
  • ti – the task instance whose logs are requested

  • try_number – the try number to read logs for

Returns

tuple of logs and messages

Return type

tuple[list[str], list[str]]

try_adopt_task_instances(tis)[source]

Try to adopt running task instances that have been abandoned by a SchedulerJob dying.

Anything that is not adopted will be cleared by the scheduler and then becomes eligible for re-scheduling.

Returns

any TaskInstances that could not be adopted

Return type

Sequence[airflow.models.taskinstance.TaskInstance]

cleanup_stuck_queued_tasks(tis)[source]

Handle remnants of tasks that were failed because they were stuck in queued.

Tasks can get stuck in queued. If such a task is detected, it will be marked as UP_FOR_RETRY if the task instance has remaining retries or marked as FAILED if it doesn’t.

Parameters

tis (list[airflow.models.taskinstance.TaskInstance]) – List of Task Instances to clean up

Returns

List of readable task instances for a warning message

Return type

list[str]
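The retry-or-fail decision described for this method can be illustrated with a small sketch. The `StuckTask` class, its `retries_left` field, and the `resolve_stuck_tasks` helper are hypothetical; the real method works on TaskInstance objects and Airflow's own state constants.

```python
from dataclasses import dataclass

# Assumed string values standing in for Airflow's task states.
UP_FOR_RETRY = "up_for_retry"
FAILED = "failed"


@dataclass
class StuckTask:
    """Hypothetical stand-in for a task instance that was stuck in queued."""
    task_id: str
    retries_left: int
    state: str = "queued"


def resolve_stuck_tasks(tasks):
    """Set each task's new state and return readable names for a warning message."""
    readable = []
    for task in tasks:
        # Retry while attempts remain, otherwise give up on the task.
        task.state = UP_FOR_RETRY if task.retries_left > 0 else FAILED
        readable.append(f"{task.task_id} -> {task.state}")
    return readable


stuck = [StuckTask("extract", retries_left=2), StuckTask("load", retries_left=0)]
messages = resolve_stuck_tasks(stuck)
```

The returned strings mirror the documented return type (`list[str]`), a readable summary suitable for logging.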

adopt_launched_task(kube_client, pod, tis_to_flush_by_key)[source]

Patch existing pod so that the current KubernetesJobWatcher can monitor it via label selectors.

Parameters
  • kube_client (kubernetes.client.CoreV1Api) – kubernetes client for speaking to kube API

  • pod (kubernetes.client.models.V1Pod) – V1Pod spec that we will patch with new label

  • tis_to_flush_by_key (dict[airflow.models.taskinstancekey.TaskInstanceKey, kubernetes.client.models.V1Pod]) – TIs that will be flushed if they aren’t adopted
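A rough sketch of the adoption step follows. It assumes the watcher selects pods by a label named `airflow-worker` (an assumption here, not stated on this page) and uses a fake client in place of `kubernetes.client.CoreV1Api` so the example runs without a cluster. `FakeCoreV1Api`, `adopt_pod`, and the `key` argument are hypothetical; `patch_namespaced_pod(name, namespace, body)` mirrors the real client method of that name.

```python
from types import SimpleNamespace


class FakeCoreV1Api:
    """Records patch calls instead of contacting a Kubernetes API server."""

    def __init__(self):
        self.patched = []

    def patch_namespaced_pod(self, name, namespace, body):
        self.patched.append((name, namespace, body))


def adopt_pod(kube_client, pod, key, tis_to_flush_by_key, worker_label):
    """Relabel the pod for the current watcher and stop tracking it for flushing."""
    pod.metadata.labels["airflow-worker"] = worker_label  # assumed label name
    kube_client.patch_namespaced_pod(
        name=pod.metadata.name,
        namespace=pod.metadata.namespace,
        body={"metadata": {"labels": dict(pod.metadata.labels)}},
    )
    # The task is adopted, so it no longer needs to be flushed.
    tis_to_flush_by_key.pop(key, None)


pod = SimpleNamespace(
    metadata=SimpleNamespace(name="task-pod", namespace="airflow",
                             labels={"airflow-worker": "old-scheduler"})
)
client = FakeCoreV1Api()
pending = {"dag.task.run1": pod}
adopt_pod(client, pod, "dag.task.run1", pending, worker_label="new-scheduler")
```

After the call, the fake client has recorded one patch carrying the new label, and `pending` is empty.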

end()[source]

Shut down the executor.

terminate()[source]

Terminate the executor. This method does nothing for this executor.

static get_cli_commands()[source]

Vends CLI commands to be included in Airflow CLI.

Override this method to expose commands via Airflow CLI to manage this executor. This can be commands to setup/teardown the executor, inspect state, etc. Make sure to choose unique names for those commands, to avoid collisions.
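The override pattern and the reason for unique names can be sketched as follows. `CommandSpec`, both executor classes, and `collect_commands` are hypothetical stand-ins; in Airflow itself the returned objects are Airflow's own CLI command definitions, which are not reproduced here.

```python
from dataclasses import dataclass, field


@dataclass
class CommandSpec:
    """Hypothetical stand-in for a CLI command definition."""
    name: str
    help: str
    subcommands: list = field(default_factory=list)


class BaseExecutorSketch:
    @staticmethod
    def get_cli_commands():
        return []  # by default an executor contributes no commands


class MyExecutorSketch(BaseExecutorSketch):
    @staticmethod
    def get_cli_commands():
        # A uniquely named command group for managing this executor.
        return [
            CommandSpec(
                name="my-executor",
                help="Tools for the sketch executor",
                subcommands=[CommandSpec("cleanup", "Remove leftover resources")],
            )
        ]


def collect_commands(executors):
    """Merge commands from all executors, rejecting duplicate names."""
    merged = {}
    for executor in executors:
        for command in executor.get_cli_commands():
            if command.name in merged:
                raise ValueError(f"CLI command name collision: {command.name}")
            merged[command.name] = command
    return merged


commands = collect_commands([BaseExecutorSketch, MyExecutorSketch])
```

Registering `MyExecutorSketch` twice would raise `ValueError`, which is the kind of collision the advice about unique names is meant to avoid.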
