## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""LocalExecutor.. seealso:: For more information on how the LocalExecutor works, take a look at the guide: :ref:`executor:LocalExecutor`"""importloggingimportosimportsubprocessfromabcimportabstractmethodfrommultiprocessingimportManager,Processfrommultiprocessing.managersimportSyncManagerfromqueueimportEmpty,Queue# pylint: disable=unused-import # noqa: F401fromtypingimportAny,List,Optional,Tuple,Union# pylint: disable=unused-import # noqa: F401fromsetproctitleimportsetproctitle# pylint: disable=no-name-in-modulefromairflowimportsettingsfromairflow.exceptionsimportAirflowExceptionfromairflow.executors.base_executorimportNOT_STARTED_MESSAGE,PARALLELISM,BaseExecutor,CommandTypefromairflow.models.taskinstanceimport(# pylint: disable=unused-import # noqa: F401TaskInstanceKey,TaskInstanceStateType,)fromairflow.utils.log.logging_mixinimportLoggingMixinfromairflow.utils.stateimportState# This is a work to be executed by a worker.# It can Key and Command - but it can also be None, None which is actually a# "Poison Pill" - worker seeing Poison Pill should take the pill and ... die instantly.
class LocalWorkerBase(Process, LoggingMixin):
    """
    LocalWorkerBase implementation to run airflow commands. Executes the given
    command and puts the result into a result queue when done, terminating execution.

    Subclasses implement :meth:`do_work`, which is the target the process runs.

    :param result_queue: the queue to store result state; receives ``(key, state)`` tuples
    """

    def __init__(self, result_queue: 'Queue[TaskInstanceStateType]'):
        # The process body is ``do_work``; ``run`` below delegates to Process.run,
        # which invokes this target.
        super().__init__(target=self.do_work)
        # Daemonic: multiprocessing terminates this process when the parent exits.
        self.daemon: bool = True
        self.result_queue: 'Queue[TaskInstanceStateType]' = result_queue

    def run(self):
        """Entry point inside the new process: drop inherited DB connections, then do the work."""
        # We know we've just started a new process, so lets disconnect from the metadata db now
        settings.engine.pool.dispose()
        settings.engine.dispose()
        return super().run()

    def execute_work(self, key: TaskInstanceKey, command: CommandType) -> None:
        """
        Executes command received and stores result state in queue.

        A ``None`` key is ignored: nothing is run and nothing is reported.

        :param key: the key to identify the task instance
        :param command: the command to execute
        """
        if key is None:
            return
        self.log.info("%s running %s", self.__class__.__name__, command)
        # Either spawn a brand-new interpreter or fork this one, depending on settings.
        if settings.EXECUTE_TASKS_NEW_PYTHON_INTERPRETER:
            state = self._execute_work_in_subprocess(command)
        else:
            state = self._execute_work_in_fork(command)

        self.result_queue.put((key, state))

    def _execute_work_in_subprocess(self, command: CommandType) -> str:
        """
        Run ``command`` as a child program and map its outcome to a task state.

        :param command: the command (argument list) to execute
        :return: ``State.SUCCESS`` on exit code 0; ``State.FAILED`` on a non-zero exit
            code or when the program could not be launched at all
        """
        try:
            subprocess.check_call(command, close_fds=True)
            return State.SUCCESS
        except (subprocess.CalledProcessError, OSError) as e:
            # OSError (e.g. executable not found / not permitted) must also be
            # reported as FAILED: letting it escape would skip putting a result
            # on the queue and kill a queued worker, leaving the task unreported.
            self.log.error("Failed to execute task %s.", str(e))
            return State.FAILED

    def _execute_work_in_fork(self, command: CommandType) -> str:
        """
        Fork and run ``command`` in-process in the child via the airflow CLI parser.

        The parent waits for the child and maps its exit status to a task state.
        The child never returns from this method: the ``finally`` clause always
        terminates it with ``os._exit`` (0 on success, 1 otherwise).

        :param command: the command to execute; its first element (``"airflow"``) is skipped
        :return: (parent only) ``State.SUCCESS`` if the child exited with status 0,
            else ``State.FAILED``
        """
        pid = os.fork()
        if pid:
            # In parent, wait for the child
            pid, ret = os.waitpid(pid, 0)
            return State.SUCCESS if ret == 0 else State.FAILED

        # ---- Forked child from here on ----
        from airflow.sentry import Sentry

        ret = 1
        try:
            import signal

            from airflow.cli.cli_parser import get_parser

            # Restore default signal handling in the child.
            signal.signal(signal.SIGINT, signal.SIG_DFL)
            signal.signal(signal.SIGTERM, signal.SIG_DFL)
            signal.signal(signal.SIGUSR2, signal.SIG_DFL)

            parser = get_parser()
            # [1:] - remove "airflow" from the start of the command
            args = parser.parse_args(command[1:])
            args.shut_down_logging = False

            setproctitle(f"airflow task supervisor: {command}")

            args.func(args)
            ret = 0
            # Never actually returned: the finally clause exits the process first.
            return State.SUCCESS
        except Exception as e:  # pylint: disable=broad-except
            # log.exception keeps the traceback, the only diagnostic for a failed task here.
            self.log.exception("Failed to execute task %s.", str(e))
        finally:
            Sentry.flush()
            logging.shutdown()
            os._exit(ret)  # pylint: disable=protected-access
            raise RuntimeError('unreachable -- keep mypy happy')

    @abstractmethod
    def do_work(self):
        """
        Called in the subprocess and should then execute tasks.

        NOTE: ``Process`` is not an ABC, so this decorator is not enforced at
        instantiation unless some base supplies ``ABCMeta``. Subclasses must
        override it, or the process raises ``NotImplementedError`` when it runs.
        """
        raise NotImplementedError()
