# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import datetime
from typing import Any

from cron_descriptor import CasingTypeEnum, ExpressionDescriptor, FormatException, MissingFieldException
from croniter import CroniterBadCronError, CroniterBadDateError, croniter
from pendulum import DateTime
from pendulum.tz.timezone import Timezone

from airflow.compat.functools import cached_property
from airflow.exceptions import AirflowTimetableInvalid
from airflow.utils.dates import cron_presets
from airflow.utils.timezone import convert_to_utc, make_aware, make_naive


def _is_schedule_fixed(expression: str) -> bool:
    """Tell whether a cron expression fires at a fixed time of day.

    A schedule such as "3 AM every day" is *fixed*; one such as "every five
    minutes" is not. The check peeks at the next two trigger times croniter
    yields for the expression and compares their hour and minute. When both
    match, the schedule is fixed and no DST fix is needed.

    This assumes DST happens on whole minute changes (e.g. 12:59 -> 12:00).

    :param expression: The cron expression to inspect.
    :return: True if the schedule has a fixed time, False if not.
    """
    schedule = croniter(expression)
    first = schedule.get_next(datetime.datetime)
    second = schedule.get_next(datetime.datetime)
    return (second.hour, second.minute) == (first.hour, first.minute)
class CronMixin:
    """Mixin to provide interface to work with croniter.

    Stores a cron expression together with the timezone it is evaluated in,
    exposes a human-readable ``description`` of the expression, and offers
    helpers to step to the next or previous trigger time.
    """

    def __init__(self, cron: str, timezone: str | Timezone) -> None:
        """Store the expression and timezone, then derive ``description``.

        :param cron: A cron expression, or a key of ``cron_presets``, in
            which case the mapped expression is used instead.
        :param timezone: Timezone for the schedule; a string is wrapped in
            ``Timezone``.
        """
        self._expression = cron_presets.get(cron, cron)

        if isinstance(timezone, str):
            timezone = Timezone(timezone)
        self._timezone = timezone

        try:
            # The descriptor is built inside the ``try`` as well, so that a
            # FormatException / MissingFieldException raised while
            # constructing it (rather than later in get_description()) also
            # degrades to an empty description instead of aborting
            # instantiation.
            descriptor = ExpressionDescriptor(
                expression=self._expression,
                casing_type=CasingTypeEnum.Sentence,
                use_24hour_time_format=True,
            )
            # checking for more than 5 parameters in Cron and avoiding evaluation for now,
            # as Croniter has inconsistent evaluation with other libraries
            if len(croniter(self._expression).expanded) > 5:
                raise FormatException()
            interval_description = descriptor.get_description()
        except (CroniterBadCronError, FormatException, MissingFieldException):
            # Any failure to describe the expression yields an empty
            # description; nothing is raised from here.
            interval_description = ""
        self.description = interval_description

    def __eq__(self, other: Any) -> bool:
        """Both expression and timezone should match.

        This is only for testing purposes and should not be relied on otherwise.

        :return: ``NotImplemented`` when ``other`` is not an instance of this
            object's type; otherwise whether expression and timezone match.
        """
        if not isinstance(other, type(self)):
            return NotImplemented
        return self._expression == other._expression and self._timezone == other._timezone

    @cached_property
    def _should_fix_dst(self) -> bool:
        """Whether the DST fix applies, i.e. the schedule is not fixed-time."""
        # This is lazy so instantiating a schedule does not immediately raise
        # an exception. Validity is checked with validate() during DAG-bagging.
        return not _is_schedule_fixed(self._expression)

    def _get_next(self, current: DateTime) -> DateTime:
        """Get the first schedule after specified time, with DST fixed.

        :param current: The reference time.
        :return: The following trigger time, converted to UTC.
        """
        naive = make_naive(current, self._timezone)
        cron = croniter(self._expression, start_time=naive)
        scheduled = cron.get_next(datetime.datetime)
        if not self._should_fix_dst:
            # Fixed-time schedule: use the wall-clock result as-is in the
            # schedule's timezone.
            return convert_to_utc(make_aware(scheduled, self._timezone))
        # Otherwise apply the naive distance to the aware ``current``.
        delta = scheduled - naive
        return convert_to_utc(current.in_timezone(self._timezone) + delta)

    def _get_prev(self, current: DateTime) -> DateTime:
        """Get the first schedule before specified time, with DST fixed.

        :param current: The reference time.
        :return: The preceding trigger time, converted to UTC.
        """
        naive = make_naive(current, self._timezone)
        cron = croniter(self._expression, start_time=naive)
        scheduled = cron.get_prev(datetime.datetime)
        if not self._should_fix_dst:
            # Fixed-time schedule: use the wall-clock result as-is in the
            # schedule's timezone.
            return convert_to_utc(make_aware(scheduled, self._timezone))
        # Otherwise subtract the naive distance from the aware ``current``.
        delta = naive - scheduled
        return convert_to_utc(current.in_timezone(self._timezone) - delta)

    def _align_to_next(self, current: DateTime) -> DateTime:
        """Get the next scheduled time.

        This is ``current + interval``, unless ``current`` falls right on the
        interval boundary, when ``current`` is returned.
        """
        next_time = self._get_next(current)
        # Stepping back from the next trigger lands on ``current`` only when
        # ``current`` is itself a trigger time.
        if self._get_prev(next_time) != current:
            return next_time
        return current

    def _align_to_prev(self, current: DateTime) -> DateTime:
        """Get the prev scheduled time.

        This is ``current - interval``, unless ``current`` falls right on the
        interval boundary, when ``current`` is returned.
        """
        prev_time = self._get_prev(current)
        # Stepping forward from the previous trigger lands on ``current``
        # only when ``current`` is itself a trigger time.
        if self._get_next(prev_time) != current:
            return prev_time
        return current