## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportdatetimeimportfunctoolsimporthashlibimportloggingimporttimeimporttracebackfromdatetimeimporttimedeltafromtypingimportAny,Callable,Iterablefromairflowimportsettingsfromairflow.configurationimportconffromairflow.exceptionsimport(AirflowException,AirflowFailException,AirflowRescheduleException,AirflowSensorTimeout,AirflowSkipException,AirflowTaskTimeout,)fromairflow.executors.executor_loaderimportExecutorLoaderfromairflow.models.baseoperatorimportBaseOperatorfromairflow.models.skipmixinimportSkipMixinfromairflow.models.taskrescheduleimportTaskReschedulefromairflow.ti_deps.deps.ready_to_rescheduleimportReadyToRescheduleDepfromairflow.utilsimporttimezonefromairflow.utils.contextimportContext# We need to keep the import here because GCSToLocalFilesystemOperator released in# Google Provider before 3.0.0 imported apply_defaults from here.# See https://github.com/apache/airflow/issues/16035fromairflow.utils.decoratorsimportapply_defaults# noqa: F401# As documented in 
# https://dev.mysql.com/doc/refman/5.7/en/datetime.html -- the latest instant a MySQL
# TIMESTAMP column can hold.
_MYSQL_TIMESTAMP_MAX = datetime.datetime(2038, 1, 19, 3, 14, 7, tzinfo=timezone.utc)


@functools.lru_cache(maxsize=None)
def _is_metadatabase_mysql() -> bool:
    """
    Report whether Airflow's metadata database backend is MySQL.

    The result is memoized after the first successful call (``lru_cache`` does not
    cache raised exceptions, so a call made before ORM setup can be retried later).

    :raises AirflowException: if ``settings.engine`` has not been initialized yet.
    """
    engine = settings.engine
    if engine is None:
        raise AirflowException("Must initialize ORM first")
    backend_name = engine.url.get_backend_name()
    return backend_name == "mysql"
class PokeReturnValue:
    """
    Optional return value for poke methods.

    Sensors can optionally return an instance of the PokeReturnValue class in the poke
    method. If an XCom value is supplied when the sensor is done, then the XCom value
    will be pushed through the operator return value.

    The truth value of an instance is ``is_done``. As a result, ``if poke_return:``
    treats ``PokeReturnValue(is_done=False)`` exactly like a plain ``False``.

    :param is_done: Set to true to indicate the sensor can stop poking.
    :param xcom_value: An optional XCOM value to be returned by the operator.
    """

    def __init__(self, is_done: bool, xcom_value: Any | None = None) -> None:
        self.xcom_value = xcom_value
        self.is_done = is_done

    def __bool__(self) -> bool:
        # Without this every instance is truthy, so a truthiness check on the poke
        # result would stop poking even when is_done is False.
        return bool(self.is_done)

    def __repr__(self) -> str:
        return f"{type(self).__name__}(is_done={self.is_done!r}, xcom_value={self.xcom_value!r})"
