## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""The module which provides a way to nest your DAGs and so your levels of complexity."""fromenumimportEnumfromtypingimportDict,Optionalfromsqlalchemy.orm.sessionimportSessionfromairflow.api.common.experimental.get_task_instanceimportget_task_instancefromairflow.exceptionsimportAirflowException,TaskInstanceNotFoundfromairflow.modelsimportDagRunfromairflow.models.dagimportDAG,DagContextfromairflow.models.poolimportPoolfromairflow.models.taskinstanceimportTaskInstancefromairflow.sensors.baseimportBaseSensorOperatorfromairflow.utils.sessionimportcreate_session,provide_sessionfromairflow.utils.stateimportStatefromairflow.utils.typesimportDagRunType
class SkippedStatePropagationOptions(Enum):
    """Available options for skipped state propagation of subdag's tasks to parent dag tasks."""

    # Propagate only when all leaf tasks of the subdag were skipped.
    ALL_LEAVES = 'all_leaves'
    # Propagate as soon as any leaf task of the subdag was skipped.
    ANY_LEAF = 'any_leaf'
class SubDagOperator(BaseSensorOperator):
    """
    This runs a sub dag. By convention, a sub dag's dag_id
    should be prefixed by its parent and a dot. As in `parent.child`.
    Although SubDagOperator can occupy a pool/concurrency slot,
    user can specify the mode=reschedule so that the slot will be
    released periodically to avoid potential deadlock.

    :param subdag: the DAG object to run as a subdag of the current DAG.
    :param session: sqlalchemy session
    :param conf: Configuration for the subdag
    :type conf: dict
    :param propagate_skipped_state: by setting this argument you can define
        whether the skipped state of leaf task(s) should be propagated to the
        parent dag's downstream task.
    """
[docs]def_validate_dag(self,kwargs):dag=kwargs.get('dag')orDagContext.get_current_dag()ifnotdag:raiseAirflowException('Please pass in the `dag` param or call within a DAG context manager')ifdag.dag_id+'.'+kwargs['task_id']!=self.subdag.dag_id:raiseAirflowException("The subdag's dag_id should have the form '{{parent_dag_id}}.{{this_task_id}}'. ""Expected '{d}.{t}'; received '{rcvd}'.".format(d=dag.dag_id,t=kwargs['task_id'],rcvd=self.subdag.dag_id
))
[docs]def_validate_pool(self,session):ifself.pool:conflicts=[tfortinself.subdag.tasksift.pool==self.pool]ifconflicts:# only query for pool conflicts if one may existpool=session.query(Pool).filter(Pool.slots==1).filter(Pool.pool==self.pool).first()ifpoolandany(t.pool==self.poolfortinself.subdag.tasks):raiseAirflowException('SubDagOperator {sd} and subdag task{plural}{t} both ''use pool {p}, but the pool only has 1 slot. The ''subdag tasks will never run.'.format(sd=self.task_id,plural=len(conflicts)>1,t=', '.join(t.task_idfortinconflicts),p=self.pool,
def _reset_dag_run_and_task_instances(self, dag_run, execution_date):
    """
    Set the DagRun state to RUNNING and set the failed TaskInstances to None state
    for scheduler to pick up.

    :param dag_run: DAG run
    :param execution_date: Execution date
    :return: None
    """
    with create_session() as session:
        # Re-open the run so the scheduler considers it again.
        dag_run.state = State.RUNNING
        session.merge(dag_run)

        failed_states = [State.FAILED, State.UPSTREAM_FAILED]
        failed_task_instances = (
            session.query(TaskInstance)
            .filter(TaskInstance.dag_id == self.subdag.dag_id)
            .filter(TaskInstance.execution_date == execution_date)
            .filter(TaskInstance.state.in_(failed_states))
        )

        # Clearing the state lets the scheduler re-schedule these tasks.
        for ti in failed_task_instances:
            ti.state = State.NONE
            session.merge(ti)

        session.commit()
def post_execute(self, context, result=None):
    """
    Verify that the subdag's DagRun finished successfully and, when requested,
    propagate the skipped state of its leaf tasks to this task's downstream tasks.

    :param context: the task execution context
    :param result: unused; kept for operator interface compatibility
    :raises AirflowException: when the subdag run is not in SUCCESS state
    """
    when = context['execution_date']
    run = self._get_dagrun(execution_date=when)
    self.log.info("Execution finished. State is %s", run.state)

    if run.state != State.SUCCESS:
        raise AirflowException(f"Expected state: SUCCESS. Actual state: {run.state}")

    if self.propagate_skipped_state and self._check_skipped_states(context):
        self._skip_downstream_tasks(context)
[docs]def_skip_downstream_tasks(self,context):self.log.info('Skipping downstream tasks because propagate_skipped_state is set to %s ''and skipped task(s) were found.',self.propagate_skipped_state,)downstream_tasks=context['task'].downstream_listself.log.debug('Downstream task_ids %s',downstream_tasks)ifdownstream_tasks:self.skip(context['dag_run'],context['execution_date'],downstream_tasks)self.log.info('Done.')