Source code for airflow.providers.cncf.kubernetes.pod_generator
# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""Pod generator.This module provides an interface between the previous PodAPI and outputs a kubernetes.client.models.V1Pod.The advantage being that the full Kubernetes APIis supported and no serialization need be written."""from__future__importannotationsimportcopyimportloggingimportosimportwarningsfromfunctoolsimportreducefromtypingimportTYPE_CHECKINGimportre2fromdateutilimportparserfromdeprecatedimportdeprecatedfromkubernetes.clientimportmodelsask8sfromkubernetes.client.api_clientimportApiClientfromairflow.exceptionsimport(AirflowConfigException,AirflowException,AirflowProviderDeprecationWarning,)fromairflow.providers.cncf.kubernetes.kubernetes_helper_functionsimport(POD_NAME_MAX_LENGTH,add_unique_suffix,rand_str,)fromairflow.providers.cncf.kubernetes.pod_generator_deprecatedimport(PodDefaultsasPodDefaultsDeprecated,PodGeneratorasPodGeneratorDeprecated,)fromairflow.utilsimportyamlfromairflow.utils.hashlib_wrapperimportmd5fromairflow.versionimportversionasairflow_versionifTYPE_CHECKING:importdatetime
class PodMutationHookException(AirflowException):
    """Signal that the configured ``pod_mutation_hook`` raised while it was mutating a pod."""
class PodReconciliationError(AirflowException):
    """Signal that merging several pod definitions into a single pod failed."""
def make_safe_label_value(string: str) -> str:
    """
    Normalize a provided label to be of valid length and characters.

    Valid label values must be 63 characters or less and must be empty or begin and end with
    an alphanumeric character ([a-z0-9A-Z]) with dashes (-), underscores (_), dots (.), and
    alphanumerics between.

    If the label value is longer than ``MAX_LABEL_LEN`` once made safe, or differs in any way
    from the original value sent to this function, it is truncated and suffixed with "-" plus
    the first 9 hex characters of the MD5 of the original value. The prefix is sized so that
    the whole value fits in ``MAX_LABEL_LEN``.

    If nothing of the original value survives sanitization (e.g. ``"___"``), the bare hash is
    returned. A leading "-" would make the value invalid.

    :param string: raw value to turn into a label value
    :return: a label-safe value
    """
    # Single pass: strip leading non-alphanumerics, drop every character outside
    # [a-zA-Z0-9_.-], and strip trailing non-alphanumerics.
    safe_label = re2.sub(r"^[^a-z0-9A-Z]*|[^a-zA-Z0-9_\-\.]|[^a-z0-9A-Z]*$", "", string)

    if len(safe_label) > MAX_LABEL_LEN or string != safe_label:
        safe_hash = md5(string.encode()).hexdigest()[:9]
        prefix = safe_label[: MAX_LABEL_LEN - len(safe_hash) - 1]
        # "-" + hash would begin with a dash (invalid); a hex hash alone starts and ends
        # with an alphanumeric character.
        safe_label = f"{prefix}-{safe_hash}" if prefix else safe_hash
    return safe_label
def datetime_to_label_safe_datestring(datetime_obj: datetime.datetime) -> str:
    """
    Render a datetime as an ISO-like string usable as a Kubernetes label value.

    Kubernetes doesn't like ":" in labels, so every ":" of the ISO representation becomes "_".
    Every "+" (e.g. of a "+00:00" UTC offset) becomes "_plus_".
    ``label_safe_datestring_to_datetime`` undoes both substitutions.

    :param datetime_obj: datetime.datetime object
    :return: ISO-like string representing the datetime
    """
    # Both characters are mapped independently in one pass; neither replacement
    # contains the other source character, so this matches two chained replaces.
    label_table = str.maketrans({":": "_", "+": "_plus_"})
    return datetime_obj.isoformat().translate(label_table)
