Source code for airflow.example_dags.example_setup_teardown_taskflow
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""Example DAG demonstrating the usage of setup and teardown tasks."""from__future__importannotationsimportpendulumfromairflow.decoratorsimportsetup,task,task_group,teardownfromairflow.models.dagimportDAGwithDAG(dag_id="example_setup_teardown_taskflow",schedule=None,start_date=pendulum.datetime(2021,1,1,tz="UTC"),catchup=False,tags=["example"],)asdag:@task
@taskdefmy_second_task():print("Hello 2")@taskdefmy_third_task():print("Hello 3")# you can set setup / teardown relationships with the `as_teardown` method.task_1=my_first_task()task_2=my_second_task()task_3=my_third_task()task_1>>task_2>>task_3.as_teardown(setups=task_1)# The method `as_teardown` will mark task_3 as teardown, task_1 as setup, and# arrow task_1 >> task_3.# Now if you clear task_2, then its setup task, task_1, will be cleared in# addition to its teardown task, task_3# it's also possible to use a decorator to mark a task as setup or# teardown when you define it. see below.@setupdefouter_setup():print("I am outer_setup")return"some cluster id"@teardowndefouter_teardown(cluster_id):print("I am outer_teardown")print(f"Tearing down cluster: {cluster_id}")@taskdefouter_work():print("I am just a normal task")@task_groupdefsection_1():@setupdefinner_setup():print("I set up")return"some_cluster_id"@taskdefinner_work(cluster_id):print(f"doing some work with {cluster_id=}")@teardowndefinner_teardown(cluster_id):print(f"tearing down {cluster_id=}")# this passes the return value of `inner_setup` to both `inner_work` and `inner_teardown`inner_setup_task=inner_setup()inner_work(inner_setup_task)>>inner_teardown(inner_setup_task)# by using the decorators, outer_setup and outer_teardown are already marked as setup / teardown# now we just need to make sure they are linked directly. At a low level, what we need# to do so is the following::# s = outer_setup()# t = outer_teardown()# s >> t# s >> outer_work() >> t# Thus, s and t are linked directly, and outer_work runs in between. We can take advantage of# the fact that we are in taskflow, along with the context manager on teardowns, as follows:withouter_teardown(outer_setup()):outer_work()# and let's put section 1 inside the outer setup and teardown taskssection_1()