class BaseSensorOperator(BaseOperator, SkipMixin):
    """
    Sensor operators are derived from this class and inherit these attributes.

    Sensor operators keep executing at a time interval and succeed when
    a criterion is met and fail if and when they time out.

    :param soft_fail: Set to true to mark the task as SKIPPED on failure
    :param poke_interval: Time in seconds that the job should wait in
        between each try
    :param timeout: Time, in seconds before the task times out and fails.
    :param mode: How the sensor operates.
        Options are: ``{ poke | reschedule }``, default is ``poke``.
        When set to ``poke`` the sensor is taking up a worker slot for its
        whole execution time and sleeps between pokes. Use this mode if the
        expected runtime of the sensor is short or if a short poke interval
        is required. Note that the sensor will hold onto a worker slot and
        a pool slot for the duration of the sensor's runtime in this mode.
        When set to ``reschedule`` the sensor task frees the worker slot when
        the criteria is not yet met and it's rescheduled at a later time. Use
        this mode if the time before the criteria is met is expected to be
        quite long. The poke interval should be more than one minute to
        prevent too much load on the scheduler.
    :param exponential_backoff: allow progressive longer waits between
        pokes by using exponential backoff algorithm
    :param max_wait: maximum wait interval between pokes, can be ``timedelta`` or ``float`` seconds
    :param silent_fail: If true, and poke method raises an exception different from
        AirflowSensorTimeout, AirflowTaskTimeout, AirflowSkipException
        and AirflowFailException, the sensor will log the error and continue
        its execution. Otherwise, the sensor task fails, and it can be retried
        based on the provided `retries` parameter.
    """

    # Modes accepted for ``mode``; enforced by ``_validate_input_values``.
    valid_modes: Iterable[str] = ["poke", "reschedule"]

    # Add the reschedule-readiness dependency to every sensor task.
    # NOTE(review): assumes BaseOperator.deps is a frozenset class attribute -- confirm.
    deps = BaseOperator.deps | {ReadyToRescheduleDep()}

    def __init__(
        self,
        *,
        poke_interval: float = 60,
        timeout: float = conf.getfloat("sensors", "default_timeout"),
        soft_fail: bool = False,
        mode: str = "poke",
        exponential_backoff: bool = False,
        max_wait: timedelta | float | None = None,
        silent_fail: bool = False,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.poke_interval = poke_interval
        self.soft_fail = soft_fail
        self.timeout = timeout
        self.mode = mode
        self.exponential_backoff = exponential_backoff
        self.max_wait = self._coerce_max_wait(max_wait)
        self.silent_fail = silent_fail
        self._validate_input_values()

    @property
    def reschedule(self) -> bool:
        """Whether the sensor runs in ``reschedule`` mode."""
        return self.mode == "reschedule"

    @staticmethod
    def _coerce_max_wait(max_wait: float | timedelta | None) -> timedelta | None:
        """Normalize ``max_wait`` to a ``timedelta`` (or ``None``); reject negatives and other types."""
        if max_wait is None or isinstance(max_wait, timedelta):
            return max_wait
        if isinstance(max_wait, (int, float)) and max_wait >= 0:
            return timedelta(seconds=max_wait)
        raise AirflowException("Operator arg `max_wait` must be timedelta object or a non-negative number")

    def _validate_input_values(self) -> None:
        """
        Check constructor arguments.

        :raises AirflowException: if ``poke_interval``/``timeout`` are not non-negative numbers,
            ``mode`` is not in ``valid_modes``, or (MySQL only) the first reschedule
            would land past MySQL's TIMESTAMP limit.
        """
        if not isinstance(self.poke_interval, (int, float)) or self.poke_interval < 0:
            raise AirflowException("The poke_interval must be a non-negative number")
        if not isinstance(self.timeout, (int, float)) or self.timeout < 0:
            raise AirflowException("The timeout must be a non-negative number")
        if self.mode not in self.valid_modes:
            dag_id = self.dag.dag_id if self.has_dag() else ""
            raise AirflowException(
                f"The mode must be one of {self.valid_modes}, '{dag_id}.{self.task_id}'; "
                f"received '{self.mode}'."
            )

        # Quick check for poke_interval isn't immediately over MySQL's TIMESTAMP limit.
        # This check is only rudimentary to catch trivial user errors, e.g. mistakenly
        # set the value to milliseconds instead of seconds. There's another check when
        # we actually try to reschedule to ensure database coherence.
        if self.reschedule and _is_metadatabase_mysql():
            if timezone.utcnow() + datetime.timedelta(seconds=self.poke_interval) > _MYSQL_TIMESTAMP_MAX:
                raise AirflowException(
                    f"Cannot set poke_interval to {self.poke_interval} seconds in reschedule "
                    f"mode since it will take reschedule time over MySQL's TIMESTAMP limit."
                )

    def poke(self, context: Context) -> bool | PokeReturnValue:
        """Check the sensor's criterion; subclasses must override this method."""
        raise AirflowException("Override me.")

    def execute(self, context: Context) -> Any:
        """
        Poke until the criterion is met, the timeout expires, or (in reschedule mode) a reschedule is due.

        :return: the ``xcom_value`` of a done ``PokeReturnValue``, otherwise ``None``.
        :raises AirflowSkipException: on timeout when ``soft_fail`` is set.
        :raises AirflowSensorTimeout: on timeout, or if a reschedule would exceed MySQL's TIMESTAMP limit.
        :raises AirflowRescheduleException: in reschedule mode, to hand the slot back until the next poke.
        """
        started_at: datetime.datetime | float
        # 1-based index of the upcoming poke; drives the exponential-backoff schedule.
        poke_number = 1

        if self.reschedule:
            # If reschedule, use the start date of the first try (first try can be either the very
            # first execution of the task, or the first execution after the task was cleared.)
            first_try_number = context["ti"].max_tries - self.retries + 1
            task_reschedules = TaskReschedule.find_for_task_instance(
                context["ti"], try_number=first_try_number
            )
            if not task_reschedules:
                start_date = timezone.utcnow()
            else:
                start_date = task_reschedules[0].start_date
                # Each recorded reschedule was one unsuccessful poke. Without this the
                # backoff restarted at the first interval on every rescheduled run.
                poke_number = len(task_reschedules) + 1
            started_at = start_date

            def run_duration() -> float:
                # If we are in reschedule mode, then we have to compute diff
                # based on the time in a DB, so can't use time.monotonic
                return (timezone.utcnow() - start_date).total_seconds()

        else:
            started_at = start_monotonic = time.monotonic()

            def run_duration() -> float:
                return time.monotonic() - start_monotonic

        log_dag_id = self.dag.dag_id if self.has_dag() else ""
        xcom_value = None
        while True:
            try:
                poke_return = self.poke(context)
            except (
                AirflowSensorTimeout,
                AirflowTaskTimeout,
                AirflowSkipException,
                AirflowFailException,
            ):
                # These always propagate, even with silent_fail.
                raise
            except Exception:
                if not self.silent_fail:
                    raise
                self.log.error("Sensor poke failed: \n%s", traceback.format_exc())
                poke_return = False

            if isinstance(poke_return, PokeReturnValue):
                # Check is_done explicitly rather than relying on the object's truthiness.
                if poke_return.is_done:
                    xcom_value = poke_return.xcom_value
                    break
            elif poke_return:
                break

            elapsed = run_duration()
            if elapsed > self.timeout:
                # If sensor is in soft fail mode but times out raise AirflowSkipException.
                message = (
                    f"Sensor has timed out; run duration of {elapsed} seconds exceeds "
                    f"the specified timeout of {self.timeout}."
                )
                if self.soft_fail:
                    raise AirflowSkipException(message)
                raise AirflowSensorTimeout(message)

            if self.reschedule:
                next_poke_interval = self._get_next_poke_interval(started_at, run_duration, poke_number)
                reschedule_date = timezone.utcnow() + timedelta(seconds=next_poke_interval)
                if _is_metadatabase_mysql() and reschedule_date > _MYSQL_TIMESTAMP_MAX:
                    raise AirflowSensorTimeout(
                        f"Cannot reschedule DAG {log_dag_id} to {reschedule_date.isoformat()} "
                        f"since it is over MySQL's TIMESTAMP storage limit."
                    )
                raise AirflowRescheduleException(reschedule_date)

            time.sleep(self._get_next_poke_interval(started_at, run_duration, poke_number))
            poke_number += 1

        self.log.info("Success criteria met. Exiting.")
        return xcom_value

    def _get_next_poke_interval(
        self,
        started_at: datetime.datetime | float,
        run_duration: Callable[[], float],
        try_number: int,
    ) -> float:
        """
        Return how many seconds to wait before the next poke.

        Without ``exponential_backoff`` this is ``poke_interval``. Otherwise, it uses the
        same logic as the exponential backoff retry delay for operators. That is a
        hash-jittered delay in ``[min_backoff, 2 * min_backoff)``, where
        ``min_backoff = poke_interval * 2 ** (try_number - 2)``. The delay is then capped
        by the time left before ``timeout`` and by ``max_wait``.

        :param started_at: start marker of the sensor run, mixed into the jitter hash.
        :param run_duration: callable returning seconds elapsed since the run started.
        :param try_number: 1-based index of the upcoming poke.
        """
        if not self.exponential_backoff:
            return self.poke_interval

        max_delay = timedelta.max.total_seconds() - 1
        try:
            min_backoff = int(self.poke_interval * (2 ** (try_number - 2)))
        except OverflowError:
            # After very many pokes the float product overflows; the delay is capped below anyway.
            min_backoff = int(max_delay)
        # With poke_interval < 2 the first poke gives 0 here, and the modulo below would
        # raise ZeroDivisionError.
        min_backoff = max(min_backoff, 1)

        run_hash = int(
            hashlib.sha1(f"{self.dag_id}#{self.task_id}#{started_at}#{try_number}".encode()).hexdigest(),
            16,
        )
        modded_hash = min_backoff + run_hash % min_backoff

        delay_backoff_in_seconds = min(modded_hash, max_delay)
        # Never wait past the timeout, and never a negative amount (time.sleep rejects it).
        new_interval = max(min(self.timeout - int(run_duration()), delay_backoff_in_seconds), 0)
        if self.max_wait:
            new_interval = min(self.max_wait.total_seconds(), new_interval)
        self.log.info("new %s interval is %s", self.mode, new_interval)
        return new_interval

    def prepare_for_execution(self) -> BaseOperator:
        """Return the prepared task, switching it to ``reschedule`` mode if the default executor requires it."""
        task = super().prepare_for_execution()
        # Sensors in `poke` mode can block execution of DAGs when running
        # with single process executor, thus we change the mode to`reschedule`
        # to allow parallel task being scheduled and executed
        executor, _ = ExecutorLoader.import_default_executor_cls()
        if executor.change_sensor_mode_to_reschedule:
            self.log.warning("%s changes sensor mode to 'reschedule'.", executor.__name__)
            task.mode = "reschedule"
        return task
def poke_mode_only(cls):
    """
    Restrict a BaseSensorOperator subclass to ``poke`` mode.

    The class's ``mode`` attribute is replaced with a property that always reads
    ``"poke"`` and raises ``ValueError`` when assigned any other value.

    :param cls: BaseSensor class to enforce methods only use 'poke' mode.
    :return: the same class, modified in place.
    :raises ValueError: if ``cls`` is not a subclass of BaseSensorOperator.
    """

    def _restrict(target):
        if not issubclass(target, BaseSensorOperator):
            raise ValueError(
                f"poke_mode_only decorator should only be "
                f"applied to subclasses of BaseSensorOperator,"
                f" got:{target}."
            )

        def _read_mode(_):
            return "poke"

        def _write_mode(_, value):
            if value == "poke":
                return
            raise ValueError(f"Cannot set mode to '{value}'. Only 'poke' is acceptable")

        target.mode = property(fget=_read_mode, fset=_write_mode)
        return target

    return _restrict(cls)