def label_safe_datestring_to_datetime(string: str) -> datetime.datetime:
    """
    Parse a label produced by ``datetime_to_label_safe_datestring`` back into a datetime.

    Kubernetes doesn't permit ":" in labels, so the label form spells ":" as "_" and "+" as
    "_plus_". Here "_plus_" is turned back into "+" first, then every remaining "_" into ":".
    The result is handed to ``dateutil.parser.parse``.

    :param string: str
    :return: datetime.datetime object
    """
    # "_plus_" must be restored before the generic "_" -> ":" step, otherwise it
    # would turn into ":plus:".
    with_plus = string.replace("_plus_", "+")
    iso_text = with_plus.replace("_", ":")
    return parser.parse(iso_text)
class PodGenerator:
    """
    Contains Kubernetes Airflow Worker configuration logic.

    Represents a kubernetes pod and manages execution of a single pod.
    Any configuration that is container specific gets applied to
    the first container in the list of containers.

    Most of the API is static. The ``reconcile_*`` helpers merge pod objects: values from
    the "client" side win, and selected list fields are concatenated base-first.
    :meth:`construct_pod` builds an executor worker pod by layering the pod template,
    a dynamically generated pod and the ``pod_override``.

    :param pod: The fully specified pod. Mutually exclusive with `pod_template_file`
    :param pod_template_file: Path to YAML file. Mutually exclusive with `pod`
    :param extract_xcom: Whether to bring up a container for xcom
    """

    def __init__(
        self,
        pod: k8s.V1Pod | None = None,
        pod_template_file: str | None = None,
        extract_xcom: bool = True,
    ):
        """
        Store the starting pod, loading it from ``pod_template_file`` when that is given.

        :raises AirflowConfigException: if neither or both of ``pod`` and
            ``pod_template_file`` are supplied.
        """
        if not pod_template_file and not pod:
            raise AirflowConfigException(
                "Podgenerator requires either a `pod` or a `pod_template_file` argument"
            )
        if pod_template_file and pod:
            raise AirflowConfigException("Cannot pass both `pod` and `pod_template_file` arguments")

        # ``ud_pod`` (presumably "user-defined pod") is the pod this generator starts from.
        if pod_template_file:
            self.ud_pod = self.deserialize_model_file(pod_template_file)
        else:
            self.ud_pod = pod

        # Only the flag is stored here; __init__ itself does not attach any sidecar.
        self.extract_xcom = extract_xcom

    # NOTE(review): the source this class was reconstructed from contained two decorator
    # stacks at this point with no method attached to them:
    #   @deprecated(reason="This method is deprecated and will be removed in the future
    #       releases", category=AirflowProviderDeprecationWarning)
    #   @staticmethod
    #   @deprecated(reason="This function is deprecated. Please use
    #       airflow.providers.cncf.kubernetes.utils.xcom_sidecar.add_xcom_sidecar instead",
    #       category=AirflowProviderDeprecationWarning)
    # Left dangling, they would stack onto ``from_obj`` and mark it deprecated, so they
    # were dropped. TODO confirm whether the methods they decorated still need restoring.
    # The second is presumably an xcom-sidecar helper; the otherwise-unused
    # PodDefaultsDeprecated import hints at it.

    @staticmethod
    def from_obj(obj) -> dict | k8s.V1Pod | None:
        """
        Convert an ``executor_config`` mapping to a pod.

        :param obj: executor_config mapping, or None. Its ``"pod_override"`` key (expected
            to hold a ``k8s.V1Pod``) and legacy ``"KubernetesExecutor"`` key (expected to
            hold a dict) are inspected.
        :return: the ``pod_override`` pod, a pod built from the legacy dict (with an
            ``AirflowProviderDeprecationWarning``), or None when neither key is set.
        :raises AirflowConfigException: if both keys are set.
        :raises TypeError: if ``pod_override`` is set but is not a ``k8s.V1Pod`` (and no
            legacy dict is given), or the legacy value is not a dict.
        """
        if obj is None:
            return None

        k8s_legacy_object = obj.get("KubernetesExecutor", None)
        k8s_object = obj.get("pod_override", None)

        if k8s_legacy_object and k8s_object:
            raise AirflowConfigException(
                "Can not have both a legacy and new "
                "executor_config object. Please delete the KubernetesExecutor "
                "dict and only use the pod_override kubernetes.client.models.V1Pod "
                "object."
            )
        if not k8s_object and not k8s_legacy_object:
            return None

        if isinstance(k8s_object, k8s.V1Pod):
            return k8s_object
        elif isinstance(k8s_legacy_object, dict):
            warnings.warn(
                "Using a dictionary for the executor_config is deprecated and will soon be removed. "
                'Please use a `kubernetes.client.models.V1Pod` class with a "pod_override" key'
                " instead. ",
                category=AirflowProviderDeprecationWarning,
                stacklevel=2,
            )
            return PodGenerator.from_legacy_obj(obj)
        else:
            raise TypeError(
                "Cannot convert a non-kubernetes.client.models.V1Pod object into a KubernetesExecutorConfig"
            )

    @staticmethod
    def from_legacy_obj(obj) -> k8s.V1Pod | None:
        """
        Convert a legacy ``{"KubernetesExecutor": {...}}`` executor_config to a pod.

        If no ``resources`` entry is present, ``request_cpu``, ``request_memory``,
        ``limit_cpu``, ``limit_memory`` and ``ephemeral-storage`` are folded into a
        ``k8s.V1ResourceRequirements``. Unset values are omitted rather than sent as 0.
        All remaining keys are passed as keyword arguments to the deprecated
        ``PodGenerator``. The top-level keys of the caller's dict are left untouched.

        :param obj: executor_config mapping, or None
        :return: the generated pod, or None when ``obj`` is None or has no (or an empty)
            ``"KubernetesExecutor"`` entry
        """
        if obj is None:
            return None

        # We do not want to extract constant here from ExecutorLoader because it is just
        # A name in dictionary rather than executor selection mechanism and it causes cyclic import
        legacy_config = obj.get("KubernetesExecutor", {})
        if not legacy_config:
            return None
        # Shallow copy: the pops and the "resources" assignment below used to rewrite the
        # caller's executor_config in place.
        namespaced = dict(legacy_config)

        resources = namespaced.get("resources")

        if resources is None:
            requests = {
                "cpu": namespaced.pop("request_cpu", None),
                "memory": namespaced.pop("request_memory", None),
                "ephemeral-storage": namespaced.get("ephemeral-storage"),  # We pop this one in limits
            }
            limits = {
                "cpu": namespaced.pop("limit_cpu", None),
                "memory": namespaced.pop("limit_memory", None),
                "ephemeral-storage": namespaced.pop("ephemeral-storage", None),
            }
            all_resources = list(requests.values()) + list(limits.values())
            if all(r is None for r in all_resources):
                resources = None
            else:
                # remove None's so they don't become 0's
                requests = {k: v for k, v in requests.items() if v is not None}
                limits = {k: v for k, v in limits.items() if v is not None}
                resources = k8s.V1ResourceRequirements(requests=requests, limits=limits)
        namespaced["resources"] = resources
        return PodGeneratorDeprecated(**namespaced).gen_pod()

    @staticmethod
    def reconcile_pods(base_pod: k8s.V1Pod, client_pod: k8s.V1Pod | None) -> k8s.V1Pod:
        """
        Merge Kubernetes Pod objects.

        :param base_pod: has the base attributes which are overwritten if they exist
            in the client pod and remain if they do not exist in the client_pod
        :param client_pod: the pod that the client wants to create.
        :return: the merged pods

        This can't be done recursively as certain fields are overwritten and some are concatenated.
        ``client_pod`` is deep-copied first, so it is not modified.
        """
        if client_pod is None:
            return base_pod

        client_pod_cp = copy.deepcopy(client_pod)
        client_pod_cp.spec = PodGenerator.reconcile_specs(base_pod.spec, client_pod_cp.spec)
        client_pod_cp.metadata = PodGenerator.reconcile_metadata(base_pod.metadata, client_pod_cp.metadata)
        client_pod_cp = merge_objects(base_pod, client_pod_cp)

        return client_pod_cp

    @staticmethod
    def reconcile_metadata(base_meta, client_meta):
        """
        Merge Kubernetes Metadata objects.

        ``labels`` and ``annotations`` are merged as dicts (client keys win).
        ``managed_fields``, ``finalizers`` and ``owner_references`` are concatenated
        (base entries first). Every other attribute set on the client is kept, and
        base values fill in the rest. ``client_meta`` itself is not modified.

        :param base_meta: has the base attributes which are overwritten if they exist
            in the client_meta and remain if they do not exist in the client_meta
        :param client_meta: the spec that the client wants to create.
        :return: the merged specs, or None when both inputs are empty
        """
        if not client_meta:
            return base_meta or None
        if not base_meta:
            return client_meta

        # extend_object_field returns a fresh copy; its result must be kept. It was
        # previously discarded, so these list fields were never actually concatenated.
        merged_meta = extend_object_field(base_meta, client_meta, "managed_fields")
        merged_meta = extend_object_field(base_meta, merged_meta, "finalizers")
        merged_meta = extend_object_field(base_meta, merged_meta, "owner_references")
        merged_meta.labels = merge_objects(base_meta.labels, merged_meta.labels)
        merged_meta.annotations = merge_objects(base_meta.annotations, merged_meta.annotations)
        return merge_objects(base_meta, merged_meta)

    @staticmethod
    def reconcile_specs(
        base_spec: k8s.V1PodSpec | None, client_spec: k8s.V1PodSpec | None
    ) -> k8s.V1PodSpec | None:
        """
        Merge Kubernetes PodSpec objects.

        Containers are merged pairwise (see :meth:`reconcile_containers`).
        ``init_containers`` and ``volumes`` are concatenated base-first. Note: this
        assigns ``client_spec.containers`` in place.

        :param base_spec: has the base attributes which are overwritten if they exist
            in the client_spec and remain if they do not exist in the client_spec
        :param client_spec: the spec that the client wants to create.
        :return: the merged specs, or None when both inputs are empty
        """
        if base_spec and not client_spec:
            return base_spec
        if not base_spec and client_spec:
            return client_spec
        elif client_spec and base_spec:
            client_spec.containers = PodGenerator.reconcile_containers(
                base_spec.containers, client_spec.containers
            )
            merged_spec = extend_object_field(base_spec, client_spec, "init_containers")
            merged_spec = extend_object_field(base_spec, merged_spec, "volumes")
            return merge_objects(base_spec, merged_spec)

        return None

    @staticmethod
    def reconcile_containers(
        base_containers: list[k8s.V1Container], client_containers: list[k8s.V1Container]
    ) -> list[k8s.V1Container]:
        """
        Merge Kubernetes Container objects.

        Containers are paired by position. For each pair, the list fields ``volume_mounts``,
        ``env``, ``env_from``, ``ports`` and ``volume_devices`` are concatenated (base
        first); other client values win over base values. Unpaired trailing containers from
        the longer list are appended unchanged.

        :param base_containers: has the base attributes which are overwritten if they exist
            in the client_containers and remain if they do not exist in the client_containers
        :param client_containers: the containers that the client wants to create.
        :return: the merged containers
        """
        if not base_containers:
            return client_containers
        if not client_containers:
            return base_containers

        merged = []
        for base_container, client_container in zip(base_containers, client_containers):
            for field_name in ("volume_mounts", "env", "env_from", "ports", "volume_devices"):
                client_container = extend_object_field(base_container, client_container, field_name)
            merged.append(merge_objects(base_container, client_container))

        # At most one of these tails is non-empty.
        paired = len(merged)
        merged.extend(base_containers[paired:])
        merged.extend(client_containers[paired:])
        return merged

    @classmethod
    def construct_pod(
        cls,
        dag_id: str,
        task_id: str,
        pod_id: str,
        try_number: int,
        kube_image: str,
        date: datetime.datetime | None,
        args: list[str],
        pod_override_object: k8s.V1Pod | None,
        base_worker_pod: k8s.V1Pod,
        namespace: str,
        scheduler_job_id: str,
        run_id: str | None = None,
        map_index: int = -1,
        *,
        with_mutation_hook: bool = False,
    ) -> k8s.V1Pod:
        """
        Create a Pod.

        Construct a pod by gathering and consolidating the configuration from 3 places:
            - airflow.cfg
            - executor_config
            - dynamic arguments

        The pods are reconciled in the order ``base_worker_pod`` -> dynamically generated
        pod -> ``pod_override_object``; later pods win.

        :param pod_id: desired pod name. Names longer than ``POD_NAME_MAX_LENGTH`` are
            truncated and given a unique suffix (with a ``UserWarning``).
        :param kube_image: image used unless the override pod's first container sets one.
        :param map_index: added as annotation/label only when >= 0.
        :param with_mutation_hook: if True, run ``airflow.settings.pod_mutation_hook`` on the result.
        :raises PodReconciliationError: if merging the pods fails.
        :raises PodMutationHookException: if the mutation hook raises.
        """
        if len(pod_id) > POD_NAME_MAX_LENGTH:
            warnings.warn(
                f"pod_id supplied is longer than {POD_NAME_MAX_LENGTH} characters; "
                "truncating and adding unique suffix.",
                UserWarning,
                stacklevel=2,
            )
            pod_id = add_unique_suffix(name=pod_id, max_len=POD_NAME_MAX_LENGTH)
        try:
            image = pod_override_object.spec.containers[0].image  # type: ignore
            if not image:
                image = kube_image
        except (AttributeError, IndexError, TypeError):
            # No override pod / spec (AttributeError), containers is None (TypeError)
            # or empty (IndexError): fall back to the configured image.
            image = kube_image

        annotations = {
            "dag_id": dag_id,
            "task_id": task_id,
            "try_number": str(try_number),
        }
        if map_index >= 0:
            annotations["map_index"] = str(map_index)
        if date:
            annotations["execution_date"] = date.isoformat()
        if run_id:
            annotations["run_id"] = run_id

        dynamic_pod = k8s.V1Pod(
            metadata=k8s.V1ObjectMeta(
                namespace=namespace,
                annotations=annotations,
                name=pod_id,
                labels=cls.build_labels_for_k8s_executor_pod(
                    dag_id=dag_id,
                    task_id=task_id,
                    try_number=try_number,
                    airflow_worker=scheduler_job_id,
                    map_index=map_index,
                    execution_date=date,
                    run_id=run_id,
                ),
            ),
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name="base",
                        args=args,
                        image=image,
                        env=[k8s.V1EnvVar(name="AIRFLOW_IS_K8S_EXECUTOR_POD", value="True")],
                    )
                ]
            ),
        )

        # Reconcile the pods starting with the first chronologically,
        # Pod from the pod_template_File -> Pod from the K8s executor -> Pod from executor_config arg
        pod_list = [base_worker_pod, dynamic_pod, pod_override_object]

        try:
            pod = reduce(PodGenerator.reconcile_pods, pod_list)
        except Exception as e:
            raise PodReconciliationError from e

        if with_mutation_hook:
            from airflow.settings import pod_mutation_hook

            try:
                pod_mutation_hook(pod)
            except Exception as e:
                raise PodMutationHookException from e

        return pod

    @classmethod
    def build_selector_for_k8s_executor_pod(
        cls,
        *,
        dag_id,
        task_id,
        try_number,
        map_index=None,
        execution_date=None,
        run_id=None,
        airflow_worker=None,
        include_version=False,
    ):
        """
        Generate selector for kubernetes executor pod.

        The selector is the comma-joined, key-sorted ``key=value`` form of
        :meth:`build_labels_for_k8s_executor_pod`. Without ``airflow_worker``, a bare
        ``airflow-worker`` existence requirement is appended.

        :meta private:
        """
        labels = cls.build_labels_for_k8s_executor_pod(
            dag_id=dag_id,
            task_id=task_id,
            try_number=try_number,
            map_index=map_index,
            execution_date=execution_date,
            run_id=run_id,
            airflow_worker=airflow_worker,
            include_version=include_version,
        )
        label_strings = [f"{label_id}={label}" for label_id, label in sorted(labels.items())]
        selector = ",".join(label_strings)
        if not airflow_worker:  # this filters out KPO pods even when we don't know the scheduler job id
            selector += ",airflow-worker"
        return selector

    @classmethod
    def build_labels_for_k8s_executor_pod(
        cls,
        *,
        dag_id,
        task_id,
        try_number,
        airflow_worker=None,
        map_index=None,
        execution_date=None,
        run_id=None,
        include_version=True,
    ):
        """
        Generate labels for kubernetes executor pod.

        String identifiers are passed through ``make_safe_label_value``. ``map_index`` is
        only added when >= 0, and ``airflow_version`` (with "+" replaced by "-") only when
        ``include_version`` is true.

        :meta private:
        """
        labels = {
            "dag_id": make_safe_label_value(dag_id),
            "task_id": make_safe_label_value(task_id),
            "try_number": str(try_number),
            "kubernetes_executor": "True",
        }
        if include_version:
            labels["airflow_version"] = airflow_version.replace("+", "-")
        if airflow_worker is not None:
            labels["airflow-worker"] = make_safe_label_value(str(airflow_worker))
        if map_index is not None and map_index >= 0:
            labels["map_index"] = str(map_index)
        if execution_date:
            labels["execution_date"] = datetime_to_label_safe_datestring(execution_date)
        if run_id:
            labels["run_id"] = make_safe_label_value(run_id)
        return labels

    @staticmethod
    def serialize_pod(pod: k8s.V1Pod) -> dict:
        """
        Convert a k8s.V1Pod into a json serializable dictionary.

        :param pod: k8s.V1Pod object
        :return: Serialized version of the pod returned as dict
        """
        api_client = ApiClient()
        return api_client.sanitize_for_serialization(pod)

    @staticmethod
    def deserialize_model_file(path: str) -> k8s.V1Pod:
        """
        Generate a Pod from a file.

        A missing file is not an error: a warning is logged and ``None`` is passed on
        to :meth:`deserialize_model_dict`.

        :param path: Path to the file
        :return: a kubernetes.client.models.V1Pod
        """
        if os.path.exists(path):
            with open(path) as stream:
                pod = yaml.safe_load(stream)
        else:
            pod = None
            log.warning("Model file %s does not exist", path)

        return PodGenerator.deserialize_model_dict(pod)

    @staticmethod
    def deserialize_model_dict(pod_dict: dict | None) -> k8s.V1Pod:
        """
        Deserializes a Python dictionary to k8s.V1Pod.

        Unfortunately we need access to the private method
        ``_ApiClient__deserialize_model`` from the kubernetes client.
        This issue is tracked here; https://github.com/kubernetes-client/python/issues/977.

        :param pod_dict: Serialized dict of k8s.V1Pod object
        :return: De-serialized k8s.V1Pod
        """
        api_client = ApiClient()
        return api_client._ApiClient__deserialize_model(pod_dict, k8s.V1Pod)

    @staticmethod
    @deprecated(
        reason="This method is deprecated. Use `add_pod_suffix` in `kubernetes_helper_functions`.",
        category=AirflowProviderDeprecationWarning,
    )
    def make_unique_pod_id(pod_id: str) -> str | None:
        r"""
        Generate a unique Pod name.

        Kubernetes pod names must consist of one or more lowercase
        rfc1035/rfc1123 labels separated by '.' with a maximum length of 253
        characters.

        Name must pass the following regex for validation
        ``^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$``

        For more details, see:
        https://github.com/kubernetes/kubernetes/blob/release-1.1/docs/design/identifiers.md

        :param pod_id: requested pod name
        :return: ``str`` valid Pod name of appropriate length
        """
        if not pod_id:
            return None

        max_pod_id_len = 100  # arbitrarily chosen
        suffix = rand_str(8)  # 8 seems good enough
        base_pod_id_len = max_pod_id_len - len(suffix) - 1  # -1 for separator
        # Drop trailing separators so the name does not end up with "--" or ".-".
        trimmed_pod_id = pod_id[:base_pod_id_len].rstrip("-.")
        return f"{trimmed_pod_id}-{suffix}"
