Source code for airflow.providers.cncf.kubernetes.pod_launcher_deprecated
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Launches pods."""
from __future__ import annotations

import json
import math
import time
import warnings
from typing import TYPE_CHECKING

import pendulum
import tenacity
from kubernetes import client, watch
from kubernetes.client.rest import ApiException
from kubernetes.stream import stream as kubernetes_stream
from requests.exceptions import HTTPError

from airflow.exceptions import AirflowException, RemovedInAirflow3Warning
from airflow.providers.cncf.kubernetes.kube_client import get_kube_client
from airflow.providers.cncf.kubernetes.pod_generator import PodDefaults
from airflow.settings import pod_mutation_hook
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import State

if TYPE_CHECKING:
    from kubernetes.client.models.v1_pod import V1Pod

warnings.warn(
    """
    Please use :mod:`airflow.providers.cncf.kubernetes.utils.pod_manager`

    To use this module install the provider package by installing this pip package:

    https://pypi.org/project/apache-airflow-providers-cncf-kubernetes/
    """,
    RemovedInAirflow3Warning,
    stacklevel=2,
)
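
`process_status` at the end of this listing compares against a `PodStatus` constants holder that is not shown above. A minimal sketch, assuming lowercase phase strings as implied by the `status.lower()` comparison in `process_status`:

class PodStatus:
    """Lowercase Kubernetes pod phases used by PodLauncher.process_status (sketch)."""

    PENDING = "pending"
    RUNNING = "running"
    FAILED = "failed"
    SUCCEEDED = "succeeded"
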
class PodLauncher(LoggingMixin):
    """
    Deprecated class for launching pods.

    Please use airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager instead.
    """

    def __init__(
        self,
        kube_client: client.CoreV1Api = None,
        in_cluster: bool = True,
        cluster_context: str | None = None,
        extract_xcom: bool = False,
    ):
        """
        Launch pods; DEPRECATED.

        Please use airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager
        instead to create the launcher.

        :param kube_client: kubernetes client
        :param in_cluster: whether we are in cluster
        :param cluster_context: context of the cluster
        :param extract_xcom: whether we should extract xcom
        """
        super().__init__()
        self._client = kube_client or get_kube_client(in_cluster=in_cluster, cluster_context=cluster_context)
        self._watch = watch.Watch()
        self.extract_xcom = extract_xcom
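
A minimal usage sketch for the constructor above, assuming a kube config reachable from outside the cluster; the context name is illustrative only, and new code should prefer `PodManager` from `airflow.providers.cncf.kubernetes.utils.pod_manager`:

# Hypothetical example: build a launcher against an out-of-cluster kube config.
# In-cluster workers would typically keep the default in_cluster=True.
launcher = PodLauncher(in_cluster=False, cluster_context="my-context", extract_xcom=False)
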

    def run_pod_async(self, pod: V1Pod, **kwargs):
        """Run pod asynchronously."""
        pod_mutation_hook(pod)

        sanitized_pod = self._client.api_client.sanitize_for_serialization(pod)
        json_pod = json.dumps(sanitized_pod, indent=2)

        self.log.debug("Pod Creation Request: \n%s", json_pod)
        try:
            resp = self._client.create_namespaced_pod(
                body=sanitized_pod, namespace=pod.metadata.namespace, **kwargs
            )
            self.log.debug("Pod Creation Response: %s", resp)
        except Exception as e:
            self.log.exception("Exception when attempting to create Namespaced Pod: %s", json_pod)
            raise e
        return resp

    def delete_pod(self, pod: V1Pod):
        """Delete pod."""
        try:
            self._client.delete_namespaced_pod(
                pod.metadata.name, pod.metadata.namespace, body=client.V1DeleteOptions()
            )
        except ApiException as e:
            # If the pod is already deleted
            if e.status != 404:
                raise

    def start_pod(self, pod: V1Pod, startup_timeout: int = 120):
        """
        Launch the pod synchronously and wait for it to start.

        :param pod: pod spec to launch
        :param startup_timeout: Timeout for startup of the pod (if pod is pending for too long, fails task)
        """
        resp = self.run_pod_async(pod)
        start_time = time.monotonic()
        if resp.status.start_time is None:
            while self.pod_not_started(pod):
                self.log.warning("Pod not yet started: %s", pod.metadata.name)
                if time.monotonic() >= start_time + startup_timeout:
                    raise AirflowException("Pod took too long to start")
                time.sleep(1)

    def monitor_pod(self, pod: V1Pod, get_logs: bool) -> tuple[State, str | None]:
        """
        Monitor a pod and return the final state.

        :param pod: pod spec that will be monitored
        :param get_logs: whether to read the logs locally
        """
        if get_logs:
            read_logs_since_sec = None
            last_log_time = None
            while True:
                logs = self.read_pod_logs(pod, timestamps=True, since_seconds=read_logs_since_sec)
                for line in logs:
                    timestamp, message = self.parse_log_line(line.decode("utf-8"))
                    if timestamp:
                        last_log_time = pendulum.parse(timestamp)
                    self.log.info(message)
                time.sleep(1)

                if not self.base_container_is_running(pod):
                    break

                self.log.warning("Pod %s log read interrupted", pod.metadata.name)
                if last_log_time:
                    delta = pendulum.now() - last_log_time
                    # Prefer logs duplication rather than loss
                    read_logs_since_sec = math.ceil(delta.total_seconds())
        result = None
        if self.extract_xcom:
            while self.base_container_is_running(pod):
                self.log.info("Container %s has state %s", pod.metadata.name, State.RUNNING)
                time.sleep(2)
            result = self._extract_xcom(pod)
            self.log.info(result)
            result = json.loads(result)
        while self.pod_is_running(pod):
            self.log.info("Pod %s has state %s", pod.metadata.name, State.RUNNING)
            time.sleep(2)
        return self._task_status(self.read_pod(pod)), result

    def parse_log_line(self, line: str) -> tuple[str | None, str]:
        """
        Parse a K8s log line into a timestamp and a log message.

        :param line: k8s log line
        :return: timestamp and log message
        """
        timestamp, sep, message = line.strip().partition(" ")
        if not sep:
            self.log.error(
                "Error parsing timestamp (no timestamp in message: %r). "
                "Will continue execution but won't update timestamp",
                line,
            )
            return None, line
        return timestamp, message
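
When `read_pod_logs` is called with `timestamps=True`, the Kubernetes API prefixes each log line with an RFC 3339 timestamp followed by a single space, which is exactly what `parse_log_line` splits on. A small illustration; the literal values are made up:

# Hypothetical log line as returned by the API with timestamps=True.
line = "2023-01-01T12:00:00.000000001Z task started"
timestamp, message = launcher.parse_log_line(line)
# timestamp == "2023-01-01T12:00:00.000000001Z", message == "task started"
# A line with no space yields (None, line) and logs an error instead.
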

    def _task_status(self, event):
        self.log.info("Event: %s had an event of type %s", event.metadata.name, event.status.phase)
        status = self.process_status(event.metadata.name, event.status.phase)
        return status

    def pod_not_started(self, pod: V1Pod):
        """Test if pod has not started."""
        state = self._task_status(self.read_pod(pod))
        return state == State.QUEUED

    def pod_is_running(self, pod: V1Pod):
        """Test if pod is running."""
        state = self._task_status(self.read_pod(pod))
        return state not in (State.SUCCESS, State.FAILED)

    def base_container_is_running(self, pod: V1Pod):
        """Test if base container is running."""
        event = self.read_pod(pod)
        status = next((s for s in event.status.container_statuses if s.name == "base"), None)
        if not status:
            return False
        return status.state.running is not None

    def read_pod_logs(
        self,
        pod: V1Pod,
        tail_lines: int | None = None,
        timestamps: bool = False,
        since_seconds: int | None = None,
    ):
        """Read log from the pod."""
        additional_kwargs = {}
        if since_seconds:
            additional_kwargs["since_seconds"] = since_seconds

        if tail_lines:
            additional_kwargs["tail_lines"] = tail_lines

        try:
            return self._client.read_namespaced_pod_log(
                name=pod.metadata.name,
                namespace=pod.metadata.namespace,
                container="base",
                follow=True,
                timestamps=timestamps,
                _preload_content=False,
                **additional_kwargs,
            )
        except HTTPError as e:
            raise AirflowException(f"There was an error reading the kubernetes API: {e}")

    def read_pod_events(self, pod):
        """Read events from the pod."""
        try:
            return self._client.list_namespaced_event(
                namespace=pod.metadata.namespace, field_selector=f"involvedObject.name={pod.metadata.name}"
            )
        except HTTPError as e:
            raise AirflowException(f"There was an error reading the kubernetes API: {e}")

    def read_pod(self, pod: V1Pod):
        """Read pod information."""
        try:
            return self._client.read_namespaced_pod(pod.metadata.name, pod.metadata.namespace)
        except HTTPError as e:
            raise AirflowException(f"There was an error reading the kubernetes API: {e}")

    def _extract_xcom(self, pod: V1Pod):
        resp = kubernetes_stream(
            self._client.connect_get_namespaced_pod_exec,
            pod.metadata.name,
            pod.metadata.namespace,
            container=PodDefaults.SIDECAR_CONTAINER_NAME,
            command=["/bin/sh"],
            stdin=True,
            stdout=True,
            stderr=True,
            tty=False,
            _preload_content=False,
        )
        try:
            result = self._exec_pod_command(resp, f"cat {PodDefaults.XCOM_MOUNT_PATH}/return.json")
            self._exec_pod_command(resp, "kill -s SIGINT 1")
        finally:
            resp.close()
        if result is None:
            raise AirflowException(f"Failed to extract xcom from pod: {pod.metadata.name}")
        return result

    def _exec_pod_command(self, resp, command):
        if resp.is_open():
            self.log.info("Running command... %s\n", command)
            resp.write_stdin(command + "\n")
            while resp.is_open():
                resp.update(timeout=1)
                if resp.peek_stdout():
                    return resp.read_stdout()
                if resp.peek_stderr():
                    self.log.info(resp.read_stderr())
                    break
        return None
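
For `_extract_xcom` above to succeed, the base container must write its XCom value as JSON to `return.json` under the shared XCom mount before exiting; the sidecar named by `PodDefaults.SIDECAR_CONTAINER_NAME` keeps that volume alive until it receives the `SIGINT` sent above. A sketch of the in-pod contract, assuming `PodDefaults.XCOM_MOUNT_PATH` resolves to `/airflow/xcom` as in the provider's pod generator; image and payload are illustrative:

# Hypothetical base container whose command drops a JSON result where the
# sidecar (and _extract_xcom) expect to find it.
from kubernetes.client import models as k8s

base_container = k8s.V1Container(
    name="base",
    image="alpine:3.18",
    command=["sh", "-c"],
    # PodDefaults.XCOM_MOUNT_PATH is assumed to be /airflow/xcom here.
    args=['echo \'{"rows": 42}\' > /airflow/xcom/return.json'],
)
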

    def process_status(self, job_id, status):
        """Process status information for the job."""
        status = status.lower()
        if status == PodStatus.PENDING:
            return State.QUEUED
        elif status == PodStatus.FAILED:
            self.log.error("Event with job id %s Failed", job_id)
            return State.FAILED
        elif status == PodStatus.SUCCEEDED:
            self.log.info("Event with job id %s Succeeded", job_id)
            return State.SUCCESS
        elif status == PodStatus.RUNNING:
            return State.RUNNING
        else:
            self.log.error("Event: Invalid state %s on job %s", status, job_id)
            return State.FAILED
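
Putting the pieces together, a hedged end-to-end sketch of the lifecycle this class implements (create, wait for start, stream logs, reap). The pod spec, image, and namespace are illustrative only; the container must be named "base" for `base_container_is_running` and `read_pod_logs` to track it:

from kubernetes.client import models as k8s

pod = k8s.V1Pod(
    metadata=k8s.V1ObjectMeta(name="example-pod", namespace="default"),
    spec=k8s.V1PodSpec(
        restart_policy="Never",
        containers=[k8s.V1Container(name="base", image="alpine:3.18", command=["echo", "hello"])],
    ),
)

launcher = PodLauncher(in_cluster=False)
try:
    launcher.start_pod(pod, startup_timeout=120)  # create the pod and wait until it leaves PENDING
    state, xcom_result = launcher.monitor_pod(pod, get_logs=True)  # stream logs until completion
finally:
    launcher.delete_pod(pod)  # best-effort cleanup; a 404 from an already-deleted pod is ignored
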