Source code for airflow.providers.cncf.kubernetes.triggers.pod
# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportasyncioimportdatetimeimportwarningsfromasyncioimportCancelledErrorfromenumimportEnumfromtypingimportTYPE_CHECKING,Any,AsyncIteratorfromairflow.exceptionsimportAirflowProviderDeprecationWarningfromairflow.providers.cncf.kubernetes.hooks.kubernetesimportAsyncKubernetesHookfromairflow.providers.cncf.kubernetes.utils.pod_managerimportOnFinishAction,PodPhasefromairflow.triggers.baseimportBaseTrigger,TriggerEventifTYPE_CHECKING:fromkubernetes_asyncio.client.modelsimportV1Pod
class ContainerState(str, Enum):
    """
    Possible container states.

    Members are both ``str`` and ``Enum`` instances (``str`` mixin), so they
    compare equal to their plain string values.

    See https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-phase.
    """

    # NOTE(review): no enum members are visible in this view of the file, yet
    # the trigger's ``run`` method references ``ContainerState.TERMINATED`` and
    # ``ContainerState.UNDEFINED`` -- confirm the member definitions were not
    # lost when this source was extracted.
class KubernetesPodTrigger(BaseTrigger):
    """
    KubernetesPodTrigger runs on the trigger worker to check the state of a Pod.

    :param pod_name: The name of the pod.
    :param pod_namespace: The namespace of the pod.
    :param trigger_start_time: time in Datetime format when the trigger was started
    :param base_container_name: The name of the pod's base container; it is used
        when reporting the container state.
    :param kubernetes_conn_id: The :ref:`kubernetes connection id <howto/connection:kubernetes>`
        for the Kubernetes cluster.
    :param poll_interval: Polling period in seconds to check for the status.
    :param cluster_context: Context that points to kubernetes cluster.
    :param config_file: Path to kubeconfig file.
    :param in_cluster: run kubernetes client with in_cluster configuration.
    :param get_logs: get the stdout of the container as logs of the tasks.
    :param startup_timeout: timeout in seconds to start up the pod.
    :param on_finish_action: What to do when the pod reaches its final state, or the execution
        is interrupted. If "delete_pod", the pod will be deleted regardless of its state; if
        "delete_succeeded_pod", only succeeded pod will be deleted. You can set to "keep_pod"
        to keep the pod.
    :param should_delete_pod: What to do when the pod reaches its final state, or the execution
        is interrupted. If True (default), delete the pod; if False, leave the pod.
        Deprecated - use `on_finish_action` instead. When given (not None) it takes
        precedence over `on_finish_action`.
    """

    def __init__(
        self,
        pod_name: str,
        pod_namespace: str,
        trigger_start_time: datetime.datetime,
        base_container_name: str,
        kubernetes_conn_id: str | None = None,
        poll_interval: float = 2,
        cluster_context: str | None = None,
        config_file: str | None = None,
        in_cluster: bool | None = None,
        get_logs: bool = True,
        startup_timeout: int = 120,
        on_finish_action: str = "delete_pod",
        should_delete_pod: bool | None = None,
    ):
        super().__init__()
        self.pod_name = pod_name
        self.pod_namespace = pod_namespace
        self.trigger_start_time = trigger_start_time
        self.base_container_name = base_container_name
        self.kubernetes_conn_id = kubernetes_conn_id
        self.poll_interval = poll_interval
        self.cluster_context = cluster_context
        self.config_file = config_file
        self.in_cluster = in_cluster
        self.get_logs = get_logs
        self.startup_timeout = startup_timeout

        if should_delete_pod is not None:
            # Deprecated path: the boolean flag overrides ``on_finish_action``.
            # stacklevel=2 attributes the warning to the code constructing the
            # trigger rather than to this module.
            warnings.warn(
                "`should_delete_pod` parameter is deprecated, please use `on_finish_action`",
                AirflowProviderDeprecationWarning,
                stacklevel=2,
            )
            self.on_finish_action = (
                OnFinishAction.DELETE_POD if should_delete_pod else OnFinishAction.KEEP_POD
            )
            self.should_delete_pod = should_delete_pod
        else:
            # Raises ValueError for a string that is not a valid OnFinishAction.
            self.on_finish_action = OnFinishAction(on_finish_action)
            self.should_delete_pod = self.on_finish_action == OnFinishAction.DELETE_POD

        self._hook: AsyncKubernetesHook | None = None
        self._since_time = None

    def serialize(self) -> tuple[str, dict[str, Any]]:
        """
        Serialize KubernetesPodTrigger arguments and classpath.

        The deprecated ``should_delete_pod`` flag is intentionally *not* part of
        the serialized kwargs. ``__init__`` treats any non-None value of it as
        an explicit (deprecated) override of ``on_finish_action``, so emitting
        it would make every deserialization raise a deprecation warning and
        would collapse ``"delete_succeeded_pod"`` into ``"keep_pod"``.
        ``on_finish_action`` alone is enough: ``__init__`` re-derives
        ``should_delete_pod`` from it.

        :return: A ``(classpath, kwargs)`` tuple; ``kwargs`` can be passed back
            to ``__init__`` to rebuild an equivalent trigger.
        """
        return (
            "airflow.providers.cncf.kubernetes.triggers.pod.KubernetesPodTrigger",
            {
                "pod_name": self.pod_name,
                "pod_namespace": self.pod_namespace,
                "base_container_name": self.base_container_name,
                "kubernetes_conn_id": self.kubernetes_conn_id,
                "poll_interval": self.poll_interval,
                "cluster_context": self.cluster_context,
                "config_file": self.config_file,
                "in_cluster": self.in_cluster,
                "get_logs": self.get_logs,
                "startup_timeout": self.startup_timeout,
                "trigger_start_time": self.trigger_start_time,
                "on_finish_action": self.on_finish_action.value,
            },
        )

    def _make_event(self, status: str, message: str | None) -> TriggerEvent:
        """Build the TriggerEvent payload shared by every outcome of ``run``."""
        return TriggerEvent(
            {
                "name": self.pod_name,
                "namespace": self.pod_namespace,
                "status": status,
                "message": message,
            }
        )

    async def run(self) -> AsyncIterator[TriggerEvent]:  # type: ignore[override]
        """
        Get current pod status and yield a TriggerEvent.

        Polls the pod every ``poll_interval`` seconds and yields exactly one
        event whose ``status`` is one of:

        * ``"success"`` - the container state is ``TERMINATED``;
        * ``"timeout"`` - the pod is still ``PENDING`` with an undefined
          container state ``startup_timeout`` seconds after ``trigger_start_time``;
        * ``"failed"`` - the pod is neither terminated nor worth waiting for;
        * ``"cancelled"`` - the coroutine was cancelled while polling;
        * ``"error"`` - any other exception was raised while polling.
        """
        hook = self._get_async_hook()
        self.log.info("Checking pod %r in namespace %r.", self.pod_name, self.pod_namespace)
        try:
            while True:
                pod = await hook.get_pod(
                    name=self.pod_name,
                    namespace=self.pod_namespace,
                )

                pod_status = pod.status.phase
                self.log.debug("Pod %s status: %s", self.pod_name, pod_status)

                container_state = self.define_container_state(pod)
                self.log.debug("Container %s status: %s", self.base_container_name, container_state)

                if container_state == ContainerState.TERMINATED:
                    yield self._make_event(
                        "success", "All containers inside pod have started successfully."
                    )
                    return
                elif self.should_wait(pod_phase=pod_status, container_state=container_state):
                    self.log.info("Container is not completed and still working.")

                    # The startup timeout only applies while the pod has not
                    # started yet (pending, no container state available).
                    if pod_status == PodPhase.PENDING and container_state == ContainerState.UNDEFINED:
                        # NOTE(review): requires ``trigger_start_time`` to be
                        # timezone-aware; a naive datetime raises TypeError here
                        # and surfaces as an "error" event below.
                        delta = datetime.datetime.now(tz=datetime.timezone.utc) - self.trigger_start_time
                        if delta.total_seconds() >= self.startup_timeout:
                            message = (
                                f"Pod took longer than {self.startup_timeout} seconds to start. "
                                "Check the pod events in kubernetes to determine why."
                            )
                            yield self._make_event("timeout", message)
                            return

                    self.log.info("Sleeping for %s seconds.", self.poll_interval)
                    await asyncio.sleep(self.poll_interval)
                else:
                    yield self._make_event("failed", pod.status.message)
                    return
        except CancelledError:
            # That means that task was marked as failed
            # NOTE(review): the cancellation is swallowed rather than re-raised,
            # and cancellation may have other causes than a failed task --
            # confirm that reading logs / deleting the pod is wanted in all of them.
            if self.get_logs:
                self.log.info("Outputting container logs...")
                await self._get_async_hook().read_logs(
                    name=self.pod_name,
                    namespace=self.pod_namespace,
                )
            if self.on_finish_action == OnFinishAction.DELETE_POD:
                self.log.info("Deleting pod...")
                await self._get_async_hook().delete_pod(
                    name=self.pod_name,
                    namespace=self.pod_namespace,
                )
            yield self._make_event("cancelled", "Pod execution was cancelled")
        except Exception as e:
            # Top-level boundary of the trigger: report the failure as an event
            # instead of letting the exception escape.
            self.log.exception("Exception occurred while checking pod phase:")
            yield self._make_event("error", str(e))