def merge_objects(base_obj, client_obj):
    """
    Overlay ``client_obj`` on top of ``base_obj``.

    Two dicts are merged key by key, with client keys winning, into a deep copy of the
    base. For other objects, the attribute names come from ``base_obj.to_dict()``. Every
    base attribute that is truthy while the client's is falsy is copied onto a deep copy
    of the client. The copied base values themselves are shared, not copied.

    :param base_obj: has the base attributes which are overwritten if they exist
        in the client_obj and remain if they do not exist in the client_obj
    :param client_obj: the object that the client wants to create.
    :return: the merged objects; if either side is falsy, the other one is returned as-is
    """
    if not base_obj:
        return client_obj
    if not client_obj:
        return base_obj

    merged = copy.deepcopy(client_obj)

    if isinstance(base_obj, dict) and isinstance(merged, dict):
        combined = copy.deepcopy(base_obj)
        combined.update(merged)
        return combined

    merged_is_dict = isinstance(merged, dict)
    for attr_name in base_obj.to_dict():
        fallback = getattr(base_obj, attr_name, None)
        # Keep the client's value whenever it is set; only fill genuine gaps.
        if getattr(client_obj, attr_name, None) or not fallback:
            continue
        if merged_is_dict:
            merged[attr_name] = fallback
        else:
            setattr(merged, attr_name, fallback)
    return merged
