Source code for airflow.providers.cncf.kubernetes.executors.kubernetes_executor
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""
KubernetesExecutor.

.. seealso::
    For more information on how the KubernetesExecutor works, take a look at the guide:
    :ref:`executor:KubernetesExecutor`
"""
from __future__ import annotations

import contextlib
import json
import logging
import multiprocessing
import time
from collections import defaultdict
from contextlib import suppress
from datetime import datetime
from queue import Empty, Queue
from typing import TYPE_CHECKING, Any, Sequence

from sqlalchemy import select, update

from airflow.providers.cncf.kubernetes.pod_generator import PodMutationHookException, PodReconciliationError

try:
    from airflow.cli.cli_config import (
        ARG_DAG_ID,
        ARG_EXECUTION_DATE,
        ARG_OUTPUT_PATH,
        ARG_SUBDIR,
        ARG_VERBOSE,
        ActionCommand,
        Arg,
        GroupCommand,
        lazy_load_command,
        positive_int,
    )
except ImportError:
    try:
        from airflow import __version__ as airflow_version
    except ImportError:
        from airflow.version import version as airflow_version

    import packaging.version

    from airflow.exceptions import AirflowOptionalProviderFeatureException
    base_version = packaging.version.parse(airflow_version).base_version

    if packaging.version.parse(base_version) < packaging.version.parse("2.7.0"):
        raise AirflowOptionalProviderFeatureException(
            "Kubernetes Executor from CNCF Provider should only be used with Airflow 2.7.0+.\n"
            f"This is Airflow {airflow_version} and Kubernetes and CeleryKubernetesExecutor are "
            "available in the 'airflow.executors' package. You should not use "
            "the provider's executors in this version of Airflow."
        )
    raise

from airflow.configuration import conf
from airflow.executors.base_executor import BaseExecutor
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import POD_EXECUTOR_DONE_KEY
from airflow.providers.cncf.kubernetes.kube_config import KubeConfig
from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import annotations_to_key
from airflow.utils.event_scheduler import EventScheduler
from airflow.utils.log.logging_mixin import remove_escape_codes
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.state import TaskInstanceState

if TYPE_CHECKING:
    import argparse

    from kubernetes import client
    from kubernetes.client import models as k8s
    from sqlalchemy.orm import Session

    from airflow.executors.base_executor import CommandType
    from airflow.models.taskinstance import TaskInstance
    from airflow.models.taskinstancekey import TaskInstanceKey
    from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import (
        KubernetesJobType,
        KubernetesResultsType,
    )
    from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils import (
        AirflowKubernetesScheduler,
    )

# CLI Args
ARG_NAMESPACE = Arg(
    ("--namespace",),
    default=conf.get("kubernetes_executor", "namespace"),
    help="Kubernetes Namespace. Default value is `[kubernetes] namespace` in configuration.",
)
ARG_MIN_PENDING_MINUTES = Arg(
    ("--min-pending-minutes",),
    default=30,
    type=positive_int(allow_zero=False),
    help=(
        "Pending pods created before the time interval are to be cleaned up, "
        "measured in minutes. Default value is 30(m). The minimum value is 5(m)."
    ),
)
# CLI Commands
KUBERNETES_COMMANDS = (
    ActionCommand(
        name="cleanup-pods",
        help=(
            "Clean up Kubernetes pods "
            "(created by KubernetesExecutor/KubernetesPodOperator) "
            "in evicted/failed/succeeded/pending states"
        ),
        func=lazy_load_command("airflow.cli.commands.kubernetes_command.cleanup_pods"),
        args=(ARG_NAMESPACE, ARG_MIN_PENDING_MINUTES, ARG_VERBOSE),
    ),
    ActionCommand(
        name="generate-dag-yaml",
        help=(
            "Generate YAML files for all tasks in DAG. Useful for debugging tasks without "
            "launching into a cluster"
        ),
        func=lazy_load_command("airflow.cli.commands.kubernetes_command.generate_pod_yaml"),
        args=(ARG_DAG_ID, ARG_EXECUTION_DATE, ARG_SUBDIR, ARG_OUTPUT_PATH, ARG_VERBOSE),
    ),
)
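
# Illustrative invocations of the CLI commands registered above (argument
# values are placeholders):
#   airflow kubernetes cleanup-pods --namespace my-namespace --min-pending-minutes 30
#   airflow kubernetes generate-dag-yaml my_dag 2024-01-01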
class KubernetesExecutor(BaseExecutor):
    """Executor for Kubernetes."""

    # Number of log lines to tail when reading logs from a running pod; this
    # attribute is referenced by get_task_log below (value per upstream source).
    RUNNING_POD_LOG_LINES = 100
    def __init__(self):
        self.kube_config = KubeConfig()
        self._manager = multiprocessing.Manager()
        self.task_queue: Queue[KubernetesJobType] = self._manager.Queue()
        self.result_queue: Queue[KubernetesResultsType] = self._manager.Queue()
        self.kube_scheduler: AirflowKubernetesScheduler | None = None
        self.kube_client: client.CoreV1Api | None = None
        self.scheduler_job_id: str | None = None
        self.event_scheduler: EventScheduler | None = None
        self.last_handled: dict[TaskInstanceKey, float] = {}
        self.kubernetes_queue: str | None = None
        super().__init__(parallelism=self.kube_config.parallelism)

    def _list_pods(self, query_kwargs):
        if self.kube_config.multi_namespace_mode:
            if self.kube_config.multi_namespace_mode_namespace_list:
                pods = []
                for namespace in self.kube_config.multi_namespace_mode_namespace_list:
                    pods.extend(
                        self.kube_client.list_namespaced_pod(namespace=namespace, **query_kwargs).items
                    )
            else:
                pods = self.kube_client.list_pod_for_all_namespaces(**query_kwargs).items
        else:
            pods = self.kube_client.list_namespaced_pod(
                namespace=self.kube_config.kube_namespace, **query_kwargs
            ).items
        return pods

    def _make_safe_label_value(self, input_value: str | datetime) -> str:
        """
        Normalize a provided label to be of valid length and characters.

        See airflow.providers.cncf.kubernetes.pod_generator.make_safe_label_value for more details.
        """
        # airflow.providers.cncf.kubernetes is an expensive import, locally import it here to
        # speed up load times of the kubernetes_executor module.
        from airflow.providers.cncf.kubernetes import pod_generator

        if isinstance(input_value, datetime):
            return pod_generator.datetime_to_label_safe_datestring(input_value)
        return pod_generator.make_safe_label_value(input_value)
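
    # Illustrative behavior of _make_safe_label_value: a short alphanumeric
    # string such as "my_dag" passes through unchanged, while values that are
    # too long or contain characters not allowed in Kubernetes label values
    # are sanitized (see pod_generator.make_safe_label_value). A datetime is
    # converted to a label-safe datestring first, since raw ISO timestamps
    # contain ":" and "+", which are invalid in label values.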
    @provide_session
    def clear_not_launched_queued_tasks(self, session: Session = NEW_SESSION) -> None:
        """
        Clear tasks that were not yet launched, but were previously queued.

        Tasks can end up in a "Queued" state when a rescheduled/deferred operator
        comes back up for execution (with the same try_number) before the
        pod of its previous incarnation has been fully removed (we think).

        It's also possible when an executor abruptly shuts down (leaving a non-empty
        task_queue on that executor), but that scenario is handled via normal adoption.

        This method checks each of our queued tasks to see if the corresponding pod
        is around, and if not, and there's no matching entry in our own
        task_queue, marks it for re-execution.
        """
        if TYPE_CHECKING:
            assert self.kube_client
        from airflow.models.taskinstance import TaskInstance

        self.log.debug("Clearing tasks that have not been launched")
        query = select(TaskInstance).where(
            TaskInstance.state == TaskInstanceState.QUEUED, TaskInstance.queued_by_job_id == self.job_id
        )
        if self.kubernetes_queue:
            query = query.where(TaskInstance.queue == self.kubernetes_queue)
        queued_tis: list[TaskInstance] = session.scalars(query).all()
        self.log.info("Found %s queued task instances", len(queued_tis))

        # Go through the "last seen" dictionary and clean out old entries
        allowed_age = self.kube_config.worker_pods_queued_check_interval * 3
        for key, timestamp in list(self.last_handled.items()):
            if time.time() - timestamp > allowed_age:
                del self.last_handled[key]

        for ti in queued_tis:
            self.log.debug("Checking task instance %s", ti)

            # Check to see if we've handled it ourselves recently
            if ti.key in self.last_handled:
                continue

            # Build the pod selector
            base_label_selector = (
                f"dag_id={self._make_safe_label_value(ti.dag_id)},"
                f"task_id={self._make_safe_label_value(ti.task_id)},"
                f"airflow-worker={self._make_safe_label_value(str(ti.queued_by_job_id))}"
            )
            if ti.map_index >= 0:
                # Old tasks _couldn't_ be mapped, so we don't have to worry about compat
                base_label_selector += f",map_index={ti.map_index}"
            kwargs = {"label_selector": base_label_selector}
            if self.kube_config.kube_client_request_args:
                kwargs.update(**self.kube_config.kube_client_request_args)

            # Try run_id first
            kwargs["label_selector"] += ",run_id=" + self._make_safe_label_value(ti.run_id)
            pod_list = self._list_pods(kwargs)
            if pod_list:
                continue
            # Fallback to old style of using execution_date
            kwargs["label_selector"] = (
                f"{base_label_selector},execution_date={self._make_safe_label_value(ti.execution_date)}"
            )
            pod_list = self._list_pods(kwargs)
            if pod_list:
                continue
            self.log.info("TaskInstance: %s found in queued state but was not launched, rescheduling", ti)
            session.execute(
                update(TaskInstance)
                .where(
                    TaskInstance.dag_id == ti.dag_id,
                    TaskInstance.task_id == ti.task_id,
                    TaskInstance.run_id == ti.run_id,
                    TaskInstance.map_index == ti.map_index,
                )
                .values(state=TaskInstanceState.SCHEDULED)
            )
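
    # The selector assembled above looks roughly like (illustrative values):
    #   dag_id=example_dag,task_id=example_task,airflow-worker=1,run_id=<label-safe run_id>
    # with ",map_index=<n>" appended for mapped tasks, and a second lookup that
    # swaps run_id for execution_date to match pods created by older versions.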
    def start(self) -> None:
        """Start the executor."""
        self.log.info("Start Kubernetes executor")
        self.scheduler_job_id = str(self.job_id)
        self.log.debug("Start with scheduler_job_id: %s", self.scheduler_job_id)
        from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils import (
            AirflowKubernetesScheduler,
        )
        from airflow.providers.cncf.kubernetes.kube_client import get_kube_client

        self.kube_client = get_kube_client()
        self.kube_scheduler = AirflowKubernetesScheduler(
            kube_config=self.kube_config,
            result_queue=self.result_queue,
            kube_client=self.kube_client,
            scheduler_job_id=self.scheduler_job_id,
        )
        self.event_scheduler = EventScheduler()

        self.event_scheduler.call_regular_interval(
            self.kube_config.worker_pods_queued_check_interval,
            self.clear_not_launched_queued_tasks,
        )
        # We also call this at startup as that's the most likely time to see
        # stuck queued tasks
        self.clear_not_launched_queued_tasks()
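
    # The cadence used above comes from the worker_pods_queued_check_interval
    # setting on KubeConfig (exposed as a [kubernetes_executor] config option),
    # so clear_not_launched_queued_tasks re-runs on that interval for the
    # lifetime of the executor.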
    def execute_async(
        self,
        key: TaskInstanceKey,
        command: CommandType,
        queue: str | None = None,
        executor_config: Any | None = None,
    ) -> None:
        """Execute task asynchronously."""
        if TYPE_CHECKING:
            assert self.task_queue

        if self.log.isEnabledFor(logging.DEBUG):
            self.log.debug("Add task %s with command %s, executor_config %s", key, command, executor_config)
        else:
            self.log.info("Add task %s with command %s", key, command)

        from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator

        try:
            kube_executor_config = PodGenerator.from_obj(executor_config)
        except Exception:
            self.log.error("Invalid executor_config for %s. Executor_config: %s", key, executor_config)
            self.fail(key=key, info="Invalid executor_config passed")
            return

        if executor_config:
            pod_template_file = executor_config.get("pod_template_file", None)
        else:
            pod_template_file = None
        self.event_buffer[key] = (TaskInstanceState.QUEUED, self.scheduler_job_id)
        self.task_queue.put((key, command, kube_executor_config, pod_template_file))
        # We keep a temporary local record that we've handled this so we don't
        # try and remove it from the QUEUED state while we process it
        self.last_handled[key] = time.time()
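
    # A task usually customizes its pod via executor_config; "pod_override" is
    # the key PodGenerator.from_obj consumes (image/name below are placeholders):
    #   executor_config={
    #       "pod_override": k8s.V1Pod(
    #           spec=k8s.V1PodSpec(
    #               containers=[k8s.V1Container(name="base", image="my-image:v1")]
    #           )
    #       )
    #   }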
    def sync(self) -> None:
        """Synchronize task state."""
        if TYPE_CHECKING:
            assert self.scheduler_job_id
            assert self.kube_scheduler
            assert self.kube_config
            assert self.result_queue
            assert self.task_queue
            assert self.event_scheduler

        if self.running:
            self.log.debug("self.running: %s", self.running)
        if self.queued_tasks:
            self.log.debug("self.queued: %s", self.queued_tasks)
        self.kube_scheduler.sync()

        last_resource_version: dict[str, str] = defaultdict(lambda: "0")
        with contextlib.suppress(Empty):
            while True:
                results = self.result_queue.get_nowait()
                try:
                    key, state, pod_name, namespace, resource_version = results
                    last_resource_version[namespace] = resource_version
                    self.log.info("Changing state of %s to %s", results, state)
                    try:
                        self._change_state(key, state, pod_name, namespace)
                    except Exception as e:
                        self.log.exception(
                            "Exception: %s when attempting to change state of %s to %s, re-queueing.",
                            e,
                            results,
                            state,
                        )
                        self.result_queue.put(results)
                finally:
                    self.result_queue.task_done()

        from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils import ResourceVersion

        resource_instance = ResourceVersion()
        for ns in resource_instance.resource_version:
            resource_instance.resource_version[ns] = (
                last_resource_version[ns] or resource_instance.resource_version[ns]
            )

        from kubernetes.client.rest import ApiException

        with contextlib.suppress(Empty):
            for _ in range(self.kube_config.worker_pods_creation_batch_size):
                task = self.task_queue.get_nowait()
                try:
                    self.kube_scheduler.run_next(task)
                except PodReconciliationError as e:
                    self.log.error(
                        "Pod reconciliation failed, likely due to kubernetes library upgrade. "
                        "Try clearing the task to re-run.",
                        exc_info=True,
                    )
                    self.fail(task[0], e)
                except ApiException as e:
                    # These codes indicate something is wrong with pod definition; otherwise we assume pod
                    # definition is ok, and that retrying may work
                    if e.status in (400, 422):
                        self.log.error("Pod creation failed with reason %r. Failing task", e.reason)
                        key, _, _, _ = task
                        self.change_state(key, TaskInstanceState.FAILED, e)
                    else:
                        self.log.warning(
                            "ApiException when attempting to run task, re-queueing. Reason: %r. Message: %s",
                            e.reason,
                            json.loads(e.body)["message"],
                        )
                        self.task_queue.put(task)
                except PodMutationHookException as e:
                    key, _, _, _ = task
                    self.log.error(
                        "Pod Mutation Hook failed for the task %s. Failing task. Details: %s",
                        key,
                        e.__cause__,
                    )
                    self.fail(key, e)
                finally:
                    self.task_queue.task_done()

        # Run any pending timed events
        next_event = self.event_scheduler.run(blocking=False)
        self.log.debug("Next timed event is in %f", next_event)
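
    # Each item drained from result_queue above is a KubernetesResultsType
    # tuple: (key, state, pod_name, namespace, resource_version). The number
    # of pods launched per sync() pass is capped by the
    # worker_pods_creation_batch_size setting on KubeConfig.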
    @provide_session
    def _change_state(
        self,
        key: TaskInstanceKey,
        state: TaskInstanceState | None,
        pod_name: str,
        namespace: str,
        session: Session = NEW_SESSION,
    ) -> None:
        if TYPE_CHECKING:
            assert self.kube_scheduler

        if state == TaskInstanceState.RUNNING:
            self.event_buffer[key] = state, None
            return

        if self.kube_config.delete_worker_pods:
            if state != TaskInstanceState.FAILED or self.kube_config.delete_worker_pods_on_failure:
                self.kube_scheduler.delete_pod(pod_name=pod_name, namespace=namespace)
                self.log.info("Deleted pod: %s in namespace %s", str(key), str(namespace))
        else:
            self.kube_scheduler.patch_pod_executor_done(pod_name=pod_name, namespace=namespace)
            self.log.info("Patched pod %s in namespace %s to mark it as done", str(key), str(namespace))

        try:
            self.running.remove(key)
        except KeyError:
            self.log.debug("TI key not in running, not adding to event_buffer: %s", key)

        # If we don't have a TI state, look it up from the db. event_buffer expects the TI state
        if state is None:
            from airflow.models.taskinstance import TaskInstance

            state = session.scalar(select(TaskInstance.state).where(TaskInstance.filter_for_tis([key])))
            state = TaskInstanceState(state)

        self.event_buffer[key] = state, None

    @staticmethod
    def _get_pod_namespace(ti: TaskInstance):
        pod_override = ti.executor_config.get("pod_override")
        namespace = None
        with suppress(Exception):
            namespace = pod_override.metadata.namespace
        return namespace or conf.get("kubernetes_executor", "namespace")
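
    # Illustrative: for a ti whose executor_config contains
    #   {"pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(namespace="team-a"))}
    # _get_pod_namespace returns "team-a"; otherwise it falls back to the
    # [kubernetes_executor] namespace config option.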
    def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], list[str]]:
        messages = []
        log = []
        try:
            from airflow.providers.cncf.kubernetes.kube_client import get_kube_client
            from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator

            client = get_kube_client()

            messages.append(f"Attempting to fetch logs from pod {ti.hostname} through kube API")
            selector = PodGenerator.build_selector_for_k8s_executor_pod(
                dag_id=ti.dag_id,
                task_id=ti.task_id,
                try_number=try_number,
                map_index=ti.map_index,
                run_id=ti.run_id,
                airflow_worker=ti.queued_by_job_id,
            )
            namespace = self._get_pod_namespace(ti)
            pod_list = client.list_namespaced_pod(
                namespace=namespace,
                label_selector=selector,
            ).items
            if not pod_list:
                raise RuntimeError(f"Cannot find pod for ti {ti}")
            elif len(pod_list) > 1:
                raise RuntimeError(f"Found multiple pods for ti {ti}: {pod_list}")
            res = client.read_namespaced_pod_log(
                name=pod_list[0].metadata.name,
                namespace=namespace,
                container="base",
                follow=False,
                tail_lines=self.RUNNING_POD_LOG_LINES,
                _preload_content=False,
            )
            for line in res:
                log.append(remove_escape_codes(line.decode()))
            if log:
                messages.append("Found logs through kube API")
        except Exception as e:
            messages.append(f"Reading from k8s pod logs failed: {e}")
        return messages, ["\n".join(log)]
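
    # get_task_log returns a (messages, logs) pair: progress/diagnostic
    # messages for display plus a single-element list holding the joined pod
    # log text (the element is empty when the pod or its logs could not be
    # read).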
    def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[TaskInstance]:
        # Always flush TIs without queued_by_job_id
        tis_to_flush = [ti for ti in tis if not ti.queued_by_job_id]
        scheduler_job_ids = {ti.queued_by_job_id for ti in tis}
        tis_to_flush_by_key = {ti.key: ti for ti in tis if ti.queued_by_job_id}
        kube_client: client.CoreV1Api = self.kube_client
        for scheduler_job_id in scheduler_job_ids:
            scheduler_job_id = self._make_safe_label_value(str(scheduler_job_id))
            # We will look for any pods owned by the no-longer-running scheduler,
            # but will exclude only successful pods, as those TIs will have a terminal state
            # and not be up for adoption!
            # Those workers that failed, however, are okay to adopt here as their TI will
            # still be in queued.
            query_kwargs = {
                "field_selector": "status.phase!=Succeeded",
                "label_selector": (
                    "kubernetes_executor=True,"
                    f"airflow-worker={scheduler_job_id},{POD_EXECUTOR_DONE_KEY}!=True"
                ),
            }
            pod_list = self._list_pods(query_kwargs)
            for pod in pod_list:
                self.adopt_launched_task(kube_client, pod, tis_to_flush_by_key)
        self._adopt_completed_pods(kube_client)
        tis_to_flush.extend(tis_to_flush_by_key.values())
        return tis_to_flush
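
    # Illustrative: a pod orphaned by a dead scheduler carries labels like
    #   kubernetes_executor=True,airflow-worker=<old scheduler job id>
    # and adoption simply re-points its airflow-worker label at this
    # scheduler's id, so this executor's KubernetesJobWatcher starts
    # receiving events for the pod.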
    def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:
        """
        Handle remnants of tasks that were failed because they were stuck in queued.

        Tasks can get stuck in queued. If such a task is detected, it will be marked as
        `UP_FOR_RETRY` if the task instance has remaining retries or marked as `FAILED`
        if it doesn't.

        :param tis: List of Task Instances to clean up
        :return: List of readable task instances for a warning message
        """
        from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator

        if TYPE_CHECKING:
            assert self.kube_client
            assert self.kube_scheduler
        readable_tis = []
        for ti in tis:
            selector = PodGenerator.build_selector_for_k8s_executor_pod(
                dag_id=ti.dag_id,
                task_id=ti.task_id,
                try_number=ti.try_number,
                map_index=ti.map_index,
                run_id=ti.run_id,
                airflow_worker=ti.queued_by_job_id,
            )
            namespace = self._get_pod_namespace(ti)
            pod_list = self.kube_client.list_namespaced_pod(
                namespace=namespace,
                label_selector=selector,
            ).items
            if not pod_list:
                self.log.warning("Cannot find pod for ti %s", ti)
                continue
            elif len(pod_list) > 1:
                self.log.warning("Found multiple pods for ti %s: %s", ti, pod_list)
                continue
            readable_tis.append(repr(ti))
            self.kube_scheduler.delete_pod(pod_name=pod_list[0].metadata.name, namespace=namespace)
        return readable_tis
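
    # Note: this method only locates and deletes the stuck pods and reports
    # the affected task instances; moving them to UP_FOR_RETRY or FAILED (per
    # the docstring) is expected to happen in the caller's stuck-in-queued
    # handling, not here.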
    def adopt_launched_task(
        self,
        kube_client: client.CoreV1Api,
        pod: k8s.V1Pod,
        tis_to_flush_by_key: dict[TaskInstanceKey, k8s.V1Pod],
    ) -> None:
        """
        Patch existing pod so that the current KubernetesJobWatcher can monitor it via label selectors.

        :param kube_client: kubernetes client for speaking to kube API
        :param pod: V1Pod spec that we will patch with new label
        :param tis_to_flush_by_key: TIs that will be flushed if they aren't adopted
        """
        if TYPE_CHECKING:
            assert self.scheduler_job_id

        self.log.info("attempting to adopt pod %s", pod.metadata.name)
        ti_key = annotations_to_key(pod.metadata.annotations)
        if ti_key not in tis_to_flush_by_key:
            self.log.error("attempting to adopt taskinstance which was not specified by database: %s", ti_key)
            return

        new_worker_id_label = self._make_safe_label_value(self.scheduler_job_id)
        from kubernetes.client.rest import ApiException

        try:
            kube_client.patch_namespaced_pod(
                name=pod.metadata.name,
                namespace=pod.metadata.namespace,
                body={"metadata": {"labels": {"airflow-worker": new_worker_id_label}}},
            )
        except ApiException as e:
            self.log.info("Failed to adopt pod %s. Reason: %s", pod.metadata.name, e)
            return

        del tis_to_flush_by_key[ti_key]
        self.running.add(ti_key)
    def _adopt_completed_pods(self, kube_client: client.CoreV1Api) -> None:
        """
        Patch completed pods so that the KubernetesJobWatcher can delete them.

        :param kube_client: kubernetes client for speaking to kube API
        """
        if TYPE_CHECKING:
            assert self.scheduler_job_id

        new_worker_id_label = self._make_safe_label_value(self.scheduler_job_id)
        query_kwargs = {
            "field_selector": "status.phase=Succeeded",
            "label_selector": (
                "kubernetes_executor=True,"
                f"airflow-worker!={new_worker_id_label},{POD_EXECUTOR_DONE_KEY}!=True"
            ),
        }
        pod_list = self._list_pods(query_kwargs)
        for pod in pod_list:
            self.log.info("Attempting to adopt pod %s", pod.metadata.name)
            from kubernetes.client.rest import ApiException

            try:
                kube_client.patch_namespaced_pod(
                    name=pod.metadata.name,
                    namespace=pod.metadata.namespace,
                    body={"metadata": {"labels": {"airflow-worker": new_worker_id_label}}},
                )
            except ApiException as e:
                self.log.info("Failed to adopt pod %s. Reason: %s", pod.metadata.name, e)
            ti_id = annotations_to_key(pod.metadata.annotations)
            self.running.add(ti_id)

    def _flush_task_queue(self) -> None:
        if TYPE_CHECKING:
            assert self.task_queue

        self.log.debug("Executor shutting down, task_queue approximate size=%d", self.task_queue.qsize())
        with contextlib.suppress(Empty):
            while True:
                task = self.task_queue.get_nowait()
                # This is a new task to run thus ok to ignore.
                self.log.warning("Executor shutting down, will NOT run task=%s", task)
                self.task_queue.task_done()

    def _flush_result_queue(self) -> None:
        if TYPE_CHECKING:
            assert self.result_queue

        self.log.debug("Executor shutting down, result_queue approximate size=%d", self.result_queue.qsize())
        with contextlib.suppress(Empty):
            while True:
                results = self.result_queue.get_nowait()
                self.log.warning("Executor shutting down, flushing results=%s", results)
                try:
                    key, state, pod_name, namespace, resource_version = results
                    self.log.info(
                        "Changing state of %s to %s : resource_version=%d", results, state, resource_version
                    )
                    try:
                        self._change_state(key, state, pod_name, namespace)
                    except Exception as e:
                        self.log.exception(
                            "Ignoring exception: %s when attempting to change state of %s to %s.",
                            e,
                            results,
                            state,
                        )
                finally:
                    self.result_queue.task_done()
    def end(self) -> None:
        """Called when the executor shuts down."""
        if TYPE_CHECKING:
            assert self.task_queue
            assert self.result_queue
            assert self.kube_scheduler

        self.log.info("Shutting down Kubernetes executor")
        try:
            self.log.debug("Flushing task_queue...")
            self._flush_task_queue()
            self.log.debug("Flushing result_queue...")
            self._flush_result_queue()
            # Both queues should be empty...
            self.task_queue.join()
            self.result_queue.join()
        except ConnectionResetError:
            self.log.exception("Connection Reset error while flushing task_queue and result_queue.")
        if self.kube_scheduler:
            self.kube_scheduler.terminate()
        self._manager.shutdown()
    def terminate(self):
        """Terminate the executor; this executor performs no extra work on termination."""
    @staticmethod
    def get_cli_commands() -> list[GroupCommand]:
        return [
            GroupCommand(
                name="kubernetes",
                help="Tools to help run the KubernetesExecutor",
                subcommands=KUBERNETES_COMMANDS,
            )
        ]
def _get_parser() -> argparse.ArgumentParser:
    """
    Generate documentation; used by Sphinx.

    :meta private:
    """
    # Delegates to the _get_parser classmethod that KubernetesExecutor
    # inherits from BaseExecutor (not a recursive call).
    return KubernetesExecutor._get_parser()