from time import sleep

from airflow.exceptions import AirflowException, AirflowSensorTimeout, \
from airflow.models import BaseOperator, SkipMixin
from airflow.utils import timezone
from airflow.utils.decorators import apply_defaults

[docs]class BaseSensorOperator(BaseOperator, SkipMixin): """ Sensor operators are derived from this class an inherit these attributes. Sensor operators keep executing at a time interval and succeed when a criteria is met and fail if and when they time out. :param soft_fail: Set to true to mark the task as SKIPPED on failure :type soft_fail: bool :param poke_interval: Time in seconds that the job should wait in between each tries :type poke_interval: int :param timeout: Time, in seconds before the task times out and fails. :type timeout: int """ ui_color = '#e6f1f2' @apply_defaults def __init__(self, poke_interval=60, timeout=60 * 60 * 24 * 7, soft_fail=False, *args, **kwargs): super(BaseSensorOperator, self).__init__(*args, **kwargs) self.poke_interval = poke_interval self.soft_fail = soft_fail self.timeout = timeout
[docs] def poke(self, context): """ Function that the sensors defined while deriving this class should override. """ raise AirflowException('Override me.')
def execute(self, context): started_at = timezone.utcnow() while not self.poke(context): if (timezone.utcnow() - started_at).total_seconds() > self.timeout: # If sensor is in soft fail mode but will be retried then # give it a chance and fail with timeout. # This gives the ability to set up non-blocking AND soft-fail sensors. if self.soft_fail and not context['ti'].is_eligible_to_retry(): self._do_skip_downstream_tasks(context) raise AirflowSkipException('Snap. Time is OUT.') else: raise AirflowSensorTimeout('Snap. Time is OUT.') sleep(self.poke_interval)"Success criteria met. Exiting.") def _do_skip_downstream_tasks(self, context): downstream_tasks = context['task'].get_flat_relatives(upstream=False) self.log.debug("Downstream task_ids %s", downstream_tasks) if downstream_tasks: self.skip(context['dag_run'], context['ti'].execution_date, downstream_tasks)