def extend_object_field(base_obj, client_obj, field_name):
    """
    Add field values to existing objects.

    :param base_obj: an object which has a property `field_name` that is a list
    :param client_obj: an object which has a property `field_name` that is a list.
        A copy of this object is returned with `field_name` modified
    :param field_name: the name of the list field
    :return: a deep copy of ``client_obj`` whose ``field_name`` holds the base items
        followed by the client items. The list is always a new list object, but base
        items inside it are shared with ``base_obj``.
    :raises ValueError: if either object's ``field_name`` is set to something other than a list
    """
    client_obj_cp = copy.deepcopy(client_obj)
    base_obj_field = getattr(base_obj, field_name, None)
    client_obj_field = getattr(client_obj, field_name, None)

    if (not isinstance(base_obj_field, list) and base_obj_field is not None) or (
        not isinstance(client_obj_field, list) and client_obj_field is not None
    ):
        raise ValueError(
            f"The chosen field must be a list. Got {type(base_obj_field)} base_object_field "
            f"and {type(client_obj_field)} client_object_field."
        )

    if not base_obj_field:
        return client_obj_cp

    if not client_obj_field:
        # Assign a fresh list, as the concatenation branch below does. Assigning base's
        # own list would let later in-place edits of the result leak back into base_obj.
        setattr(client_obj_cp, field_name, list(base_obj_field))
        return client_obj_cp

    setattr(client_obj_cp, field_name, base_obj_field + client_obj_field)
    return client_obj_cp