Source code for airflow.providers.cncf.kubernetes.operators.pod
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Executes task in a Kubernetes POD."""

from __future__ import annotations

import datetime
import json
import logging
import re
import shlex
import string
import warnings
from collections.abc import Container
from contextlib import AbstractContextManager
from functools import cached_property
from typing import TYPE_CHECKING, Any, Callable, Iterable, Sequence

import kubernetes
from deprecated import deprecated
from kubernetes.client import CoreV1Api, V1Pod, models as k8s
from kubernetes.stream import stream
from urllib3.exceptions import HTTPError

from airflow.configuration import conf
from airflow.exceptions import (
    AirflowException,
    AirflowProviderDeprecationWarning,
    AirflowSkipException,
    TaskDeferred,
)
from airflow.models import BaseOperator
from airflow.providers.cncf.kubernetes import pod_generator
from airflow.providers.cncf.kubernetes.backcompat.backwards_compat_converters import (
    convert_affinity,
    convert_configmap,
    convert_env_vars,
    convert_image_pull_secrets,
    convert_pod_runtime_info_env,
    convert_port,
    convert_toleration,
    convert_volume,
    convert_volume_mount,
)
from airflow.providers.cncf.kubernetes.callbacks import ExecutionMode, KubernetesPodOperatorCallback
from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import (
    POD_NAME_MAX_LENGTH,
    add_pod_suffix,
    create_pod_id,
)
from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator
from airflow.providers.cncf.kubernetes.triggers.pod import KubernetesPodTrigger
from airflow.providers.cncf.kubernetes.utils import xcom_sidecar  # type: ignore[attr-defined]
from airflow.providers.cncf.kubernetes.utils.pod_manager import (
    EMPTY_XCOM_RESULT,
    OnFinishAction,
    PodLaunchFailedException,
    PodManager,
    PodNotFoundException,
    PodOperatorHookProtocol,
    PodPhase,
    container_is_succeeded,
    get_container_termination_message,
)
from airflow.settings import pod_mutation_hook
from airflow.utils import yaml
from airflow.utils.helpers import prune_dict, validate_key
from airflow.version import version as airflow_version

if TYPE_CHECKING:
    import jinja2
    from pendulum import DateTime
    from typing_extensions import Literal

    from airflow.providers.cncf.kubernetes.secret import Secret
    from airflow.utils.context import Context
class PodReattachFailure(AirflowException):
    """When we expect to be able to find a pod but cannot."""
class KubernetesPodOperator(BaseOperator):
    """
    Execute a task in a Kubernetes Pod.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:KubernetesPodOperator`

    .. note::
        If you use `Google Kubernetes Engine <https://cloud.google.com/kubernetes-engine/>`__
        and Airflow is not running in the same cluster, consider using
        :class:`~airflow.providers.google.cloud.operators.kubernetes_engine.GKEStartPodOperator`,
        which simplifies the authorization process.

    :param kubernetes_conn_id: The :ref:`kubernetes connection id <howto/connection:kubernetes>`
        for the Kubernetes cluster.
    :param namespace: the namespace to run within kubernetes.
    :param image: Docker image you wish to launch. Defaults to hub.docker.com,
        but fully qualified URLs will point to custom repositories. (templated)
    :param name: name of the pod in which the task will run, will be used (plus a random
        suffix if random_name_suffix is True) to generate a pod id (DNS-1123 subdomain,
        containing only [a-z0-9.-]).
    :param random_name_suffix: if True, will generate a random suffix.
    :param cmds: entrypoint of the container. (templated)
        The docker image's entrypoint is used if this is not provided.
    :param arguments: arguments of the entrypoint. (templated)
        The docker image's CMD is used if this is not provided.
    :param ports: ports for the launched pod.
    :param volume_mounts: volumeMounts for the launched pod.
    :param volumes: volumes for the launched pod. Includes ConfigMaps and PersistentVolumes.
    :param env_vars: Environment variables initialized in the container. (templated)
    :param env_from: (Optional) List of sources to populate environment variables in the container.
    :param secrets: Kubernetes secrets to inject in the container.
        They can be exposed as environment vars or files in a volume.
    :param in_cluster: run kubernetes client with in_cluster configuration.
    :param cluster_context: context that points to kubernetes cluster.
        Ignored when in_cluster is True. If None, current-context is used. (templated)
    :param reattach_on_restart: if the worker dies while the pod is running, reattach and monitor
        during the next try. If False, always create a new pod for each try.
    :param labels: labels to apply to the Pod. (templated)
    :param startup_timeout_seconds: timeout in seconds to startup the pod.
    :param startup_check_interval_seconds: interval in seconds to check if the pod has already started.
    :param get_logs: get the stdout of the base container as logs of the tasks.
    :param container_logs: list of containers whose logs will be published to stdout.
        Takes a sequence of containers, a single container name or True. If True,
        all the containers' logs are published. Works in conjunction with the get_logs param.
        The default value is the base container.
    :param image_pull_policy: Specify a policy to cache or always pull an image.
    :param annotations: non-identifying metadata you can attach to the Pod.
        Can be a large range of data, and can include characters
        that are not permitted by labels. (templated)
    :param container_resources: resources for the launched pod. (templated)
    :param affinity: affinity scheduling rules for the launched pod.
    :param config_file: The path to the Kubernetes config file. (templated)
        If not specified, default value is ``~/.kube/config``
    :param node_selector: A dict containing a group of scheduling rules.
    :param image_pull_secrets: Any image pull secrets to be given to the pod.
        If more than one secret is required, provide a comma separated list:
        secret_a,secret_b
    :param service_account_name: Name of the service account
    :param hostnetwork: If True enable host networking on the pod.
    :param host_aliases: A list of host aliases to apply to the containers in the pod.
    :param tolerations: A list of kubernetes tolerations.
    :param security_context: security options the pod should run with (PodSecurityContext).
    :param container_security_context: security options the container should run with.
    :param dnspolicy: dnspolicy for the pod.
    :param dns_config: dns configuration (ip addresses, searches, options) for the pod.
    :param hostname: hostname for the pod.
    :param subdomain: subdomain for the pod.
    :param schedulername: Specify a schedulername for the pod
    :param full_pod_spec: The complete podSpec
    :param init_containers: init container for the launched Pod
    :param log_events_on_failure: Log the pod's events if a failure occurs
    :param do_xcom_push: If True, the content of the file
        /airflow/xcom/return.json in the container will also be pushed to an
        XCom when the container completes.
    :param pod_template_file: path to pod template file (templated)
    :param pod_template_dict: pod template dictionary (templated)
    :param priority_class_name: priority class name for the launched Pod
    :param pod_runtime_info_envs: (Optional) A list of environment variables,
        to be set in the container.
    :param termination_grace_period: Termination grace period if task killed in UI,
        defaults to kubernetes default
    :param configmaps: (Optional) A list of names of config maps from which it collects ConfigMaps
        to populate the environment variables with. The contents of the target
        ConfigMap's Data field will represent the key-value pairs as environment variables.
        Extends env_from.
    :param skip_on_exit_code: If task exits with this exit code, leave the task
        in ``skipped`` state (default: None). If set to ``None``, any non-zero
        exit code will be treated as a failure.
    :param base_container_name: The name of the base container in the pod. This container's logs
        will appear as part of this task's logs if get_logs is True. Defaults to None. If None,
        will consult the class variable BASE_CONTAINER_NAME (which defaults to "base") for the base
        container name to use.
    :param deferrable: Run operator in the deferrable mode.
    :param poll_interval: Polling period in seconds to check for the status. Used only in deferrable mode.
    :param log_pod_spec_on_failure: Log the pod's specification if a failure occurs
    :param on_finish_action: What to do when the pod reaches its final state, or the execution is
        interrupted. If "delete_pod", the pod will be deleted regardless of its state; if
        "delete_succeeded_pod", only succeeded pods will be deleted. You can set it to "keep_pod" to
        keep the pod.
    :param is_delete_operator_pod: What to do when the pod reaches its final
        state, or the execution is interrupted. If True (default), delete the
        pod; if False, leave the pod.
        Deprecated - use `on_finish_action` instead.
    :param termination_message_policy: The termination message policy of the base container.
        Default value is "File"
    :param active_deadline_seconds: The active_deadline_seconds which translates to
        active_deadline_seconds in V1PodSpec.
    :param callbacks: KubernetesPodOperatorCallback instance containing the callback methods invoked
        at different steps of the KubernetesPodOperator.
    :param progress_callback: Callback function for receiving k8s container logs.
        `progress_callback` is deprecated, please use ``callbacks`` instead.
    :param logging_interval: max time in seconds that task should be in deferred state before
        resuming to fetch the latest logs. If ``None``, then the task will remain in deferred state
        until the pod is done, and no logs will be visible until that time.
    """

    # !!! Changes in KubernetesPodOperator's arguments should be also reflected in !!!
    #  - airflow/decorators/__init__.pyi (by a separate PR)

    # This field can be overloaded at the instance level via base_container_name
    BASE_CONTAINER_NAME = "base"
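    # A minimal usage sketch (illustrative values; it assumes a reachable cluster and the default
    # Kubernetes connection). It shows the typical way this operator is instantiated inside a DAG:
    #
    #     run_in_pod = KubernetesPodOperator(
    #         task_id="run_in_pod",
    #         name="example-pod",
    #         namespace="default",
    #         image="python:3.11-slim",
    #         cmds=["python", "-c"],
    #         arguments=["print('hello from the pod')"],
    #         get_logs=True,
    #         on_finish_action="delete_pod",
    #     )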
    def __init__(
        self,
        *,
        kubernetes_conn_id: str | None = KubernetesHook.default_conn_name,
        namespace: str | None = None,
        image: str | None = None,
        name: str | None = None,
        random_name_suffix: bool = True,
        cmds: list[str] | None = None,
        arguments: list[str] | None = None,
        ports: list[k8s.V1ContainerPort] | None = None,
        volume_mounts: list[k8s.V1VolumeMount] | None = None,
        volumes: list[k8s.V1Volume] | None = None,
        env_vars: list[k8s.V1EnvVar] | dict[str, str] | None = None,
        env_from: list[k8s.V1EnvFromSource] | None = None,
        secrets: list[Secret] | None = None,
        in_cluster: bool | None = None,
        cluster_context: str | None = None,
        labels: dict | None = None,
        reattach_on_restart: bool = True,
        startup_timeout_seconds: int = 120,
        startup_check_interval_seconds: int = 1,
        get_logs: bool = True,
        container_logs: Iterable[str] | str | Literal[True] = BASE_CONTAINER_NAME,
        image_pull_policy: str | None = None,
        annotations: dict | None = None,
        container_resources: k8s.V1ResourceRequirements | None = None,
        affinity: k8s.V1Affinity | None = None,
        config_file: str | None = None,
        node_selector: dict | None = None,
        image_pull_secrets: list[k8s.V1LocalObjectReference] | None = None,
        service_account_name: str | None = None,
        hostnetwork: bool = False,
        host_aliases: list[k8s.V1HostAlias] | None = None,
        tolerations: list[k8s.V1Toleration] | None = None,
        security_context: k8s.V1PodSecurityContext | dict | None = None,
        container_security_context: k8s.V1SecurityContext | dict | None = None,
        dnspolicy: str | None = None,
        dns_config: k8s.V1PodDNSConfig | None = None,
        hostname: str | None = None,
        subdomain: str | None = None,
        schedulername: str | None = None,
        full_pod_spec: k8s.V1Pod | None = None,
        init_containers: list[k8s.V1Container] | None = None,
        log_events_on_failure: bool = False,
        do_xcom_push: bool = False,
        pod_template_file: str | None = None,
        pod_template_dict: dict | None = None,
        priority_class_name: str | None = None,
        pod_runtime_info_envs: list[k8s.V1EnvVar] | None = None,
        termination_grace_period: int | None = None,
        configmaps: list[str] | None = None,
        skip_on_exit_code: int | Container[int] | None = None,
        base_container_name: str | None = None,
        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
        poll_interval: float = 2,
        log_pod_spec_on_failure: bool = True,
        on_finish_action: str = "delete_pod",
        is_delete_operator_pod: None | bool = None,
        termination_message_policy: str = "File",
        active_deadline_seconds: int | None = None,
        callbacks: type[KubernetesPodOperatorCallback] | None = None,
        progress_callback: Callable[[str], None] | None = None,
        logging_interval: int | None = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.kubernetes_conn_id = kubernetes_conn_id
        self.do_xcom_push = do_xcom_push
        self.image = image
        self.namespace = namespace
        self.cmds = cmds or []
        self.arguments = arguments or []
        self.labels = labels or {}
        self.startup_timeout_seconds = startup_timeout_seconds
        self.startup_check_interval_seconds = startup_check_interval_seconds
        env_vars = convert_env_vars(env_vars) if env_vars else []
        self.env_vars = env_vars
        if pod_runtime_info_envs:
            self.env_vars.extend([convert_pod_runtime_info_env(p) for p in pod_runtime_info_envs])
        self.env_from = env_from or []
        if configmaps:
            self.env_from.extend([convert_configmap(c) for c in configmaps])
        self.ports = [convert_port(p) for p in ports] if ports else []
        volume_mounts = [convert_volume_mount(v) for v in volume_mounts] if volume_mounts else []
        self.volume_mounts = volume_mounts
        volumes = [convert_volume(volume) for volume in volumes] if volumes else []
        self.volumes = volumes
        self.secrets = secrets or []
        self.in_cluster = in_cluster
        self.cluster_context = cluster_context
        self.reattach_on_restart = reattach_on_restart
        self.get_logs = get_logs
        self.container_logs = container_logs
        if self.container_logs == KubernetesPodOperator.BASE_CONTAINER_NAME:
            self.container_logs = base_container_name or self.BASE_CONTAINER_NAME
        self.image_pull_policy = image_pull_policy
        self.node_selector = node_selector or {}
        self.annotations = annotations or {}
        self.affinity = convert_affinity(affinity) if affinity else {}
        self.container_resources = container_resources
        self.config_file = config_file
        self.image_pull_secrets = convert_image_pull_secrets(image_pull_secrets) if image_pull_secrets else []
        self.service_account_name = service_account_name
        self.hostnetwork = hostnetwork
        self.host_aliases = host_aliases
        self.tolerations = (
            [convert_toleration(toleration) for toleration in tolerations] if tolerations else []
        )
        self.security_context = security_context or {}
        self.container_security_context = container_security_context
        self.dnspolicy = dnspolicy
        self.dns_config = dns_config
        self.hostname = hostname
        self.subdomain = subdomain
        self.schedulername = schedulername
        self.full_pod_spec = full_pod_spec
        self.init_containers = init_containers or []
        self.log_events_on_failure = log_events_on_failure
        self.priority_class_name = priority_class_name
        self.pod_template_file = pod_template_file
        self.pod_template_dict = pod_template_dict
        self.name = self._set_name(name)
        self.random_name_suffix = random_name_suffix
        self.termination_grace_period = termination_grace_period
        self.pod_request_obj: k8s.V1Pod | None = None
        self.pod: k8s.V1Pod | None = None
        self.skip_on_exit_code = (
            skip_on_exit_code
            if isinstance(skip_on_exit_code, Container)
            else [skip_on_exit_code]
            if skip_on_exit_code is not None
            else []
        )
        self.base_container_name = base_container_name or self.BASE_CONTAINER_NAME
        self.deferrable = deferrable
        self.poll_interval = poll_interval
        self.remote_pod: k8s.V1Pod | None = None
        self.log_pod_spec_on_failure = log_pod_spec_on_failure
        if is_delete_operator_pod is not None:
            warnings.warn(
                "`is_delete_operator_pod` parameter is deprecated, please use `on_finish_action`",
                AirflowProviderDeprecationWarning,
                stacklevel=2,
            )
            self.on_finish_action = (
                OnFinishAction.DELETE_POD if is_delete_operator_pod else OnFinishAction.KEEP_POD
            )
            self.is_delete_operator_pod = is_delete_operator_pod
        else:
            self.on_finish_action = OnFinishAction(on_finish_action)
            self.is_delete_operator_pod = self.on_finish_action == OnFinishAction.DELETE_POD
        self.termination_message_policy = termination_message_policy
        self.active_deadline_seconds = active_deadline_seconds
        self.logging_interval = logging_interval

        self._config_dict: dict | None = None  # TODO: remove it when removing convert_config_file_to_dict
        self._progress_callback = progress_callback
        self.callbacks = callbacks
        self._killed: bool = False

    @cached_property
    def _incluster_namespace(self):
        from pathlib import Path

        path = Path("/var/run/secrets/kubernetes.io/serviceaccount/namespace")
        return path.exists() and path.read_text() or None

    def _render_nested_template_fields(
        self,
        content: Any,
        context: Context,
        jinja_env: jinja2.Environment,
        seen_oids: set,
    ) -> None:
        if id(content) not in seen_oids:
            template_fields: tuple | None

            if isinstance(content, k8s.V1EnvVar):
                template_fields = ("value", "name")
            elif isinstance(content, k8s.V1ResourceRequirements):
                template_fields = ("limits", "requests")
            elif isinstance(content, k8s.V1Volume):
                template_fields = ("name", "persistent_volume_claim", "config_map")
            elif isinstance(content, k8s.V1VolumeMount):
                template_fields = ("name", "sub_path")
            elif isinstance(content, k8s.V1PersistentVolumeClaimVolumeSource):
                template_fields = ("claim_name",)
            elif isinstance(content, k8s.V1ConfigMapVolumeSource):
                template_fields = ("name",)
            elif isinstance(content, k8s.V1EnvFromSource):
                template_fields = ("config_map_ref",)
            elif isinstance(content, k8s.V1ConfigMapEnvSource):
                template_fields = ("name",)
            else:
                template_fields = None

            if template_fields:
                seen_oids.add(id(content))
                self._do_render_template_fields(content, template_fields, context, jinja_env, seen_oids)
                return

        super()._render_nested_template_fields(content, context, jinja_env, seen_oids)

    @staticmethod
    def _get_ti_pod_labels(context: Context | None = None, include_try_number: bool = True) -> dict[str, str]:
        """
        Generate labels for the pod to track the pod in case of Operator crash.

        :param context: task context provided by airflow DAG
        :return: dict
        """
        if not context:
            return {}

        ti = context["ti"]
        run_id = context["run_id"]

        labels = {
            "dag_id": ti.dag_id,
            "task_id": ti.task_id,
            "run_id": run_id,
            "kubernetes_pod_operator": "True",
        }

        map_index = ti.map_index
        if map_index >= 0:
            labels["map_index"] = map_index

        if include_try_number:
            labels.update(try_number=ti.try_number)
        # In the case of sub dags this is just useful
        if context["dag"].parent_dag:
            labels["parent_dag_id"] = context["dag"].parent_dag.dag_id
        # Ensure that label is valid for Kube,
        # and if not truncate/remove invalid chars and replace with short hash.
        for label_id, label in labels.items():
            safe_label = pod_generator.make_safe_label_value(str(label))
            labels[label_id] = safe_label
        return labels
    def find_pod(self, namespace: str, context: Context, *, exclude_checked: bool = True) -> k8s.V1Pod | None:
        """Return an already-running pod for this task instance if one exists."""
        label_selector = self._build_find_pod_label_selector(context, exclude_checked=exclude_checked)
        pod_list = self.client.list_namespaced_pod(
            namespace=namespace,
            label_selector=label_selector,
        ).items

        pod = None
        num_pods = len(pod_list)
        if num_pods > 1:
            raise AirflowException(f"More than one pod running with labels {label_selector}")
        elif num_pods == 1:
            pod = pod_list[0]
            self.log.info("Found matching pod %s with labels %s", pod.metadata.name, pod.metadata.labels)
            self.log.info("`try_number` of task_instance: %s", context["ti"].try_number)
            self.log.info("`try_number` of pod: %s", pod.metadata.labels["try_number"])
        return pod
    def extract_xcom(self, pod: k8s.V1Pod):
        """Retrieve xcom value and kill xcom sidecar container."""
        result = self.pod_manager.extract_xcom(pod)
        if isinstance(result, str) and result.rstrip() == EMPTY_XCOM_RESULT:
            self.log.info("xcom result file is empty.")
            return None
        else:
            self.log.info("xcom result: \n%s", result)
            return json.loads(result)
    def execute(self, context: Context):
        """Based on the deferrable parameter runs the pod asynchronously or synchronously."""
        if self.deferrable:
            self.execute_async(context)
        else:
            return self.execute_sync(context)
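    # For example (illustrative values), passing ``deferrable=True`` at construction time makes
    # ``execute`` take the asynchronous path and hand the pod off to the trigger instead of
    # blocking a worker slot while the pod runs:
    #
    #     KubernetesPodOperator(
    #         task_id="deferred_pod",
    #         name="deferred-pod",
    #         image="busybox",
    #         cmds=["sleep", "300"],
    #         deferrable=True,
    #         logging_interval=60,
    #     )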
    def execute_sync(self, context: Context):
        result = None
        try:
            if self.pod_request_obj is None:
                self.pod_request_obj = self.build_pod_request_obj(context)
            if self.pod is None:
                self.pod = self.get_or_create_pod(  # must set `self.pod` for `on_kill`
                    pod_request_obj=self.pod_request_obj,
                    context=context,
                )
            # push to xcom now so that if there is an error we still have the values
            ti = context["ti"]
            ti.xcom_push(key="pod_name", value=self.pod.metadata.name)
            ti.xcom_push(key="pod_namespace", value=self.pod.metadata.namespace)

            # get remote pod for use in cleanup methods
            self.remote_pod = self.find_pod(self.pod.metadata.namespace, context=context)
            if self.callbacks:
                self.callbacks.on_pod_creation(
                    pod=self.remote_pod, client=self.client, mode=ExecutionMode.SYNC
                )
            self.await_pod_start(pod=self.pod)
            if self.callbacks:
                self.callbacks.on_pod_starting(
                    pod=self.find_pod(self.pod.metadata.namespace, context=context),
                    client=self.client,
                    mode=ExecutionMode.SYNC,
                )

            if self.get_logs:
                self.pod_manager.fetch_requested_container_logs(
                    pod=self.pod,
                    containers=self.container_logs,
                    follow_logs=True,
                )
            if not self.get_logs or (
                self.container_logs is not True and self.base_container_name not in self.container_logs
            ):
                self.pod_manager.await_container_completion(
                    pod=self.pod, container_name=self.base_container_name
                )
            if self.callbacks:
                self.callbacks.on_pod_completion(
                    pod=self.find_pod(self.pod.metadata.namespace, context=context),
                    client=self.client,
                    mode=ExecutionMode.SYNC,
                )

            if self.do_xcom_push:
                self.pod_manager.await_xcom_sidecar_container_start(pod=self.pod)
                result = self.extract_xcom(pod=self.pod)
            istio_enabled = self.is_istio_enabled(self.pod)
            self.remote_pod = self.pod_manager.await_pod_completion(
                self.pod, istio_enabled, self.base_container_name
            )
        finally:
            pod_to_clean = self.pod or self.pod_request_obj
            self.cleanup(
                pod=pod_to_clean,
                remote_pod=self.remote_pod,
            )
            if self.callbacks:
                self.callbacks.on_pod_cleanup(pod=pod_to_clean, client=self.client, mode=ExecutionMode.SYNC)

        if self.do_xcom_push:
            return result
    def execute_async(self, context: Context):
        self.pod_request_obj = self.build_pod_request_obj(context)
        self.pod = self.get_or_create_pod(  # must set `self.pod` for `on_kill`
            pod_request_obj=self.pod_request_obj,
            context=context,
        )
        if self.callbacks:
            self.callbacks.on_pod_creation(
                pod=self.find_pod(self.pod.metadata.namespace, context=context),
                client=self.client,
                mode=ExecutionMode.SYNC,
            )
        ti = context["ti"]
        ti.xcom_push(key="pod_name", value=self.pod.metadata.name)
        ti.xcom_push(key="pod_namespace", value=self.pod.metadata.namespace)

        self.invoke_defer_method()
    def invoke_defer_method(self, last_log_time: DateTime | None = None):
        """Redefine triggers which are being used in child classes."""
        trigger_start_time = datetime.datetime.now(tz=datetime.timezone.utc)
        self.defer(
            trigger=KubernetesPodTrigger(
                pod_name=self.pod.metadata.name,  # type: ignore[union-attr]
                pod_namespace=self.pod.metadata.namespace,  # type: ignore[union-attr]
                trigger_start_time=trigger_start_time,
                kubernetes_conn_id=self.kubernetes_conn_id,
                cluster_context=self.cluster_context,
                config_file=self.config_file,
                in_cluster=self.in_cluster,
                poll_interval=self.poll_interval,
                get_logs=self.get_logs,
                startup_timeout=self.startup_timeout_seconds,
                startup_check_interval=self.startup_check_interval_seconds,
                base_container_name=self.base_container_name,
                on_finish_action=self.on_finish_action.value,
                last_log_time=last_log_time,
                logging_interval=self.logging_interval,
            ),
            method_name="trigger_reentry",
        )
    def trigger_reentry(self, context: Context, event: dict[str, Any]) -> Any:
        """
        Point of re-entry from trigger.

        If ``logging_interval`` is None, then at this point, the pod should be done, and we'll just fetch
        the logs and exit.

        If ``logging_interval`` is not None, it could be that the pod is still running, and we'll just
        grab the latest logs and defer back to the trigger again.
        """
        self.pod = None
        try:
            pod_name = event["name"]
            pod_namespace = event["namespace"]

            self.pod = self.hook.get_pod(pod_name, pod_namespace)

            if not self.pod:
                raise PodNotFoundException("Could not find pod after resuming from deferral")

            if self.callbacks and event["status"] != "running":
                self.callbacks.on_operator_resuming(
                    pod=self.pod, event=event, client=self.client, mode=ExecutionMode.SYNC
                )

            if event["status"] in ("error", "failed", "timeout"):
                # fetch some logs when pod is failed
                if self.get_logs:
                    self.write_logs(self.pod)

                if self.do_xcom_push:
                    _ = self.extract_xcom(pod=self.pod)

                message = event.get("stack_trace", event["message"])
                raise AirflowException(message)

            elif event["status"] == "running":
                if self.get_logs:
                    last_log_time = event.get("last_log_time")
                    self.log.info("Resuming logs read from time %r", last_log_time)

                    pod_log_status = self.pod_manager.fetch_container_logs(
                        pod=self.pod,
                        container_name=self.BASE_CONTAINER_NAME,
                        follow=self.logging_interval is None,
                        since_time=last_log_time,
                    )

                    if pod_log_status.running:
                        self.log.info("Container still running; deferring again.")
                        self.invoke_defer_method(pod_log_status.last_log_time)
                else:
                    self.invoke_defer_method()

            elif event["status"] == "success":
                # fetch some logs when pod is executed successfully
                if self.get_logs:
                    self.write_logs(self.pod)

                if self.do_xcom_push:
                    xcom_sidecar_output = self.extract_xcom(pod=self.pod)
                    return xcom_sidecar_output
                return
        except TaskDeferred:
            raise
        finally:
            self._clean(event)
    def _clean(self, event: dict[str, Any]):
        if event["status"] == "running":
            return
        istio_enabled = self.is_istio_enabled(self.pod)
        # Skip await_pod_completion when the event is 'timeout' because the pod can hang
        # on the ErrImagePull or ContainerCreating step and it will never complete
        if event["status"] != "timeout":
            self.pod = self.pod_manager.await_pod_completion(
                self.pod, istio_enabled, self.base_container_name
            )
        if self.pod is not None:
            self.post_complete_action(
                pod=self.pod,
                remote_pod=self.pod,
            )

    @deprecated(reason="use `trigger_reentry` instead.", category=AirflowProviderDeprecationWarning)
    def execute_complete(self, context: Context, event: dict, **kwargs):
        return self.trigger_reentry(context=context, event=event)
    def write_logs(self, pod: k8s.V1Pod):
        try:
            logs = self.pod_manager.read_pod_logs(
                pod=pod,
                container_name=self.base_container_name,
                follow=False,
            )
            for raw_line in logs:
                line = raw_line.decode("utf-8", errors="backslashreplace").rstrip("\n")
                self.log.info("Container logs: %s", line)
        except HTTPError as e:
            self.log.warning(
                "Reading of logs interrupted with error %r; will retry. "
                "Set log level to DEBUG for traceback.",
                e,
            )
    def post_complete_action(self, *, pod, remote_pod, **kwargs):
        """Actions that must be done after the operator finishes its deferrable execution logic."""
        self.cleanup(
            pod=pod,
            remote_pod=remote_pod,
        )
        if self.callbacks:
            self.callbacks.on_pod_cleanup(pod=pod, client=self.client, mode=ExecutionMode.SYNC)
    def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod):
        # Skip cleaning the pod in the following scenarios.
        # 1. If a task got marked as failed, the "on_kill" method would be called and the pod would be
        #    cleaned up there. Cleaning it up again will raise an exception (which might cause a retry).
        # 2. remote pod is null (ex: pod creation failed)
        if self._killed or not remote_pod:
            return

        istio_enabled = self.is_istio_enabled(remote_pod)
        pod_phase = remote_pod.status.phase if hasattr(remote_pod, "status") else None

        # if the pod fails or succeeds, but we don't want to delete it
        if pod_phase != PodPhase.SUCCEEDED or self.on_finish_action == OnFinishAction.KEEP_POD:
            self.patch_already_checked(remote_pod, reraise=False)

        failed = (pod_phase != PodPhase.SUCCEEDED and not istio_enabled) or (
            istio_enabled and not container_is_succeeded(remote_pod, self.base_container_name)
        )

        if failed:
            if self.log_events_on_failure:
                self._read_pod_events(pod, reraise=False)

        self.process_pod_deletion(remote_pod, reraise=False)

        if self.skip_on_exit_code:
            container_statuses = (
                remote_pod.status.container_statuses if remote_pod and remote_pod.status else None
            ) or []
            base_container_status = next(
                (x for x in container_statuses if x.name == self.base_container_name), None
            )
            exit_code = (
                base_container_status.state.terminated.exit_code
                if base_container_status
                and base_container_status.state
                and base_container_status.state.terminated
                else None
            )
            if exit_code in self.skip_on_exit_code:
                raise AirflowSkipException(
                    f"Pod {pod and pod.metadata.name} returned exit code {exit_code}. Skipping."
                )

        if failed:
            error_message = get_container_termination_message(remote_pod, self.base_container_name)
            raise AirflowException(
                "\n".join(
                    filter(
                        None,
                        [
                            f"Pod {pod and pod.metadata.name} returned a failure.",
                            error_message if isinstance(error_message, str) else None,
                            f"remote_pod: {remote_pod}" if self.log_pod_spec_on_failure else None,
                        ],
                    )
                )
            )
    def _read_pod_events(self, pod, *, reraise=True):
        """Fetch and emit events from the pod."""
        with _optionally_suppress(reraise=reraise):
            for event in self.pod_manager.read_pod_events(pod).items:
                self.log.error("Pod Event: %s - %s", event.reason, event.message)
    def is_istio_enabled(self, pod: V1Pod) -> bool:
        """Check whether an istio-proxy sidecar container is present in the pod."""
        if not pod:
            return False

        remote_pod = self.pod_manager.read_pod(pod)

        return any(container.name == self.ISTIO_CONTAINER_NAME for container in remote_pod.spec.containers)
    def kill_istio_sidecar(self, pod: V1Pod) -> None:
        command = "/bin/sh -c 'curl -fsI -X POST http://localhost:15020/quitquitquit'"
        command_to_container = shlex.split(command)
        resp = stream(
            self.client.connect_get_namespaced_pod_exec,
            name=pod.metadata.name,
            namespace=pod.metadata.namespace,
            container=self.ISTIO_CONTAINER_NAME,
            command=command_to_container,
            stderr=True,
            stdin=True,
            stdout=True,
            tty=False,
            _preload_content=False,
        )
        output = []
        while resp.is_open():
            if resp.peek_stdout():
                output.append(resp.read_stdout())
            resp.close()
        output_str = "".join(output)
        self.log.info("Output of curl command to kill istio: %s", output_str)
        resp.close()
        if self.KILL_ISTIO_PROXY_SUCCESS_MSG not in output_str:
            raise Exception(f"Error while deleting istio-proxy sidecar: {output_str}")
    def patch_already_checked(self, pod: k8s.V1Pod, *, reraise=True):
        """Add an "already checked" label to ensure we don't reattach on retries."""
        with _optionally_suppress(reraise=reraise):
            self.client.patch_namespaced_pod(
                name=pod.metadata.name,
                namespace=pod.metadata.namespace,
                body={"metadata": {"labels": {self.POD_CHECKED_KEY: "True"}}},
            )
    def on_kill(self) -> None:
        self._killed = True
        if self.pod:
            pod = self.pod
            kwargs = {
                "name": pod.metadata.name,
                "namespace": pod.metadata.namespace,
            }
            if self.termination_grace_period is not None:
                kwargs.update(grace_period_seconds=self.termination_grace_period)

            try:
                self.client.delete_namespaced_pod(**kwargs)
            except kubernetes.client.exceptions.ApiException:
                self.log.exception("Unable to delete pod %s", self.pod.metadata.name)
    def build_pod_request_obj(self, context: Context | None = None) -> k8s.V1Pod:
        """
        Return V1Pod object based on pod template file, full pod spec, and other operator parameters.

        The V1Pod attributes are derived (in order of precedence) from operator params, full pod spec,
        pod template file.
        """
        self.log.debug("Creating pod for KubernetesPodOperator task %s", self.task_id)

        if self.pod_template_file:
            self.log.debug("Pod template file found, will parse for base pod")
            pod_template = pod_generator.PodGenerator.deserialize_model_file(self.pod_template_file)
            if self.full_pod_spec:
                pod_template = PodGenerator.reconcile_pods(pod_template, self.full_pod_spec)
        elif self.pod_template_dict:
            self.log.debug("Pod template dict found, will parse for base pod")
            pod_template = pod_generator.PodGenerator.deserialize_model_dict(self.pod_template_dict)
            if self.full_pod_spec:
                pod_template = PodGenerator.reconcile_pods(pod_template, self.full_pod_spec)
        elif self.full_pod_spec:
            pod_template = self.full_pod_spec
        else:
            pod_template = k8s.V1Pod(metadata=k8s.V1ObjectMeta())

        pod = k8s.V1Pod(
            api_version="v1",
            kind="Pod",
            metadata=k8s.V1ObjectMeta(
                namespace=self.namespace,
                labels=self.labels,
                name=self.name,
                annotations=self.annotations,
            ),
            spec=k8s.V1PodSpec(
                node_selector=self.node_selector,
                affinity=self.affinity,
                tolerations=self.tolerations,
                init_containers=self.init_containers,
                host_aliases=self.host_aliases,
                containers=[
                    k8s.V1Container(
                        image=self.image,
                        name=self.base_container_name,
                        command=self.cmds,
                        ports=self.ports,
                        image_pull_policy=self.image_pull_policy,
                        resources=self.container_resources,
                        volume_mounts=self.volume_mounts,
                        args=self.arguments,
                        env=self.env_vars,
                        env_from=self.env_from,
                        security_context=self.container_security_context,
                        termination_message_policy=self.termination_message_policy,
                    )
                ],
                image_pull_secrets=self.image_pull_secrets,
                service_account_name=self.service_account_name,
                host_network=self.hostnetwork,
                hostname=self.hostname,
                subdomain=self.subdomain,
                security_context=self.security_context,
                dns_policy=self.dnspolicy,
                dns_config=self.dns_config,
                scheduler_name=self.schedulername,
                restart_policy="Never",
                priority_class_name=self.priority_class_name,
                volumes=self.volumes,
                active_deadline_seconds=self.active_deadline_seconds,
            ),
        )

        pod = PodGenerator.reconcile_pods(pod_template, pod)

        if not pod.metadata.name:
            pod.metadata.name = create_pod_id(
                task_id=self.task_id, unique=self.random_name_suffix, max_length=POD_NAME_MAX_LENGTH
            )
        elif self.random_name_suffix:
            # user has supplied pod name, we're just adding suffix
            pod.metadata.name = add_pod_suffix(pod_name=pod.metadata.name)

        if not pod.metadata.namespace:
            hook_namespace = self.hook.get_namespace()
            pod_namespace = self.namespace or hook_namespace or self._incluster_namespace or "default"
            pod.metadata.namespace = pod_namespace

        for secret in self.secrets:
            self.log.debug("Adding secret to task %s", self.task_id)
            pod = secret.attach_to_pod(pod)
        if self.do_xcom_push:
            self.log.debug("Adding xcom sidecar to task %s", self.task_id)
            pod = xcom_sidecar.add_xcom_sidecar(
                pod,
                sidecar_container_image=self.hook.get_xcom_sidecar_container_image(),
                sidecar_container_resources=self.hook.get_xcom_sidecar_container_resources(),
            )

        labels = self._get_ti_pod_labels(context)
        self.log.info("Building pod %s with labels: %s", pod.metadata.name, labels)

        # Merge Pod Identifying labels with labels passed to operator
        pod.metadata.labels.update(labels)
        # Add Airflow Version to the label
        # And a label to identify that pod is launched by KubernetesPodOperator
        pod.metadata.labels.update(
            {
                "airflow_version": airflow_version.replace("+", "-"),
                "airflow_kpo_in_cluster": str(self.hook.is_in_cluster),
            }
        )
        pod_mutation_hook(pod)
        return pod
    def dry_run(self) -> None:
        """
        Print out the pod definition that would be created by this operator.

        Does not include labels specific to the task instance (since there isn't
        one in a dry_run) and excludes all empty elements.
        """
        pod = self.build_pod_request_obj()
        print(yaml.dump(prune_dict(pod.to_dict(), mode="strict")))
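    # A minimal sketch of inspecting the rendered pod spec via dry_run(); the operator arguments
    # are illustrative, and it assumes the default Kubernetes connection can be resolved (the hook
    # is still consulted while building the pod):
    #
    #     KubernetesPodOperator(
    #         task_id="inspect_pod",
    #         name="inspect-pod",
    #         namespace="default",
    #         image="busybox",
    #         cmds=["sh", "-c", "echo hello"],
    #     ).dry_run()
    #
    # This prints the generated V1Pod as YAML with empty fields pruned, without creating anything
    # in the cluster.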
class _optionally_suppress(AbstractContextManager):
    """
    Return a context manager that will swallow and log exceptions.

    By default it swallows descendants of Exception, but you can provide other classes through
    the vararg ``exceptions``.

    Suppression behavior can be disabled with reraise=True.

    :meta private:
    """

    def __init__(self, *exceptions, reraise=False):
        self._exceptions = exceptions or (Exception,)
        self.reraise = reraise
        self.exception = None

    def __enter__(self):
        return self

    def __exit__(self, exctype, excinst, exctb):
        error = exctype is not None
        matching_error = error and issubclass(exctype, self._exceptions)
        if (error and not matching_error) or (matching_error and self.reraise):
            return False
        elif matching_error:
            self.exception = excinst
            logger = logging.getLogger(__name__)
            logger.exception(excinst)
        return True
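# A minimal sketch of how the helper above is intended to be used; the client call inside the
# block is an illustrative assumption:
#
#     with _optionally_suppress(kubernetes.client.exceptions.ApiException, reraise=False):
#         client.delete_namespaced_pod(name="example-pod", namespace="default")
#
# A raised ApiException is logged, stored on the context manager's ``exception`` attribute, and
# swallowed; any other exception type propagates unchanged.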