Source code for airflow.providers.cncf.kubernetes.kube_config

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException
from airflow.settings import AIRFLOW_HOME


class KubeConfig:
    """Configuration for Kubernetes."""

    core_section = "core"
    kubernetes_section = "kubernetes_executor"
    logging_section = "logging"

    def __init__(self):
        configuration_dict = conf.as_dict(display_sensitive=True)
        self.core_configuration = configuration_dict[self.core_section]
        self.airflow_home = AIRFLOW_HOME
        self.dags_folder = conf.get(self.core_section, "dags_folder")
        self.parallelism = conf.getint(self.core_section, "parallelism")
        self.pod_template_file = conf.get(self.kubernetes_section, "pod_template_file", fallback=None)
        self.delete_worker_pods = conf.getboolean(self.kubernetes_section, "delete_worker_pods")
        self.delete_worker_pods_on_failure = conf.getboolean(
            self.kubernetes_section, "delete_worker_pods_on_failure"
        )
        self.worker_pod_pending_fatal_container_state_reasons = []
        if conf.get(self.kubernetes_section, "worker_pod_pending_fatal_container_state_reasons", fallback=""):
            self.worker_pod_pending_fatal_container_state_reasons = conf.get(
                self.kubernetes_section, "worker_pod_pending_fatal_container_state_reasons"
            ).split(",")

        self.worker_pods_creation_batch_size = conf.getint(
            self.kubernetes_section, "worker_pods_creation_batch_size"
        )
        self.worker_container_repository = conf.get(self.kubernetes_section, "worker_container_repository")
        self.worker_container_tag = conf.get(self.kubernetes_section, "worker_container_tag")
        if self.worker_container_repository and self.worker_container_tag:
            self.kube_image = f"{self.worker_container_repository}:{self.worker_container_tag}"
        else:
            self.kube_image = None

        # The Kubernetes Namespace in which the Scheduler and Webserver reside. Note that if your
        # cluster has RBAC enabled, your scheduler may need service account permissions to
        # create, watch, get, and delete pods in this namespace.
        self.kube_namespace = conf.get(self.kubernetes_section, "namespace")
        self.multi_namespace_mode = conf.getboolean(self.kubernetes_section, "multi_namespace_mode")
        if self.multi_namespace_mode and conf.get(
            self.kubernetes_section, "multi_namespace_mode_namespace_list"
        ):
            self.multi_namespace_mode_namespace_list = conf.get(
                self.kubernetes_section, "multi_namespace_mode_namespace_list"
            ).split(",")
        else:
            self.multi_namespace_mode_namespace_list = None

        # The Kubernetes Namespace in which pods will be created by the executor. Note that if your
        # cluster has RBAC enabled, your workers may need service account permissions to
        # interact with cluster components.
        self.executor_namespace = conf.get(self.kubernetes_section, "namespace")

        self.worker_pods_queued_check_interval = conf.getint(
            self.kubernetes_section, "worker_pods_queued_check_interval"
        )

        self.kube_client_request_args = conf.getjson(
            self.kubernetes_section, "kube_client_request_args", fallback={}
        )
        if not isinstance(self.kube_client_request_args, dict):
            raise AirflowConfigException(
                f"[{self.kubernetes_section}] 'kube_client_request_args' expected a JSON dict, got "
                + type(self.kube_client_request_args).__name__
            )
        if self.kube_client_request_args:
            if "_request_timeout" in self.kube_client_request_args and isinstance(
                self.kube_client_request_args["_request_timeout"], list
            ):
                self.kube_client_request_args["_request_timeout"] = tuple(
                    self.kube_client_request_args["_request_timeout"]
                )
        self.delete_option_kwargs = conf.getjson(self.kubernetes_section, "delete_option_kwargs", fallback={})
        if not isinstance(self.delete_option_kwargs, dict):
            raise AirflowConfigException(
                f"[{self.kubernetes_section}] 'delete_option_kwargs' expected a JSON dict, got "
                + type(self.delete_option_kwargs).__name__
            )

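As a rough illustration (not part of the module source), the sketch below shows how KubeConfig picks up values from the [kubernetes_executor] configuration section. It relies on Airflow's standard AIRFLOW__{SECTION}__{KEY} environment-variable overrides and assumes a working Airflow installation; the repository, tag, and namespace values are hypothetical.

# Minimal usage sketch, assuming Airflow is installed and configurable.
import os

# AIRFLOW__<SECTION>__<KEY> environment variables override airflow.cfg entries;
# the values below are placeholders for illustration only.
os.environ["AIRFLOW__KUBERNETES_EXECUTOR__WORKER_CONTAINER_REPOSITORY"] = "apache/airflow"
os.environ["AIRFLOW__KUBERNETES_EXECUTOR__WORKER_CONTAINER_TAG"] = "2.9.0"
os.environ["AIRFLOW__KUBERNETES_EXECUTOR__NAMESPACE"] = "airflow"

from airflow.providers.cncf.kubernetes.kube_config import KubeConfig

kube_config = KubeConfig()
print(kube_config.kube_image)      # expected: "apache/airflow:2.9.0"
print(kube_config.kube_namespace)  # expected: "airflow"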