Source code for airflow.providers.cncf.kubernetes.utils.pod_manager
# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""Launches PODs"""from__future__importannotationsimportjsonimportmathimporttimeimportwarningsfromcontextlibimportclosing,suppressfromdataclassesimportdataclassfromdatetimeimportdatetimefromtypingimportTYPE_CHECKING,Iterable,castimportpendulumimporttenacityfromkubernetesimportclient,watchfromkubernetes.client.models.v1_podimportV1Podfromkubernetes.client.restimportApiExceptionfromkubernetes.streamimportstreamaskubernetes_streamfrompendulumimportDateTimefrompendulum.parsing.exceptionsimportParserErrorfromurllib3.exceptionsimportHTTPErrorasBaseHTTPErrorfromairflow.exceptionsimportAirflowExceptionfromairflow.kubernetes.kube_clientimportget_kube_clientfromairflow.kubernetes.pod_generatorimportPodDefaultsfromairflow.utils.log.logging_mixinimportLoggingMixinifTYPE_CHECKING:fromkubernetes.client.models.core_v1_event_listimportCoreV1EventList
class PodLaunchFailedException(AirflowException):
    """Raised when a pod for the KubernetesPodOperator fails to launch (e.g. it stays ``Pending`` too long)."""
def should_retry_start_pod(exception: BaseException) -> bool:
    """
    Decide whether a failed pod-creation attempt is worth retrying.

    Only a Kubernetes ``ApiException`` carrying HTTP status 409 (Conflict) counts as
    transient; every other exception yields False.
    """
    return isinstance(exception, ApiException) and exception.status == 409
class PodPhase:
    """
    Possible pod phases.

    See https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-phase.
    """

    # Phase strings as reported in a pod's ``status.phase``; values follow the Kubernetes
    # pod-lifecycle documentation linked above.
    PENDING = "Pending"
    RUNNING = "Running"
    FAILED = "Failed"
    SUCCEEDED = "Succeeded"

    # Phases that ``PodManager.await_pod_completion`` treats as final. A frozenset so the
    # shared class attribute cannot be mutated by accident.
    terminal_states = frozenset({FAILED, SUCCEEDED})
def container_is_running(pod: V1Pod, container_name: str) -> bool:
    """
    Report whether the container called ``container_name`` inside ``pod`` is running.

    Returns False when the pod (or its status) is missing, when it has no container
    statuses, or when none of them is named ``container_name``. Otherwise returns whether
    the first matching container status has a non-None ``state.running``.
    """
    if not (pod and pod.status):
        return False
    for status in pod.status.container_statuses or ():
        if status.name == container_name:
            return status.state.running is not None
    return False
