#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""This module is deprecated. Please use :mod:`airflow.utils.task_group`.

The module which provides a way to nest your DAGs and so your levels of complexity.
"""
from __future__ import annotations

import warnings
from datetime import datetime
from enum import Enum

from sqlalchemy.orm.session import Session

from airflow.api.common.experimental.get_task_instance import get_task_instance
from airflow.exceptions import AirflowException, RemovedInAirflow3Warning, TaskInstanceNotFound
from airflow.models import DagRun
from airflow.models.dag import DAG, DagContext
from airflow.models.pool import Pool
from airflow.models.taskinstance import TaskInstance
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.context import Context
from airflow.utils.session import NEW_SESSION, create_session, provide_session
from airflow.utils.state import State
from airflow.utils.types import DagRunType
class SkippedStatePropagationOptions(Enum):
    """Available options for skipped state propagation of subdag's tasks to parent dag tasks."""

    # Propagate SKIPPED to the parent's downstream task only when ALL leaf
    # task instances of the subdag are skipped.
    ALL_LEAVES = "all_leaves"
    # Propagate SKIPPED as soon as ANY leaf task instance of the subdag is skipped.
    ANY_LEAF = "any_leaf"
class SubDagOperator(BaseSensorOperator):
    """
    This class is deprecated. Please use `airflow.utils.task_group.TaskGroup`.

    This runs a sub dag. By convention, a sub dag's dag_id
    should be prefixed by its parent and a dot. As in `parent.child`.
    Although SubDagOperator can occupy a pool/concurrency slot,
    user can specify the mode=reschedule so that the slot will be
    released periodically to avoid potential deadlock.

    :param subdag: the DAG object to run as a subdag of the current DAG.
    :param session: sqlalchemy session
    :param conf: Configuration for the subdag
    :param propagate_skipped_state: by setting this argument you can define
        whether the skipped state of leaf task(s) should be propagated to the
        parent dag's downstream task.
    """

    @provide_session
    def __init__(
        self,
        *,
        subdag: DAG,
        session: Session = NEW_SESSION,
        conf: dict | None = None,
        propagate_skipped_state: SkippedStatePropagationOptions | None = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.subdag = subdag
        self.conf = conf
        self.propagate_skipped_state = propagate_skipped_state

        # Fail fast at DAG-definition time rather than at run time.
        self._validate_dag(kwargs)
        self._validate_pool(session)

        warnings.warn(
            """This class is deprecated. Please use `airflow.utils.task_group.TaskGroup`.""",
            RemovedInAirflow3Warning,
            # Point the warning at the user's DAG file, past the decorator layers.
            stacklevel=4,
        )

    def _validate_dag(self, kwargs):
        """Ensure a parent DAG exists and that the subdag's dag_id follows the
        required '{parent_dag_id}.{this_task_id}' naming convention."""
        dag = kwargs.get("dag") or DagContext.get_current_dag()

        if not dag:
            raise AirflowException("Please pass in the `dag` param or call within a DAG context manager")

        if dag.dag_id + "." + kwargs["task_id"] != self.subdag.dag_id:
            raise AirflowException(
                f"The subdag's dag_id should have the form '{{parent_dag_id}}.{{this_task_id}}'. "
                f"Expected '{dag.dag_id}.{kwargs['task_id']}'; received '{self.subdag.dag_id}'."
            )

    def _validate_pool(self, session):
        """Reject configurations where this operator and a subdag task share a
        single-slot pool: the operator would hold the only slot, so the subdag
        task could never run (deadlock)."""
        if self.pool:
            conflicts = [t for t in self.subdag.tasks if t.pool == self.pool]
            if conflicts:
                # only query for pool conflicts if one may exist
                pool = session.query(Pool).filter(Pool.slots == 1).filter(Pool.pool == self.pool).first()
                if pool and any(t.pool == self.pool for t in self.subdag.tasks):
                    raise AirflowException(
                        f"SubDagOperator {self.task_id} and subdag task{'s' if len(conflicts) > 1 else ''} "
                        f"{', '.join(t.task_id for t in conflicts)} both use pool {self.pool}, "
                        f"but the pool only has 1 slot. The subdag tasks will never run."
                    )

    def _get_dagrun(self, execution_date):
        """Return the subdag's DagRun for ``execution_date``, or None if none exists."""
        dag_runs = DagRun.find(
            dag_id=self.subdag.dag_id,
            execution_date=execution_date,
        )
        return dag_runs[0] if dag_runs else None

    def _reset_dag_run_and_task_instances(self, dag_run, execution_date):
        """
        Set the DagRun state to RUNNING and set the failed TaskInstances to None state
        for scheduler to pick up.

        :param dag_run: DAG run
        :param execution_date: Execution date
        :return: None
        """
        with create_session() as session:
            dag_run.state = State.RUNNING
            session.merge(dag_run)
            failed_task_instances = (
                session.query(TaskInstance)
                .filter(TaskInstance.dag_id == self.subdag.dag_id)
                .filter(TaskInstance.execution_date == execution_date)
                .filter(TaskInstance.state.in_([State.FAILED, State.UPSTREAM_FAILED]))
            )

            # Clearing the state lets the scheduler re-schedule these tasks.
            for task_instance in failed_task_instances:
                task_instance.state = State.NONE
                session.merge(task_instance)
            session.commit()

    def post_execute(self, context, result=None):
        """Verify the subdag's DagRun finished successfully and, if configured,
        propagate skipped leaf states to the parent DAG's downstream tasks.

        :raises AirflowException: if the subdag DagRun did not end in SUCCESS.
        """
        super().post_execute(context)
        execution_date = context["execution_date"]
        dag_run = self._get_dagrun(execution_date=execution_date)
        self.log.info("Execution finished. State is %s", dag_run.state)

        if dag_run.state != State.SUCCESS:
            raise AirflowException(f"Expected state: SUCCESS. Actual state: {dag_run.state}")

        if self.propagate_skipped_state and self._check_skipped_states(context):
            self._skip_downstream_tasks(context)

    def _check_skipped_states(self, context):
        """Return True if the subdag's leaf task states satisfy the configured
        ``propagate_skipped_state`` policy (ANY_LEAF / ALL_LEAVES).

        :raises AirflowException: for an unrecognized propagation option.
        """
        leaves_tis = self._get_leaves_tis(context["execution_date"])

        if self.propagate_skipped_state == SkippedStatePropagationOptions.ANY_LEAF:
            return any(ti.state == State.SKIPPED for ti in leaves_tis)
        if self.propagate_skipped_state == SkippedStatePropagationOptions.ALL_LEAVES:
            return all(ti.state == State.SKIPPED for ti in leaves_tis)
        raise AirflowException(
            f"Unimplemented SkippedStatePropagationOptions {self.propagate_skipped_state} used."
        )

    def _get_leaves_tis(self, execution_date):
        """Collect the TaskInstances of the subdag's leaf tasks for the given
        execution date, silently skipping leaves that have no instance yet."""
        leaves_tis = []
        for leaf in self.subdag.leaves:
            try:
                ti = get_task_instance(
                    dag_id=self.subdag.dag_id, task_id=leaf.task_id, execution_date=execution_date
                )
                leaves_tis.append(ti)
            except TaskInstanceNotFound:
                # A leaf without a task instance simply doesn't contribute
                # to the skipped-state decision.
                continue
        return leaves_tis

    def _skip_downstream_tasks(self, context):
        """Mark all tasks directly downstream of this operator as skipped."""
        self.log.info(
            "Skipping downstream tasks because propagate_skipped_state is set to %s "
            "and skipped task(s) were found.",
            self.propagate_skipped_state,
        )

        downstream_tasks = context["task"].downstream_list
        self.log.debug("Downstream task_ids %s", downstream_tasks)

        if downstream_tasks:
            self.skip(context["dag_run"], context["execution_date"], downstream_tasks)

        self.log.info("Done.")