## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""DebugExecutor.. seealso:: For more information on how the DebugExecutor works, take a look at the guide: :ref:`executor:DebugExecutor`"""importthreadingfromtypingimportAny,Dict,List,Optionalfromairflow.configurationimportconffromairflow.executors.base_executorimportBaseExecutorfromairflow.models.taskinstanceimportTaskInstance,TaskInstanceKeyfromairflow.utils.stateimportState
class DebugExecutor(BaseExecutor):
    """
    This executor is meant for debugging purposes. It can be used with SQLite.

    It executes one task instance at a time. Additionally, to support working
    with sensors, all sensors ``mode`` will be automatically set to "reschedule".
    """

    def __init__(self):
        """Set up the run queue, the per-task run parameters and the fail-fast flag."""
        super().__init__()
        # Task instances picked by ``trigger_tasks`` and waiting for ``sync`` to run them.
        self.tasks_to_run: List[TaskInstance] = []
        # Place where we keep information for task instance raw run
        self.tasks_params: Dict[TaskInstanceKey, Dict[str, Any]] = {}
        self.fail_fast = conf.getboolean("debug", "fail_fast")
        # NOTE(review): ``sync`` reads ``self._terminated`` but no definition of it
        # was visible in this class (and the imported ``threading`` module was
        # otherwise unused). It is created per instance here; confirm that
        # ``BaseExecutor`` does not already provide one.
        self._terminated = threading.Event()

    # NOTE(review): the ``def`` line for this docstring was missing from the
    # source; the name and the permissive signature are reconstructed. Confirm
    # against ``BaseExecutor.execute_async``.
    def execute_async(self, *args, **kwargs) -> None:
        """The method is replaced by custom trigger_task implementation."""

    def sync(self) -> None:
        """
        Run every task instance waiting in ``tasks_to_run``, one after another.

        When ``fail_fast`` is enabled, the first task that fails during this call
        causes all remaining tasks of the call to be set to ``UPSTREAM_FAILED``
        without being run. When the executor has been terminated, remaining
        tasks are set to ``FAILED`` instead of being run.
        """
        task_succeeded = True
        while self.tasks_to_run:
            ti = self.tasks_to_run.pop(0)

            if self.fail_fast and not task_succeeded:
                self._mark_upstream_failed(ti)
                continue

            if self._terminated.is_set():
                self.log.info("Executor is terminated! Stopping %s to %s", ti.key, State.FAILED)
                ti.set_state(State.FAILED)
                self.change_state(ti.key, State.FAILED)
                ti._run_finished_callback()  # pylint: disable=protected-access
                continue

            task_succeeded = self._run_task(ti)

    def _mark_upstream_failed(self, ti: TaskInstance) -> None:
        """Set ``ti`` to ``UPSTREAM_FAILED`` and report that state to the executor."""
        self.log.info("Setting %s to %s", ti.key, State.UPSTREAM_FAILED)
        ti.set_state(State.UPSTREAM_FAILED)
        self.change_state(ti.key, State.UPSTREAM_FAILED)

    def _run_task(self, ti: TaskInstance) -> bool:
        """
        Run a single task instance in the current process.

        :param ti: task instance to run; its saved parameters (if any) are
            removed from ``tasks_params`` and passed to ``_run_raw_task``
        :return: ``True`` if the task ran without raising, ``False`` otherwise
        """
        self.log.debug("Executing task: %s", ti)
        key = ti.key
        try:
            params = self.tasks_params.pop(ti.key, {})
            ti._run_raw_task(job_id=ti.job_id, **params)  # pylint: disable=protected-access
            self.change_state(key, State.SUCCESS)
            ti._run_finished_callback()  # pylint: disable=protected-access
            return True
        except Exception as e:  # pylint: disable=broad-except
            # A failing task must not take the executor down: the failure is
            # reported through the task state and the return value instead.
            # Log first so the traceback is kept even if the bookkeeping below raises.
            self.log.exception("Failed to execute task: %s.", e)
            ti.set_state(State.FAILED)
            self.change_state(key, State.FAILED)
            ti._run_finished_callback()  # pylint: disable=protected-access
            return False

    def queue_task_instance(
        self,
        task_instance: TaskInstance,
        mark_success: bool = False,
        pickle_id: Optional[str] = None,
        ignore_all_deps: bool = False,
        ignore_depends_on_past: bool = False,
        ignore_task_deps: bool = False,
        ignore_ti_state: bool = False,
        pool: Optional[str] = None,
        cfg_path: Optional[str] = None,
    ) -> None:
        """
        Queues task instance with empty command because we do not need it.

        Only ``mark_success`` and ``pool`` are stored for the later raw run; the
        remaining keyword parameters are accepted but not used by this method.
        """
        self.queue_command(
            task_instance,
            [str(task_instance)],  # Just for better logging, it's not used anywhere
            priority=task_instance.task.priority_weight_total,
            queue=task_instance.task.queue,
        )
        # Save params for TaskInstance._run_raw_task
        self.tasks_params[task_instance.key] = {
            "mark_success": mark_success,
            "pool": pool,
        }

    def trigger_tasks(self, open_slots: int) -> None:
        """
        Triggers tasks. Instead of calling exec_async we just
        add task instance to tasks_to_run queue.

        :param open_slots: Number of open slots
        """
        # Each queued entry is a 4-tuple whose last element is the task instance;
        # entries are ordered by the element at index 1, highest first
        # (presumably the priority given to ``queue_command`` - verify).
        sorted_queue = sorted(
            self.queued_tasks.items(),
            key=lambda item: item[1][1],
            reverse=True,
        )
        # Clamp at zero so that a negative ``open_slots`` triggers nothing.
        slots = max(0, min(open_slots, len(sorted_queue)))
        for key, (_, _, _, ti) in sorted_queue[:slots]:
            self.queued_tasks.pop(key)
            self.running.add(key)
            self.tasks_to_run.append(ti)  # type: ignore

    def end(self) -> None:
        """
        When the method is called we just set states of queued tasks
        to UPSTREAM_FAILED marking them as not executed.
        """
        for ti in self.tasks_to_run:
            self._mark_upstream_failed(ti)