BranchDateTimeOperator¶
Use the BranchDateTimeOperator
to branch into one of two execution paths
depending on whether the time falls into the range given by two target arguments,
This operator has two modes. First mode is to use current time (machine clock time at the
moment the DAG is executed), and the second mode is to use the logical_date
of the DAG run it is run
with.
Usage with current time¶
The usages above might be useful in certain situations - for example when DAG is used to perform cleanups and maintenance and is not really supposed to be used for any DAGs that are supposed to be back-filled, because the “current time” make back-filling non-idempotent, its result depend on the time when the DAG actually was run. It’s also slightly non-deterministic potentially even if it is run on schedule. It can take some time between when the DAGRun was scheduled and executed and it might mean that even if the DAGRun was scheduled properly, the actual time used for branching decision will be different than the schedule time and the branching decision might be different depending on those delays.
empty_task_11 = EmptyOperator(task_id="date_in_range", dag=dag1)
empty_task_21 = EmptyOperator(task_id="date_outside_range", dag=dag1)
cond1 = BranchDateTimeOperator(
task_id="datetime_branch",
follow_task_ids_if_true=["date_in_range"],
follow_task_ids_if_false=["date_outside_range"],
target_upper=pendulum.datetime(2020, 10, 10, 15, 0, 0),
target_lower=pendulum.datetime(2020, 10, 10, 14, 0, 0),
dag=dag1,
)
# Run empty_task_11 if cond1 executes between 2020-10-10 14:00:00 and 2020-10-10 15:00:00
cond1 >> [empty_task_11, empty_task_21]
The target parameters, target_upper
and target_lower
, can receive a datetime.datetime
,
a datetime.time
, or None
. When a datetime.time
object is used, it will be combined with
the current date in order to allow comparisons with it. In the event that target_upper
is set
to a datetime.time
that occurs before the given target_lower
, a day will be added to target_upper
.
This is done to allow for time periods that span over two dates.
empty_task_12 = EmptyOperator(task_id="date_in_range", dag=dag2)
empty_task_22 = EmptyOperator(task_id="date_outside_range", dag=dag2)
cond2 = BranchDateTimeOperator(
task_id="datetime_branch",
follow_task_ids_if_true=["date_in_range"],
follow_task_ids_if_false=["date_outside_range"],
target_upper=pendulum.time(0, 0, 0),
target_lower=pendulum.time(15, 0, 0),
dag=dag2,
)
# Since target_lower happens after target_upper, target_upper will be moved to the following day
# Run empty_task_12 if cond2 executes between 15:00:00, and 00:00:00 of the following day
cond2 >> [empty_task_12, empty_task_22]
If a target parameter is set to None
, the operator will perform a unilateral comparison using only
the non-None
target. Setting both target_upper
and target_lower
to None
will raise an exception.
Usage with logical date¶
The usage is much more “data range” friendly. The logical_date
does not change when the DAG is re-run and
it is not affected by execution delays, so this approach is suitable for idempotent DAG runs that might be
back-filled.
empty_task_13 = EmptyOperator(task_id="date_in_range", dag=dag3)
empty_task_23 = EmptyOperator(task_id="date_outside_range", dag=dag3)
cond3 = BranchDateTimeOperator(
task_id="datetime_branch",
use_task_logical_date=True,
follow_task_ids_if_true=["date_in_range"],
follow_task_ids_if_false=["date_outside_range"],
target_upper=pendulum.datetime(2020, 10, 10, 15, 0, 0),
target_lower=pendulum.datetime(2020, 10, 10, 14, 0, 0),
dag=dag3,
)
# Run empty_task_13 if cond3 executes between 2020-10-10 14:00:00 and 2020-10-10 15:00:00
cond3 >> [empty_task_13, empty_task_23]
BranchDayOfWeekOperator¶
Use the BranchDayOfWeekOperator
to branch your workflow based on week day value.
empty_task_1 = EmptyOperator(task_id="branch_true")
empty_task_2 = EmptyOperator(task_id="branch_false")
empty_task_3 = EmptyOperator(task_id="branch_weekend")
empty_task_4 = EmptyOperator(task_id="branch_mid_week")
branch = BranchDayOfWeekOperator(
task_id="make_choice",
follow_task_ids_if_true="branch_true",
follow_task_ids_if_false="branch_false",
week_day="Monday",
)
branch_weekend = BranchDayOfWeekOperator(
task_id="make_weekend_choice",
follow_task_ids_if_true="branch_weekend",
follow_task_ids_if_false="branch_mid_week",
week_day={WeekDay.SATURDAY, WeekDay.SUNDAY},
)
# Run empty_task_1 if branch executes on Monday, empty_task_2 otherwise
branch >> [empty_task_1, empty_task_2]
# Run empty_task_3 if it's a weekend, empty_task_4 otherwise
empty_task_2 >> branch_weekend >> [empty_task_3, empty_task_4]