class LocalWorker(LocalWorkerBase):
    """
    Local worker that executes the task.

    Each instance runs exactly one command in its own process and reports the
    resulting state on ``result_queue``.

    :param result_queue: queue where results of the tasks are put.
    :param key: key identifying task instance
    :param command: Command to execute
    """

    def __init__(
        self, result_queue: 'Queue[TaskInstanceStateType]', key: TaskInstanceKey, command: CommandType
    ):
        super().__init__(result_queue)
        self.key: TaskInstanceKey = key
        self.command: CommandType = command

    def do_work(self) -> None:
        """Execute the single command this worker was created for (the process target)."""
        # Without this override the base-class do_work runs and raises NotImplementedError.
        self.execute_work(key=self.key, command=self.command)
class QueuedLocalWorker(LocalWorkerBase):
    """
    Long-lived worker that pulls ``(key, command)`` items off a shared task queue
    and executes them one after another.

    A ``(None, None)``-style item (either field ``None``) acts as a poison pill and
    stops the worker. The worker also stops if the task queue's other end has gone
    away (``EOFError`` on read).

    :param task_queue: queue from which worker reads tasks
    :param result_queue: queue where worker puts results after finishing tasks
    """

    def __init__(self, task_queue: 'Queue[ExecutorWorkType]', result_queue: 'Queue[TaskInstanceStateType]'):
        super().__init__(result_queue=result_queue)
        self.task_queue = task_queue

    def do_work(self) -> None:
        """Consume tasks until a poison pill arrives or the queue connection closes."""
        keep_going = True
        while keep_going:
            try:
                work_key, work_command = self.task_queue.get()
            except EOFError:
                self.log.info(
                    "Failed to read tasks from the task queue because the other "
                    "end has closed the connection. Terminating worker %s.",
                    self.name,
                )
                break
            try:
                # Either field being None is the poison pill: stop after acknowledging it.
                keep_going = work_key is not None and work_command is not None
                if keep_going:
                    self.execute_work(key=work_key, command=work_command)
            finally:
                # Every item taken off the queue is acknowledged so queue.join() can return.
                self.task_queue.task_done()
