# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.importdatetimefromtypingimportDict,Iterable,Unionfromairflow.exceptionsimportAirflowExceptionfromairflow.operators.branchimportBaseBranchOperatorfromairflow.utilsimporttimezone
class BranchDateTimeOperator(BaseBranchOperator):
    """
    Choose between two sets of downstream tasks based on the current datetime.

    For more information on how to use this operator, take a look at the guide:
    :ref:`howto/operator:BranchDateTimeOperator`

    The "true" branch is returned when ``datetime.datetime.now()`` lies above
    ``target_lower`` and below ``target_upper``; otherwise the "false" branch is returned.
    At least one of the two bounds has to be provided.

    :param follow_task_ids_if_true: task id or task ids to follow when
        ``datetime.datetime.now()`` is above ``target_lower`` and below ``target_upper``.
    :type follow_task_ids_if_true: str or list[str]
    :param follow_task_ids_if_false: task id or task ids to follow when
        ``datetime.datetime.now()`` is below ``target_lower`` or above ``target_upper``.
    :type follow_task_ids_if_false: str or list[str]
    :param target_lower: lower bound of the window, or ``None`` to leave it unset.
    :type target_lower: datetime.datetime or datetime.time or None
    :param target_upper: upper bound of the window, or ``None`` to leave it unset.
    :type target_upper: datetime.datetime or datetime.time or None
    :param use_task_execution_date: If ``True``, the task's execution date is compared with
        the targets (useful for backfilling). If ``False``, the system's date is used.
    :type use_task_execution_date: bool
    :raises AirflowException: if both ``target_lower`` and ``target_upper`` are ``None``.
    """

    def __init__(
        self,
        *,
        follow_task_ids_if_true: Union[str, Iterable[str]],
        follow_task_ids_if_false: Union[str, Iterable[str]],
        target_lower: Union[datetime.datetime, datetime.time, None],
        target_upper: Union[datetime.datetime, datetime.time, None],
        use_task_execution_date: bool = False,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)

        # With neither bound set there is nothing to compare the current datetime against.
        no_bound_given = all(bound is None for bound in (target_lower, target_upper))
        if no_bound_given:
            message = (
                "Both target_upper and target_lower are None. At least one "
                "must be defined to be compared to the current datetime"
            )
            raise AirflowException(message)

        # Branch destinations.
        self.follow_task_ids_if_true = follow_task_ids_if_true
        self.follow_task_ids_if_false = follow_task_ids_if_false

        # Window bounds, stored exactly as supplied (datetime, time or None).
        self.target_lower = target_lower
        self.target_upper = target_upper

        self.use_task_execution_date = use_task_execution_date
def target_times_as_dates(
    base_date: datetime.datetime,
    lower: Union[datetime.datetime, datetime.time, None],
    upper: Union[datetime.datetime, datetime.time, None],
):
    """
    Turn time-of-day targets into datetimes by combining them with ``base_date``.

    :param base_date: datetime that ``datetime.time`` targets are combined with.
    :param lower: lower target; a datetime, a time of day, or ``None``.
    :param upper: upper target; a datetime, a time of day, or ``None``.
    :return: ``(lower, upper)`` tuple. ``datetime.time`` values are replaced by datetimes,
        ``None`` and datetime values are passed through. If both targets are already
        datetimes they are returned untouched. Otherwise, when both are set and ``upper``
        ends up earlier than ``lower``, ``upper`` is moved forward by one day.
    """

    def _resolve(target):
        # datetime.datetime is not a datetime.time, so only plain times are combined;
        # datetimes and None fall through unchanged.
        if isinstance(target, datetime.time):
            return datetime.datetime.combine(base_date, target)
        return target

    already_datetimes = isinstance(lower, datetime.datetime) and isinstance(upper, datetime.datetime)
    if not already_datetimes:
        lower, upper = _resolve(lower), _resolve(upper)
        # An upper target earlier than the lower one is taken to fall on the following day.
        if lower is not None and upper is not None and upper < lower:
            upper = upper + datetime.timedelta(days=1)

    return lower, upper