airflow.contrib.executors.kubernetes_executor

Kubernetes executor

Module Contents

airflow.contrib.executors.kubernetes_executor.MAX_POD_ID_LEN = 253[source]
airflow.contrib.executors.kubernetes_executor.MAX_LABEL_LEN = 63[source]
class airflow.contrib.executors.kubernetes_executor.KubernetesExecutorConfig(image=None, image_pull_policy=None, request_memory=None, request_cpu=None, limit_memory=None, limit_cpu=None, limit_gpu=None, gcp_service_account_key=None, node_selectors=None, affinity=None, annotations=None, volumes=None, volume_mounts=None, tolerations=None, labels=None)[source]
__repr__(self)[source]
static from_dict(obj)[source]
as_dict(self)[source]
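
As a usage sketch (not part of this module's API reference): per-task settings that end up in a KubernetesExecutorConfig are typically supplied through an operator's executor_config argument under the "KubernetesExecutor" key and parsed via from_dict; the DAG id, image, and resource values below are illustrative assumptions.

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator

    dag = DAG(
        dag_id="kubernetes_executor_config_example",  # hypothetical DAG id
        start_date=datetime(2020, 1, 1),
        schedule_interval=None,
    )

    def print_hello():
        print("hello from a customized worker pod")

    hello_task = PythonOperator(
        task_id="hello_kubernetes",
        python_callable=print_hello,
        # Read by the KubernetesExecutor when the task is queued.
        executor_config={
            "KubernetesExecutor": {
                "image": "python:3.7-slim",   # illustrative image
                "request_memory": "128Mi",
                "request_cpu": "100m",
                "limit_memory": "256Mi",
                "limit_cpu": "200m",
            }
        },
        dag=dag,
    )
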
class airflow.contrib.executors.kubernetes_executor.KubeConfig[source]

Configuration for Kubernetes

core_section = core[source]
kubernetes_section = kubernetes[source]
_get_security_context_val(self, scontext)[source]
_validate(self)[source]
class airflow.contrib.executors.kubernetes_executor.KubernetesJobWatcher(namespace, watcher_queue, resource_version, worker_uuid, kube_config)[source]

Bases: multiprocessing.Process, airflow.utils.log.logging_mixin.LoggingMixin

Watches for Kubernetes jobs

run(self)[source]

Performs watching
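
A minimal sketch of such a watch loop using the Kubernetes Python client; the "airflow-worker" label selector, the kubeconfig loading, and the event handling shown are assumptions for illustration, not this class's exact implementation.

    from kubernetes import client, config, watch

    def watch_pods_sketch(namespace, resource_version, worker_uuid):
        # Assumes a local kubeconfig; inside a cluster use config.load_incluster_config().
        config.load_kube_config()
        kube_client = client.CoreV1Api()

        # Stream pod events for pods labelled with this scheduler's worker uuid
        # (label name is an assumption for illustration).
        kwargs = {"label_selector": "airflow-worker={}".format(worker_uuid)}
        if resource_version:
            # Resume from the last seen resource_version after a restart.
            kwargs["resource_version"] = resource_version

        watcher = watch.Watch()
        for event in watcher.stream(kube_client.list_namespaced_pod, namespace, **kwargs):
            pod = event["object"]
            # Each event carries the pod name, phase, and labels, which is the
            # kind of information process_status() consumes.
            print(event["type"], pod.metadata.name, pod.status.phase)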

_run(self, kube_client, resource_version, worker_uuid, kube_config)[source]
process_error(self, event)[source]

Process error response

process_status(self, pod_id, namespace, status, labels, resource_version, event)[source]

Process status response

class airflow.contrib.executors.kubernetes_executor.AirflowKubernetesScheduler(kube_config, task_queue, result_queue, kube_client, worker_uuid)[source]

Bases: airflow.utils.log.logging_mixin.LoggingMixin

Airflow Scheduler for Kubernetes

_make_kube_watcher(self)[source]
_health_check_kube_watcher(self)[source]
run_next(self, next_job)[source]

The run_next command will check the task_queue for any un-run jobs. It will then create a unique job-id, launch that job in the cluster, and store relevant info in the current_jobs map so we can track the job's status.

delete_pod(self, pod_id, namespace)[source]

Deletes POD
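
With the Kubernetes Python client, deleting a worker pod amounts to a single API call. A minimal sketch; tolerating the 404 raised when the pod is already gone is an assumption about the desired behaviour.

    from kubernetes import client
    from kubernetes.client.rest import ApiException

    def delete_pod_sketch(kube_client, pod_id, namespace):
        # kube_client is assumed to be a kubernetes.client.CoreV1Api instance.
        try:
            kube_client.delete_namespaced_pod(
                name=pod_id,
                namespace=namespace,
                body=client.V1DeleteOptions(),
            )
        except ApiException as e:
            # If the pod was already removed, treat the delete as a no-op.
            if e.status != 404:
                raise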

sync(self)[source]

The sync function checks the status of all currently running Kubernetes jobs. If a job is completed, its status is placed in the result queue to be sent back to the scheduler.

Returns

process_watcher_task(self, task)[source]

Process a task received from the watcher.

static _strip_unsafe_kubernetes_special_chars(string)[source]

Kubernetes only supports lowercase alphanumeric characters, "-", and "." in the pod name. However, there are special rules about how "-" and "." can be used, so we only keep alphanumeric characters. See here for details: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/

Parameters

string – The requested Pod name

Returns

str Pod name stripped of any unsafe characters
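
A minimal sketch of this kind of sanitisation (keep only alphanumerics and lowercase them); treat it as an illustration rather than the exact implementation.

    def strip_unsafe_kubernetes_special_chars(string):
        # Keep only alphanumeric characters and lowercase them so the result
        # satisfies Kubernetes' pod-name character rules.
        return ''.join(ch.lower() for ch in string if ch.isalnum())

    # e.g. strip_unsafe_kubernetes_special_chars("my_dag.task-1") -> "mydagtask1"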

static _make_safe_pod_id(safe_dag_id, safe_task_id, safe_uuid)[source]

Kubernetes pod names must be <= 253 chars and must pass the following regex for validation: ^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$

Parameters
  • safe_dag_id – a dag_id with only alphanumeric characters

  • safe_task_id – a task_id with only alphanumeric characters

  • safe_uuid – a uuid

Returns

str valid Pod name of appropriate length
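
A sketch of how the 253-character limit can be enforced while keeping the uuid suffix intact; the concatenation scheme is an assumption for illustration.

    MAX_POD_ID_LEN = 253

    def make_safe_pod_id(safe_dag_id, safe_task_id, safe_uuid):
        # Concatenate the sanitised dag and task ids, then truncate so that the
        # full name (key + "-" + uuid) never exceeds MAX_POD_ID_LEN.
        safe_key = safe_dag_id + safe_task_id
        safe_pod_id = safe_key[:MAX_POD_ID_LEN - len(safe_uuid) - 1] + "-" + safe_uuid
        return safe_pod_id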

static _make_safe_label_value(string)[source]

Valid label values must be 63 characters or less and must be empty or begin and end with an alphanumeric character ([a-z0-9A-Z]) with dashes (-), underscores (_), dots (.), and alphanumerics between.

If the label value is greater than 63 chars once made safe, or differs in any way from the original value sent to this function, then we need to truncate it to 53 chars and append a unique hash.
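
A sketch of that truncate-and-hash scheme (63 = 53 chars of the sanitised value + a 1-character separator + a 9-character hash); the regex and the md5 choice are assumptions for illustration.

    import hashlib
    import re

    MAX_LABEL_LEN = 63

    def make_safe_label_value(string):
        # Drop leading/trailing non-alphanumerics and any character that is not
        # alphanumeric, "-", "_" or ".".
        safe_label = re.sub(r"^[^a-z0-9A-Z]*|[^a-zA-Z0-9_\-\.]|[^a-z0-9A-Z]*$", "", string)

        # If anything changed or the value is still too long, truncate and append
        # a short hash of the original value so the label stays unique.
        if len(safe_label) > MAX_LABEL_LEN or string != safe_label:
            safe_hash = hashlib.md5(string.encode()).hexdigest()[:9]
            safe_label = safe_label[:MAX_LABEL_LEN - len(safe_hash) - 1] + "-" + safe_hash
        return safe_label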

static _create_pod_id(dag_id, task_id)[source]
static _label_safe_datestring_to_datetime(string)[source]

Kubernetes doesn't permit ":" in labels; ISO datetime format uses ":" but not "_", so the label-safe datestring stores "_" in place of ":". This function reverses that substitution and parses the result back into a datetime.

Parameters

string – a label-safe datestring (with "_" in place of ":")

Returns

datetime.datetime object

static _datetime_to_label_safe_datestring(datetime_obj)[source]

Kubernetes doesn't permit ":" in labels. ISO datetime format uses ":" but not "_", so we replace ":" with "_".

Parameters

datetime_obj – datetime.datetime object

Returns

ISO-like string representing the datetime
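
A sketch of the round trip between the two helpers above; parsing via dateutil is an assumption for illustration.

    from datetime import datetime
    from dateutil import parser

    def datetime_to_label_safe_datestring(datetime_obj):
        # ISO format uses ":" (e.g. 2020-01-01T12:30:00); swap it for "_"
        # so the value is a legal Kubernetes label value.
        return datetime_obj.isoformat().replace(":", "_")

    def label_safe_datestring_to_datetime(string):
        # Reverse the substitution and parse back into a datetime.
        return parser.parse(string.replace("_", ":"))

    # Round trip:
    # dt = datetime(2020, 1, 1, 12, 30)
    # s = datetime_to_label_safe_datestring(dt)       # "2020-01-01T12_30_00"
    # label_safe_datestring_to_datetime(s) == dt      # True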

_labels_to_key(self, labels)[source]
_flush_watcher_queue(self)[source]
terminate(self)[source]

Terminates the watcher.

class airflow.contrib.executors.kubernetes_executor.KubernetesExecutor[source]

Bases: airflow.executors.base_executor.BaseExecutor, airflow.utils.log.logging_mixin.LoggingMixin

Executor for Kubernetes

clear_not_launched_queued_tasks(self, session=None)[source]

If the Airflow scheduler restarts with pending "Queued" tasks, the tasks may or may not have been launched. Thus, on starting up, the scheduler checks every "Queued" task to see if it has been launched (i.e. whether there is a corresponding pod on Kubernetes), as in the sketch below.

If it has been launched then do nothing; otherwise reset the state to "None" so the task will be rescheduled.

This will not be necessary in a future version of Airflow in which there is proper support for State.LAUNCHED.
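
A condensed sketch of that check; the label selector format and the wiring are assumptions for illustration, with kube_client assumed to be a kubernetes.client.CoreV1Api and session a SQLAlchemy session.

    from airflow.models import TaskInstance
    from airflow.utils.state import State

    def clear_not_launched_queued_tasks_sketch(kube_client, namespace, worker_uuid, session):
        queued = session.query(TaskInstance).filter(TaskInstance.state == State.QUEUED).all()
        for ti in queued:
            # Look for a worker pod whose labels match this task instance
            # (label names are assumptions for illustration).
            selector = "dag_id={},task_id={},airflow-worker={}".format(
                ti.dag_id, ti.task_id, worker_uuid
            )
            pod_list = kube_client.list_namespaced_pod(namespace, label_selector=selector)
            if not pod_list.items:
                # Never launched: reset so the scheduler queues it again.
                ti.state = State.NONE
                session.merge(ti)
        session.commit()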

_inject_secrets(self)[source]
start(self)[source]

Starts the executor

execute_async(self, key, command, queue=None, executor_config=None)[source]

Executes a task asynchronously

sync(self)[source]

Synchronize task state.

_change_state(self, key, state, pod_id, namespace)[source]
_flush_task_queue(self)[source]
_flush_result_queue(self)[source]
end(self)[source]

Called when the executor shuts down