class LocalExecutor(BaseExecutor):
    """
    LocalExecutor executes tasks locally in parallel.
    It uses the multiprocessing Python library and queues to parallelize the execution
    of tasks.

    With ``parallelism == 0`` one process is started per task (unlimited);
    otherwise a fixed pool of ``parallelism`` queued workers is used.

    :param parallelism: how many parallel processes are run in the executor
    """

    def __init__(self, parallelism: int = PARALLELISM):
        super().__init__(parallelism=parallelism)
        self.manager: Optional[SyncManager] = None
        self.result_queue: Optional['Queue[TaskInstanceStateType]'] = None
        self.workers: List[QueuedLocalWorker] = []
        self.workers_used: int = 0
        self.workers_active: int = 0
        self.impl: Optional[
            Union['LocalExecutor.UnlimitedParallelism', 'LocalExecutor.LimitedParallelism']
        ] = None

    class UnlimitedParallelism:
        """
        Implements LocalExecutor with unlimited parallelism, starting one process
        per each command to execute.

        :param executor: the executor instance to implement.
        """

        def __init__(self, executor: 'LocalExecutor'):
            self.executor: 'LocalExecutor' = executor

        def start(self) -> None:
            """Starts the executor."""
            self.executor.workers_used = 0
            self.executor.workers_active = 0

        # pylint: disable=unused-argument # pragma: no cover
        def execute_async(
            self,
            key: TaskInstanceKey,
            command: CommandType,
            queue: Optional[str] = None,
            executor_config: Optional[Any] = None,
        ) -> None:
            """
            Executes task asynchronously by starting a dedicated LocalWorker process.

            :param key: the key to identify the task instance
            :param command: the command to execute
            :param queue: Name of the queue
            :param executor_config: configuration for the executor
            :raises AirflowException: if the executor has not been started
            """
            if not self.executor.result_queue:
                raise AirflowException(NOT_STARTED_MESSAGE)
            local_worker = LocalWorker(self.executor.result_queue, key=key, command=command)
            self.executor.workers_used += 1
            self.executor.workers_active += 1
            local_worker.start()

        # pylint: enable=unused-argument # pragma: no cover

        def sync(self) -> None:
            """Sync will get called periodically by the heartbeat method."""
            if not self.executor.result_queue:
                raise AirflowException("Executor should be started first")
            # Each result corresponds to one finished per-task process.
            while not self.executor.result_queue.empty():
                results = self.executor.result_queue.get()
                self.executor.change_state(*results)
                self.executor.workers_active -= 1

        def end(self) -> None:
            """
            This method is called when the caller is done submitting job and
            wants to wait synchronously for the job submitted previously to be
            all done.
            """
            while self.executor.workers_active > 0:
                self.executor.sync()

    class LimitedParallelism:
        """
        Implements LocalExecutor with limited parallelism using a task queue to
        coordinate work distribution.

        :param executor: the executor instance to implement.
        """

        def __init__(self, executor: 'LocalExecutor'):
            self.executor: 'LocalExecutor' = executor
            self.queue: Optional['Queue[ExecutorWorkType]'] = None

        def start(self) -> None:
            """
            Create the shared task queue and start ``parallelism`` queued workers on it.

            :raises AirflowException: if the owning executor has no manager/result queue yet
            """
            if not self.executor.manager:
                raise AirflowException(NOT_STARTED_MESSAGE)
            if not self.executor.result_queue:
                raise AirflowException(NOT_STARTED_MESSAGE)
            # Managed queue so it can be shared with the worker processes.
            self.queue = self.executor.manager.Queue()
            self.executor.workers = [
                QueuedLocalWorker(self.queue, self.executor.result_queue)
                for _ in range(self.executor.parallelism)
            ]
            self.executor.workers_used = len(self.executor.workers)
            for worker in self.executor.workers:
                worker.start()

        def execute_async(
            self,
            key: TaskInstanceKey,
            command: CommandType,
            queue: Optional[str] = None,  # pylint: disable=unused-argument
            executor_config: Optional[Any] = None,  # pylint: disable=unused-argument
        ) -> None:
            """
            Executes task asynchronously by enqueuing it for the worker pool.

            :param key: the key to identify the task instance
            :param command: the command to execute
            :param queue: name of the queue
            :param executor_config: configuration for the executor
            :raises AirflowException: if the executor has not been started
            """
            if not self.queue:
                raise AirflowException(NOT_STARTED_MESSAGE)
            self.queue.put((key, command))

        def sync(self) -> None:
            """Sync will get called periodically by the heartbeat method."""
            if not self.executor.result_queue:
                raise AirflowException(NOT_STARTED_MESSAGE)
            while True:
                # Keep the try minimal so only a genuinely drained queue ends the loop.
                try:
                    results = self.executor.result_queue.get_nowait()
                except Empty:
                    break
                try:
                    self.executor.change_state(*results)
                finally:
                    self.executor.result_queue.task_done()

        def end(self) -> None:
            """Ends the executor. Sends the poison pill to all workers."""
            if not self.queue:
                raise AirflowException(NOT_STARTED_MESSAGE)
            # One poison pill per worker; each worker stops after consuming one.
            for _ in self.executor.workers:
                self.queue.put((None, None))

            # Wait for commands to finish
            self.queue.join()
            self.executor.sync()

    def start(self) -> None:
        """Starts the executor"""
        self.manager = Manager()
        self.result_queue = self.manager.Queue()
        self.workers = []
        self.workers_used = 0
        self.workers_active = 0
        self.impl = (
            LocalExecutor.UnlimitedParallelism(self)
            if self.parallelism == 0
            else LocalExecutor.LimitedParallelism(self)
        )

        self.impl.start()

    def execute_async(
        self,
        key: TaskInstanceKey,
        command: CommandType,
        queue: Optional[str] = None,
        executor_config: Optional[Any] = None,
    ) -> None:
        """
        Hand a task to the active parallelism implementation.

        :param key: the key to identify the task instance
        :param command: the command to execute
        :param queue: name of the queue (passed through; neither implementation uses it)
        :param executor_config: configuration for the executor (passed through; neither
            implementation uses it)
        :raises AirflowException: if :meth:`start` has not been called yet
        """
        if not self.impl:
            raise AirflowException(NOT_STARTED_MESSAGE)
        self.impl.execute_async(key=key, command=command, queue=queue, executor_config=executor_config)

    def sync(self) -> None:
        """Sync will get called periodically by the heartbeat method."""
        if not self.impl:
            raise AirflowException(NOT_STARTED_MESSAGE)
        self.impl.sync()

    def end(self) -> None:
        """
        Ends the executor: waits for submitted tasks to finish, then shuts the manager down.

        :raises AirflowException: if :meth:`start` has not been called yet
        """
        if not self.impl:
            raise AirflowException(NOT_STARTED_MESSAGE)
        if not self.manager:
            raise AirflowException(NOT_STARTED_MESSAGE)
        self.log.info(
            "Shutting down LocalExecutor"
            "; waiting for running tasks to finish.  Signal again if you don't want to wait."
        )
        try:
            self.impl.end()
        finally:
            # Always release the manager process, even if waiting for tasks failed.
            self.manager.shutdown()