Source code for airflow.providers.cncf.kubernetes.kube_config
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException
from airflow.settings import AIRFLOW_HOME
[docs]class KubeConfig:
"""Configuration for Kubernetes."""
[docs] kubernetes_section = "kubernetes_executor"
[docs] logging_section = "logging"
def __init__(self):
configuration_dict = conf.as_dict(display_sensitive=True)
self.core_configuration = configuration_dict[self.core_section]
self.airflow_home = AIRFLOW_HOME
self.dags_folder = conf.get(self.core_section, "dags_folder")
self.parallelism = conf.getint(self.core_section, "parallelism")
self.pod_template_file = conf.get(self.kubernetes_section, "pod_template_file", fallback=None)
self.delete_worker_pods = conf.getboolean(self.kubernetes_section, "delete_worker_pods")
self.delete_worker_pods_on_failure = conf.getboolean(
self.kubernetes_section, "delete_worker_pods_on_failure"
)
self.worker_pod_pending_fatal_container_state_reasons = []
if conf.get(self.kubernetes_section, "worker_pod_pending_fatal_container_state_reasons", fallback=""):
self.worker_pod_pending_fatal_container_state_reasons = conf.get(
self.kubernetes_section, "worker_pod_pending_fatal_container_state_reasons"
).split(",")
self.worker_pods_creation_batch_size = conf.getint(
self.kubernetes_section, "worker_pods_creation_batch_size"
)
self.worker_container_repository = conf.get(self.kubernetes_section, "worker_container_repository")
self.worker_container_tag = conf.get(self.kubernetes_section, "worker_container_tag")
if self.worker_container_repository and self.worker_container_tag:
self.kube_image = f"{self.worker_container_repository}:{self.worker_container_tag}"
else:
self.kube_image = None
# The Kubernetes Namespace in which the Scheduler and Webserver reside. Note
# that if your
# cluster has RBAC enabled, your scheduler may need service account permissions to
# create, watch, get, and delete pods in this namespace.
self.kube_namespace = conf.get(self.kubernetes_section, "namespace")
self.multi_namespace_mode = conf.getboolean(self.kubernetes_section, "multi_namespace_mode")
if self.multi_namespace_mode and conf.get(
self.kubernetes_section, "multi_namespace_mode_namespace_list"
):
self.multi_namespace_mode_namespace_list = conf.get(
self.kubernetes_section, "multi_namespace_mode_namespace_list"
).split(",")
else:
self.multi_namespace_mode_namespace_list = None
# The Kubernetes Namespace in which pods will be created by the executor. Note
# that if your
# cluster has RBAC enabled, your workers may need service account permissions to
# interact with cluster components.
self.executor_namespace = conf.get(self.kubernetes_section, "namespace")
self.worker_pods_queued_check_interval = conf.getint(
self.kubernetes_section, "worker_pods_queued_check_interval"
)
self.kube_client_request_args = conf.getjson(
self.kubernetes_section, "kube_client_request_args", fallback={}
)
if not isinstance(self.kube_client_request_args, dict):
raise AirflowConfigException(
f"[{self.kubernetes_section}] 'kube_client_request_args' expected a JSON dict, got "
+ type(self.kube_client_request_args).__name__
)
if self.kube_client_request_args:
if "_request_timeout" in self.kube_client_request_args and isinstance(
self.kube_client_request_args["_request_timeout"], list
):
self.kube_client_request_args["_request_timeout"] = tuple(
self.kube_client_request_args["_request_timeout"]
)
self.delete_option_kwargs = conf.getjson(self.kubernetes_section, "delete_option_kwargs", fallback={})
if not isinstance(self.delete_option_kwargs, dict):
raise AirflowConfigException(
f"[{self.kubernetes_section}] 'delete_option_kwargs' expected a JSON dict, got "
+ type(self.delete_option_kwargs).__name__
)