class PodManager(LoggingMixin):
    """
    Helper class for creating, monitoring, and otherwise interacting with Kubernetes pods
    for use with the KubernetesPodOperator.
    """

    def __init__(
        self,
        kube_client: client.CoreV1Api | None = None,
        in_cluster: bool = True,
        cluster_context: str | None = None,
    ):
        """
        Creates the launcher.

        :param kube_client: kubernetes client. If omitted, one is built with
            ``get_kube_client`` and a DeprecationWarning is emitted.
        :param in_cluster: whether we are in cluster (only used when ``kube_client`` is omitted)
        :param cluster_context: context of the cluster (only used when ``kube_client`` is omitted)
        """
        super().__init__()
        if kube_client:
            self._client = kube_client
        else:
            self._client = get_kube_client(in_cluster=in_cluster, cluster_context=cluster_context)
            warnings.warn(
                "`kube_client` not supplied to PodManager. "
                "This will be a required argument in a future release. "
                "Please use KubernetesHook to create the client before calling.",
                DeprecationWarning,
                # Point the warning at the code that constructed the PodManager.
                stacklevel=2,
            )
        self._watch = watch.Watch()

    def run_pod_async(self, pod: V1Pod, **kwargs) -> V1Pod:
        """
        Submit ``pod`` to the Kubernetes API without waiting for it to start.

        :param pod: pod spec to create; it is created in ``pod.metadata.namespace``
        :param kwargs: extra keyword arguments forwarded to ``create_namespaced_pod``
        :return: the object returned by ``create_namespaced_pod``
        """
        sanitized_pod = self._client.api_client.sanitize_for_serialization(pod)
        json_pod = json.dumps(sanitized_pod, indent=2)

        self.log.debug("Pod Creation Request: \n%s", json_pod)
        try:
            resp = self._client.create_namespaced_pod(
                body=sanitized_pod, namespace=pod.metadata.namespace, **kwargs
            )
            self.log.debug("Pod Creation Response: %s", resp)
        except Exception:
            # Log the request on a single line, then re-raise the original exception unchanged.
            self.log.exception(
                "Exception when attempting to create Namespaced Pod: %s", json_pod.replace("\n", " ")
            )
            raise
        return resp

    def delete_pod(self, pod: V1Pod) -> None:
        """
        Delete ``pod`` from its namespace.

        A 404 from the API (pod already gone) is ignored; other ``ApiException``s propagate.
        """
        try:
            self._client.delete_namespaced_pod(
                pod.metadata.name, pod.metadata.namespace, body=client.V1DeleteOptions()
            )
        except ApiException as e:
            # If the pod is already deleted
            if e.status != 404:
                raise

    # Retry creation only for errors ``should_retry_start_pod`` accepts (HTTP 409 conflicts).
    # NOTE(review): the arguments of this decorator were reconstructed (only its closing
    # parenthesis was visible); confirm the stop/wait settings against upstream Airflow.
    @tenacity.retry(
        stop=tenacity.stop_after_attempt(3),
        wait=tenacity.wait_random_exponential(),
        reraise=True,
        retry=tenacity.retry_if_exception(should_retry_start_pod),
    )
    def create_pod(self, pod: V1Pod) -> V1Pod:
        """Launches the pod asynchronously (see ``run_pod_async``)."""
        return self.run_pod_async(pod)

    def await_pod_start(self, pod: V1Pod, startup_timeout: int = 120) -> None:
        """
        Waits for the pod to reach a phase other than ``Pending``, polling once per second.

        :param pod: pod to watch
        :param startup_timeout: Timeout (in seconds) for startup of the pod
            (if pod is pending for too long, fails task)
        :raises PodLaunchFailedException: if the pod is still pending after ``startup_timeout``
        """
        curr_time = datetime.now()
        while True:
            remote_pod = self.read_pod(pod)
            if remote_pod.status.phase != PodPhase.PENDING:
                break
            self.log.warning("Pod not yet started: %s", pod.metadata.name)
            delta = datetime.now() - curr_time
            if delta.total_seconds() >= startup_timeout:
                msg = (
                    f"Pod took longer than {startup_timeout} seconds to start. "
                    "Check the pod events in kubernetes to determine why."
                )
                raise PodLaunchFailedException(msg)
            time.sleep(1)

    def follow_container_logs(self, pod: V1Pod, container_name: str) -> PodLoggingStatus:
        """Deprecated: equivalent to ``fetch_container_logs(..., follow=True)``."""
        warnings.warn(
            # The two literals are concatenated implicitly, so the separating space must be explicit.
            "Method `follow_container_logs` is deprecated. Use `fetch_container_logs` instead "
            "with option `follow=True`.",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.fetch_container_logs(pod=pod, container_name=container_name, follow=True)

    def fetch_container_logs(
        self,
        pod: V1Pod,
        container_name: str,
        *,
        follow=False,
        since_time: DateTime | None = None,
    ) -> PodLoggingStatus:
        """
        Stream the logs of ``container_name`` into Airflow logging.

        With ``follow=True`` the read is resumed (after a 1 second pause) whenever it ends while
        the container is still running, so this returns only once the container stops running.
        With ``follow=False`` the currently available logs are read once and the method returns.

        :param pod: pod whose container logs are read
        :param container_name: name of the container within ``pod``
        :param follow: keep streaming until the container is no longer running
        :param since_time: only request logs from roughly this time onwards
            (converted to a whole number of ``since_seconds``, rounded up)
        :return: whether the container is still running, and the last log timestamp seen
        """

        def consume_logs(*, since_time: DateTime | None = None, follow: bool = True) -> DateTime | None:
            """
            Tries to follow container logs until container completes.
            For a long-running container, sometimes the log read may be interrupted
            Such errors of this kind are suppressed.

            Returns the last timestamp observed in logs, or ``since_time`` if none was parsed.
            """
            timestamp = None
            try:
                logs = self.read_pod_logs(
                    pod=pod,
                    container_name=container_name,
                    timestamps=True,
                    since_seconds=(
                        math.ceil((pendulum.now() - since_time).total_seconds()) if since_time else None
                    ),
                    follow=follow,
                )
                for raw_line in logs:
                    line = raw_line.decode("utf-8", errors="backslashreplace")
                    line_timestamp, message = self.parse_log_line(line)
                    # A line without a parseable timestamp must not discard the last good one,
                    # otherwise a resumed read would restart from the stale ``since_time``.
                    if line_timestamp is not None:
                        timestamp = line_timestamp
                    self.log.info(message)
            except BaseHTTPError as e:
                self.log.warning(
                    "Reading of logs interrupted with error %r; will retry. "
                    "Set log level to DEBUG for traceback.",
                    e,
                )
                self.log.debug(
                    "Traceback for interrupted logs read for pod %r",
                    pod.metadata.name,
                    exc_info=True,
                )
            return timestamp or since_time

        # note: `read_pod_logs` follows the logs, so we shouldn't necessarily *need* to
        # loop as we do here. But in a long-running process we might temporarily lose connectivity.
        # So the looping logic is there to let us resume following the logs.
        last_log_time = since_time
        while True:
            last_log_time = consume_logs(since_time=last_log_time, follow=follow)
            if not self.container_is_running(pod, container_name=container_name):
                return PodLoggingStatus(running=False, last_log_time=last_log_time)
            if not follow:
                return PodLoggingStatus(running=True, last_log_time=last_log_time)
            self.log.warning(
                "Pod %s log read interrupted but container %s still running",
                pod.metadata.name,
                container_name,
            )
            time.sleep(1)

    def await_pod_completion(self, pod: V1Pod) -> V1Pod:
        """
        Poll the pod every 2 seconds until its phase is terminal.

        :param pod: pod spec that will be monitored
        :return: the remote pod as last read, whose phase is in ``PodPhase.terminal_states``
        """
        while True:
            remote_pod = self.read_pod(pod)
            if remote_pod.status.phase in PodPhase.terminal_states:
                break
            self.log.info("Pod %s has phase %s", pod.metadata.name, remote_pod.status.phase)
            time.sleep(2)
        return remote_pod

    def parse_log_line(self, line: str) -> tuple[DateTime | None, str]:
        """
        Split a timestamped K8s log line into its timestamp and message.

        :param line: k8s log line, expected as ``<timestamp> <message>``
        :return: timestamp and log message (trailing whitespace stripped); when the line has
            no space or the timestamp cannot be parsed, ``None`` and the unmodified line
        """
        split_at = line.find(" ")
        if split_at == -1:
            self.log.error(
                "Error parsing timestamp (no timestamp in message %r). "
                "Will continue execution but won't update timestamp",
                line,
            )
            return None, line
        timestamp = line[:split_at]
        message = line[split_at + 1 :].rstrip()
        try:
            last_log_time = cast(DateTime, pendulum.parse(timestamp))
        except ParserError:
            self.log.error("Error parsing timestamp. Will continue execution but won't update timestamp")
            return None, line
        return last_log_time, message

    def container_is_running(self, pod: V1Pod, container_name: str) -> bool:
        """Reads pod and checks if container is running"""
        remote_pod = self.read_pod(pod)
        # Delegates to the module-level ``container_is_running`` helper.
        return container_is_running(pod=remote_pod, container_name=container_name)

    def read_pod_logs(
        self,
        pod: V1Pod,
        container_name: str,
        tail_lines: int | None = None,
        timestamps: bool = False,
        since_seconds: int | None = None,
        follow=True,
    ) -> Iterable[bytes]:
        """
        Reads log from the POD.

        ``since_seconds`` and ``tail_lines`` are only sent when truthy. The response is
        requested with ``_preload_content=False``, so it is returned unread for the caller
        to iterate.

        :raises BaseHTTPError: re-raised (after logging) if the API call fails
        """
        additional_kwargs: dict[str, int] = {}
        if since_seconds:
            additional_kwargs["since_seconds"] = since_seconds

        if tail_lines:
            additional_kwargs["tail_lines"] = tail_lines

        try:
            return self._client.read_namespaced_pod_log(
                name=pod.metadata.name,
                namespace=pod.metadata.namespace,
                container=container_name,
                follow=follow,
                timestamps=timestamps,
                _preload_content=False,
                **additional_kwargs,
            )
        except BaseHTTPError:
            self.log.exception("There was an error reading the kubernetes API.")
            raise

    def read_pod_events(self, pod: V1Pod) -> CoreV1EventList:
        """
        Reads events from the POD (events whose ``involvedObject.name`` is the pod's name).

        :raises AirflowException: wrapping any ``BaseHTTPError`` from the API call
        """
        try:
            return self._client.list_namespaced_event(
                namespace=pod.metadata.namespace, field_selector=f"involvedObject.name={pod.metadata.name}"
            )
        except BaseHTTPError as e:
            raise AirflowException(f"There was an error reading the kubernetes API: {e}") from e

    def read_pod(self, pod: V1Pod) -> V1Pod:
        """
        Read POD information from the API server.

        :raises AirflowException: wrapping any ``BaseHTTPError`` from the API call
        """
        try:
            return self._client.read_namespaced_pod(pod.metadata.name, pod.metadata.namespace)
        except BaseHTTPError as e:
            raise AirflowException(f"There was an error reading the kubernetes API: {e}") from e

    def await_xcom_sidecar_container_start(self, pod: V1Pod) -> None:
        """
        Block until the xcom sidecar container is running, polling once per second.

        The "not yet started" warning is logged only once.
        NOTE(review): there is no timeout; this waits indefinitely if the sidecar never starts.
        """
        self.log.info("Checking if xcom sidecar container is started.")
        warned = False
        while True:
            if self.container_is_running(pod, PodDefaults.SIDECAR_CONTAINER_NAME):
                self.log.info("The xcom sidecar container is started.")
                break
            if not warned:
                self.log.warning("The xcom sidecar container is not yet started.")
                warned = True
            time.sleep(1)

    def extract_xcom(self, pod: V1Pod) -> str:
        """
        Retrieves XCom value and kills xcom sidecar container.

        Runs a shell in the sidecar that prints ``return.json`` from the xcom mount path
        (or the marker ``__airflow_xcom_result_empty__`` when that file is missing or empty),
        then sends SIGINT to PID 1 in the sidecar.

        :raises AirflowException: if no output could be read from the sidecar
        """
        with closing(
            kubernetes_stream(
                self._client.connect_get_namespaced_pod_exec,
                pod.metadata.name,
                pod.metadata.namespace,
                container=PodDefaults.SIDECAR_CONTAINER_NAME,
                command=["/bin/sh"],
                stdin=True,
                stdout=True,
                stderr=True,
                tty=False,
                _preload_content=False,
            )
        ) as resp:
            result = self._exec_pod_command(
                resp,
                f"if [ -s {PodDefaults.XCOM_MOUNT_PATH}/return.json ]; then cat {PodDefaults.XCOM_MOUNT_PATH}/return.json; else echo __airflow_xcom_result_empty__; fi",  # noqa
            )
            self._exec_pod_command(resp, "kill -s SIGINT 1")
        if result is None:
            raise AirflowException(f"Failed to extract xcom from pod: {pod.metadata.name}")
        return result

    def _exec_pod_command(self, resp, command: str) -> str | None:
        """
        Send ``command`` to an open exec stream and collect its stdout.

        Polls the stream (1 second update timeout) and returns as soon as some stdout has
        been read. Stops early if anything arrives on stderr (which is logged). Returns None
        when the stream is not open or nothing was read from stdout.
        """
        res = None
        if resp.is_open():
            self.log.info("Running command... %s\n", command)
            resp.write_stdin(command + "\n")
            while resp.is_open():
                resp.update(timeout=1)
                while resp.peek_stdout():
                    res = res + resp.read_stdout() if res else resp.read_stdout()
                error_res = None
                while resp.peek_stderr():
                    error_res = error_res + resp.read_stderr() if error_res else resp.read_stderr()
                if error_res:
                    self.log.info("stderr from command: %s", error_res)
                    break
                if res:
                    return res
        return res