PythonOperator

Use the PythonOperator to execute Python callables.

Tip

The @task decorator is recommended over the classic PythonOperator to execute Python callables.

airflow/example_dags/example_python_decorator.py[source]

@task(task_id="print_the_context")
def print_context(ds=None, **kwargs):
    """Print the Airflow context and ds variable from the context."""
    pprint(kwargs)
    print(ds)
    return "Whatever you return gets printed in the logs"

run_this = print_context()

airflow/example_dags/example_python_operator.py[source]

def print_context(ds=None, **kwargs):
    """Print the Airflow context and ds variable from the context."""
    print("::group::All kwargs")
    pprint(kwargs)
    print("::endgroup::")
    print("::group::Context variable ds")
    print(ds)
    print("::endgroup::")
    return "Whatever you return gets printed in the logs"

run_this = PythonOperator(task_id="print_the_context", python_callable=print_context)

Passing in arguments

Pass extra arguments to the @task decorated function as you would with a normal Python function.

airflow/example_dags/example_python_decorator.py[source]

# Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively
@task
def my_sleeping_function(random_base):
    """This is a function that will run within the DAG execution"""
    time.sleep(random_base)

for i in range(5):
    sleeping_task = my_sleeping_function.override(task_id=f"sleep_for_{i}")(random_base=i / 10)

    run_this >> log_the_sql >> sleeping_task

airflow/example_dags/example_python_operator.py[source]

# Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively
def my_sleeping_function(random_base):
    """This is a function that will run within the DAG execution"""
    time.sleep(random_base)

for i in range(5):
    sleeping_task = PythonOperator(
        task_id=f"sleep_for_{i}", python_callable=my_sleeping_function, op_kwargs={"random_base": i / 10}
    )

    run_this >> log_the_sql >> sleeping_task

Templating

Airflow passes in an additional set of keyword arguments: one for each of the Jinja template variables and a templates_dict argument.

The templates_dict, op_args, and op_kwargs arguments are templated, so each value in them is evaluated as a Jinja template.

airflow/example_dags/example_python_decorator.py[source]

@task(task_id="log_sql_query", templates_dict={"query": "sql/sample.sql"}, templates_exts=[".sql"])
def log_sql(**kwargs):
    log.info("Python task decorator query: %s", str(kwargs["templates_dict"]["query"]))

log_the_sql = log_sql()

airflow/example_dags/example_python_operator.py[source]

def log_sql(**kwargs):
    log.info("Python task decorator query: %s", str(kwargs["templates_dict"]["query"]))

log_the_sql = PythonOperator(
    task_id="log_sql_query",
    python_callable=log_sql,
    templates_dict={"query": "sql/sample.sql"},
    templates_exts=[".sql"],
)
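
The same rendering applies to op_args and op_kwargs. A minimal sketch, not taken from the example dags (the task id and message below are made up for illustration), passing a templated string through op_kwargs:

print_run_id = PythonOperator(
    task_id="print_run_id",
    python_callable=lambda templated_message: print(templated_message),
    # Each value in op_kwargs is rendered as a Jinja template before the callable is invoked.
    op_kwargs={"templated_message": "The run id is {{ run_id }}"},
)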

Context

The Context is a dictionary object that contains information about the environment of the DagRun. For example, selecting task_instance will get the currently running TaskInstance object.

It can be used implicitly, such as with **kwargs, but can also be used explicitly with get_current_context(). In this case, the type hint can be used for static analysis.
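
As a sketch of the explicit variant with a type hint (the import location of the Context type differs between Airflow versions; airflow.utils.context is assumed here, and the function name is made up):

from airflow.providers.standard.operators.python import get_current_context
from airflow.utils.context import Context  # assumed import location of the type hint


def describe_current_run() -> None:
    context: Context = get_current_context()
    # task_instance is the currently running TaskInstance object
    print(context["task_instance"])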

airflow/example_dags/example_python_context_decorator.py[source]

@task(task_id="print_the_context")
def print_context() -> str:
    """Print the Airflow context."""
    from pprint import pprint

    from airflow.providers.standard.operators.python import get_current_context

    context = get_current_context()
    pprint(context)
    return "Whatever you return gets printed in the logs"

print_the_context = print_context()

airflow/example_dags/example_python_context_operator.py[source]

def print_context() -> str:
    """Print the Airflow context."""
    from pprint import pprint

    from airflow.providers.standard.operators.python import get_current_context

    context = get_current_context()
    pprint(context)
    return "Whatever you return gets printed in the logs"

print_the_context = PythonOperator(task_id="print_the_context", python_callable=print_context)

PythonVirtualenvOperator

Use the PythonVirtualenvOperator to execute Python callables inside a new Python virtual environment. The virtualenv package needs to be installed in the environment that runs Airflow (as an optional dependency: pip install apache-airflow[virtualenv] --constraint ...).

Additionally, the cloudpickle package needs to be installed as an optional dependency using the command pip install apache-airflow[cloudpickle] --constraint .... This package is a replacement for the currently used dill package. Cloudpickle offers a strong advantage through its focus on the standard pickling protocol, ensuring wider compatibility and smoother data exchange, while still effectively handling common Python objects and global variables within functions.
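
To opt into cloudpickle, the operator and decorator expose a serializer argument. A minimal sketch, assuming a recent Airflow version where serializer accepts "cloudpickle" (the task id is made up):

@task.virtualenv(
    task_id="venv_with_cloudpickle",
    requirements=["cloudpickle"],
    serializer="cloudpickle",  # use cloudpickle instead of the default serializer
)
def venv_with_cloudpickle():
    import sys

    print(f"Running in {sys.executable}")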

Tip

The @task.virtualenv decorator is recommended over the classic PythonVirtualenvOperator to execute Python callables inside new Python virtual environments.

airflow/example_dags/example_python_decorator.py[source]

@task.virtualenv(
    task_id="virtualenv_python", requirements=["colorama==0.4.0"], system_site_packages=False
)
def callable_virtualenv():
    """
    Example function that will be performed in a virtual environment.

    Importing at the function level ensures that it will not attempt to import the
    library before it is installed.
    """
    from time import sleep

    from colorama import Back, Fore, Style

    print(Fore.RED + "some red text")
    print(Back.GREEN + "and with a green background")
    print(Style.DIM + "and in dim text")
    print(Style.RESET_ALL)
    for _ in range(4):
        print(Style.DIM + "Please wait...", flush=True)
        sleep(1)
    print("Finished")

virtualenv_task = callable_virtualenv()

airflow/example_dags/example_python_operator.py[source]

def callable_virtualenv():
    """
    Example function that will be performed in a virtual environment.

    Importing at the function level ensures that it will not attempt to import the
    library before it is installed.
    """
    from time import sleep

    from colorama import Back, Fore, Style

    print(Fore.RED + "some red text")
    print(Back.GREEN + "and with a green background")
    print(Style.DIM + "and in dim text")
    print(Style.RESET_ALL)
    for _ in range(4):
        print(Style.DIM + "Please wait...", flush=True)
        sleep(1)
    print("Finished")

virtualenv_task = PythonVirtualenvOperator(
    task_id="virtualenv_python",
    python_callable=callable_virtualenv,
    requirements=["colorama==0.4.0"],
    system_site_packages=False,
)

Passing in arguments

Pass extra arguments to the @task.virtualenv decorated function as you would with a normal Python function. Unfortunately, Airflow does not support serializing var, ti and task_instance due to incompatibilities with the underlying library. For Airflow context variables, make sure that you have access to Airflow either by setting system_site_packages to True or by adding apache-airflow to the requirements argument. Otherwise you won't have access to most of Airflow's context variables in op_kwargs. If you want the context related to datetime objects like data_interval_start, you can add pendulum and lazy_object_proxy.
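
An illustrative sketch (function name, task argument and value are made up), combining an ordinary argument with a datetime-related context variable made available through pendulum and lazy_object_proxy:

@task.virtualenv(requirements=["pendulum", "lazy_object_proxy"], system_site_packages=False)
def report_window(source_name: str, data_interval_start=None):
    # source_name is passed like a normal function argument;
    # data_interval_start is only usable because pendulum and lazy_object_proxy
    # are installed in the virtual environment.
    print(f"{source_name} window starts at {data_interval_start}")


report = report_window("my_source")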

Important

The Python function body defined to be executed is cut out of the DAG into a temporary file without surrounding code. As in the examples, you need to add all imports again, and you cannot rely on variables from the global Python context.

If you want to pass variables into the classic PythonVirtualenvOperator use op_args and op_kwargs.

If additional parameters for package installation are needed, pass them in via the pip_install_options parameter or use a requirements.txt file as in the example below:

SomePackage==0.2.1 --pre --index-url http://some.archives.com/archives
AnotherPackage==1.4.3 --no-index --find-links /my/local/archives

All supported options are listed in the requirements file format.
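
For illustration, both options on the classic operator might look as follows (a sketch; the task id, file name and pip flag are examples, and callable_virtualenv refers to the function defined above):

venv_with_pip_options = PythonVirtualenvOperator(
    task_id="venv_with_pip_options",
    python_callable=callable_virtualenv,
    # A (templated) path to a file in requirements file format, relative to the DAG folder.
    requirements="requirements.txt",
    # Extra flags passed to `pip install` while the environment is being built.
    pip_install_options=["--no-cache-dir"],
)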

Templating

Jinja templating can be used in the same way as described for the PythonOperator.

Virtual environment setup options

The virtual environment is created based on the global pip configuration on your worker. You can influence it with additional environment variables or adjustments to the general pip configuration, as described in pip config.

If you want to use additional task-specific private Python repositories to set up the virtual environment, you can pass the index_urls parameter, which adjusts the pip install configuration. The passed index URLs replace the standard system-configured index URL settings. To avoid adding secrets for the private repository to your DAG code, you can use Airflow Connections & Hooks; the connection type Package Index (Python) exists for this purpose.

In the special case that you want to prevent remote calls during the setup of a virtual environment, pass an empty list as index_urls=[], which forces pip to use the --no-index option.
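
A sketch of both cases, with a made-up private index URL (credentials should come from a Package Index (Python) connection rather than the DAG file); callable_virtualenv refers to the function defined above:

private_index_venv = PythonVirtualenvOperator(
    task_id="private_index_venv",
    python_callable=callable_virtualenv,
    requirements=["colorama==0.4.0"],
    # Replaces the system-configured index URLs for this task only.
    index_urls=["https://pypi.example.com/simple"],
)

offline_venv = PythonVirtualenvOperator(
    task_id="offline_venv",
    python_callable=callable_virtualenv,
    # An empty list makes the operator pass --no-index to pip, so no remote calls are made.
    index_urls=[],
)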

Caching and reuse

A virtual environment is set up per task execution in a temporary directory and deleted again after execution. Ensure that the $tmp folder on your workers has sufficient disk space. Usually (if not configured differently) the local pip cache is used, preventing a re-download of packages for each execution.

Still, setting up the virtual environment for every execution takes some time. For repeated executions you can set the option venv_cache_path to a file system folder on your worker; in this case the virtual environment is set up once and reused. If virtual environment caching is used, a separate virtual environment subfolder is created in the cache path for each unique set of requirements. So, depending on the variation of requirements across the DAGs in your setup, sufficient disk space is needed.
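
A minimal sketch of enabling the cache (the folder is a made-up worker-local path; the task id is illustrative):

@task.virtualenv(
    task_id="cached_venv_task",
    requirements=["colorama==0.4.0"],
    # The environment is built here once per unique requirements set and reused across runs.
    venv_cache_path="/opt/airflow/venv-cache",
)
def cached_venv_task():
    import colorama

    print(colorama.__name__)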

Note that no automated cleanup is performed in cached mode. All worker slots share the same virtual environment, but if tasks are scheduled over and over on different workers, virtual environments may be created on multiple workers individually. Also, if the worker is started in a Kubernetes POD, a restart of the worker will drop the cache (assuming venv_cache_path is not on a persistent volume).

If you run into problems at runtime with broken cached virtual environments, you can influence the cache directory hash by setting the Airflow variable PythonVirtualenvOperator.cache_key to any text. The content of this variable is used in the vector to calculate the cache directory key.
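
A sketch of bumping the cache key from Python, using the standard Variable API (the value is arbitrary text; any change invalidates previously cached environments):

from airflow.models import Variable

Variable.set("PythonVirtualenvOperator.cache_key", "rebuild-2024-06")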

Note that any modification of a cached virtual environment (like temporary files in the binary path or post-installing further requirements) might pollute it, and the operator does not maintain or clean the cache path.

Context

With some limitations, you can also use Context in virtual environments.

Important

Using Context in a virtual environment is a bit of a challenge because it involves library dependencies and serialization issues.

You can bypass this to some extent by using Jinja template variables and explicitly passing them as parameters.

You can also use get_current_context() in the same way as before, but with some limitations.

  • Requires apache-airflow>=3.0.0.

  • Set use_airflow_context to True to call get_current_context() in the virtual environment.

  • Set system_site_packages to True or set expect_airflow to True.

airflow/example_dags/example_python_context_decorator.py[source]

@task.virtualenv(task_id="print_the_context_venv", use_airflow_context=True)
def print_context_venv() -> str:
    """Print the Airflow context in venv."""
    from pprint import pprint

    from airflow.providers.standard.operators.python import get_current_context

    context = get_current_context()
    pprint(context)
    return "Whatever you return gets printed in the logs"

print_the_context_venv = print_context_venv()

airflow/example_dags/example_python_context_operator.py[source]

def print_context_venv() -> str:
    """Print the Airflow context in venv."""
    from pprint import pprint

    from airflow.providers.standard.operators.python import get_current_context

    context = get_current_context()
    pprint(context)
    return "Whatever you return gets printed in the logs"

print_the_context_venv = PythonVirtualenvOperator(
    task_id="print_the_context_venv", python_callable=print_context_venv, use_airflow_context=True
)

ExternalPythonOperator

The ExternalPythonOperator can help you run some of your tasks with a different set of Python libraries than other tasks (and than the main Airflow environment). This might be a virtual environment or any installation of Python that is preinstalled and available in the environment where the Airflow task is running. The operator takes the Python binary as the python parameter. Note that even in the case of a virtual environment, the python path should point to the Python binary inside the virtual environment (usually in the bin subdirectory of the virtual environment). Contrary to regular use of a virtual environment, there is no need to activate the environment: merely using the python binary of the environment automatically activates it. In both examples below, PATH_TO_PYTHON_BINARY is such a path, pointing to the executable Python binary.

Use the ExternalPythonOperator to execute Python callables inside a pre-defined environment. The virtualenv package should be preinstalled in the environment where Python is run. In case dill is used, it has to be preinstalled in that environment (in the same version that is installed in the main Airflow environment).
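
For orientation, PATH_TO_PYTHON_BINARY could be defined like this (the path is a made-up example of a pre-built environment on the worker):

# Must point at the interpreter inside the pre-built environment, not at its root folder.
PATH_TO_PYTHON_BINARY = "/opt/venvs/analytics/bin/python"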

Tip

The @task.external_python decorator is recommended over the classic ExternalPythonOperator to execute Python code in pre-defined Python environments.

airflow/example_dags/example_python_decorator.py[source]

@task.external_python(task_id="external_python", python=PATH_TO_PYTHON_BINARY)
def callable_external_python():
    """
    Example function that will be performed in a virtual environment.

    Importing at the function level ensures that it will not attempt to import the
    library before it is installed.
    """
    import sys
    from time import sleep

    print(f"Running task via {sys.executable}")
    print("Sleeping")
    for _ in range(4):
        print("Please wait...", flush=True)
        sleep(1)
    print("Finished")

external_python_task = callable_external_python()

airflow/example_dags/example_python_operator.py[source]

def callable_external_python():
    """
    Example function that will be performed in a virtual environment.

    Importing at the function level ensures that it will not attempt to import the
    library before it is installed.
    """
    import sys
    from time import sleep

    print(f"Running task via {sys.executable}")
    print("Sleeping")
    for _ in range(4):
        print("Please wait...", flush=True)
        sleep(1)
    print("Finished")

external_python_task = ExternalPythonOperator(
    task_id="external_python",
    python_callable=callable_external_python,
    python=PATH_TO_PYTHON_BINARY,
)

Passing in arguments

Pass extra arguments to the @task.external_python decorated function as you would with a normal Python function. Unfortunately, Airflow does not support serializing var and ti / task_instance due to incompatibilities with the underlying library. For Airflow context variables, make sure that Airflow is also installed as part of the external Python environment, in the same version as the Airflow version the task is run on. Otherwise you won't have access to most of Airflow's context variables in op_kwargs. If you want the context related to datetime objects like data_interval_start, you can add pendulum and lazy_object_proxy to your environment.

Important

The Python function body defined to be executed is cut out of the DAG into a temporary file without surrounding code. As in the examples, you need to add all imports again, and you cannot rely on variables from the global Python context.

If you want to pass variables into the classic ExternalPythonOperator use op_args and op_kwargs.
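
A minimal sketch (the function, task id and value are made up; PATH_TO_PYTHON_BINARY is as defined above):

def greet(name):
    print(f"Hello {name}")


greet_task = ExternalPythonOperator(
    task_id="greet_external",
    python=PATH_TO_PYTHON_BINARY,
    python_callable=greet,
    # Plain Python values are serialized and handed to the callable in the external interpreter.
    op_kwargs={"name": "Airflow"},
)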

Templating

Jinja templating can be used in the same way as described for the PythonOperator.

Context

You can use Context under the same conditions as PythonVirtualenvOperator.

airflow/example_dags/example_python_context_decorator.py[source]

@task.external_python(
    task_id="print_the_context_external", python=SOME_EXTERNAL_PYTHON, use_airflow_context=True
)
def print_context_external() -> str:
    """Print the Airflow context in external python."""
    from pprint import pprint

    from airflow.providers.standard.operators.python import get_current_context

    context = get_current_context()
    pprint(context)
    return "Whatever you return gets printed in the logs"

print_the_context_external = print_context_external()

airflow/example_dags/example_python_context_operator.py[source]

def print_context_external() -> str:
    """Print the Airflow context in external python."""
    from pprint import pprint

    from airflow.providers.standard.operators.python import get_current_context

    context = get_current_context()
    pprint(context)
    return "Whatever you return gets printed in the logs"

print_the_context_external = ExternalPythonOperator(
    task_id="print_the_context_external",
    python_callable=print_context_external,
    python=SOME_EXTERNAL_PYTHON,
    use_airflow_context=True,
)

BranchPythonOperator

Use the BranchPythonOperator to execute Python branching tasks.

Tip

The @task.branch decorator is recommended over the classic BranchPythonOperator to execute Python code.

airflow/example_dags/example_branch_operator_decorator.py[source]

@task.branch()
def branching(choices: list[str]) -> str:
    return f"branch_{random.choice(choices)}"

airflow/example_dags/example_branch_operator.py[source]

branching = BranchPythonOperator(
    task_id="branching",
    python_callable=lambda: f"branch_{random.choice(options)}",
)

Passing in arguments and Templating

Argument passing and templating options are the same as with PythonOperator.

BranchPythonVirtualenvOperator

Use the BranchPythonVirtualenvOperator to execute Python branching tasks; it is a hybrid of the BranchPythonOperator with execution in a virtual environment.

Tip

The @task.branch_virtualenv decorator is recommended over the classic BranchPythonVirtualenvOperator to execute Python code.

airflow/example_dags/example_branch_operator_decorator.py[source]

# Note: Passing a caching dir allows to keep the virtual environment over multiple runs
#       Run the example a second time and see that it re-uses it and is faster.
VENV_CACHE_PATH = tempfile.gettempdir()

@task.branch_virtualenv(requirements=["numpy~=1.26.0"], venv_cache_path=VENV_CACHE_PATH)
def branching_virtualenv(choices) -> str:
    import random

    import numpy as np

    print(f"Some numpy stuff: {np.arange(6)}")
    return f"venv_{random.choice(choices)}"

airflow/example_dags/example_branch_operator.py[source]

# Note: Passing a caching dir allows to keep the virtual environment over multiple runs
#       Run the example a second time and see that it re-uses it and is faster.
VENV_CACHE_PATH = Path(tempfile.gettempdir())

def branch_with_venv(choices):
    import random

    import numpy as np

    print(f"Some numpy stuff: {np.arange(6)}")
    return f"venv_{random.choice(choices)}"

branching_venv = BranchPythonVirtualenvOperator(
    task_id="branching_venv",
    requirements=["numpy~=1.26.0"],
    venv_cache_path=VENV_CACHE_PATH,
    python_callable=branch_with_venv,
    op_args=[options],
)

Passing in arguments and Templating

Argument passing and templating options are the same as with PythonOperator.

BranchExternalPythonOperator

Use the BranchExternalPythonOperator to execute Python branching tasks; it is a hybrid of the BranchPythonOperator with execution in an external Python environment.

Tip

The @task.branch_external_python decorator is recommended over the classic BranchExternalPythonOperator to execute Python code.

airflow/example_dags/example_branch_operator_decorator.py[source]

@task.branch_external_python(python=PATH_TO_PYTHON_BINARY)
def branching_ext_python(choices) -> str:
    import random

    return f"ext_py_{random.choice(choices)}"

airflow/example_dags/example_branch_operator.py[source]

def branch_with_external_python(choices):
    import random

    return f"ext_py_{random.choice(choices)}"

branching_ext_py = BranchExternalPythonOperator(
    task_id="branching_ext_python",
    python=PATH_TO_PYTHON_BINARY,
    python_callable=branch_with_external_python,
    op_args=[options],
)

Passing in arguments and Templating

Argument passing and templating options are the same as with PythonOperator.

ShortCircuitOperator

Use the ShortCircuitOperator to control whether a pipeline continues if a condition is satisfied or a truthy value is obtained.

The evaluation of this condition and truthy value is done via the output of a callable. If the callable returns True or a truthy value, the pipeline is allowed to continue and an XCom of the output will be pushed. If the output is False or a falsy value, the pipeline will be short-circuited based on the configured short-circuiting (more on this later). In the example below, the tasks that follow the “condition_is_true” task will execute while the tasks downstream of the “condition_is_false” task will be skipped.

Tip

The @task.short_circuit decorator is recommended over the classic ShortCircuitOperator to short-circuit pipelines via Python callables.

airflow/example_dags/example_short_circuit_decorator.py[source]

@task.short_circuit()
def check_condition(condition):
    return condition

ds_true = [EmptyOperator(task_id=f"true_{i}") for i in [1, 2]]
ds_false = [EmptyOperator(task_id=f"false_{i}") for i in [1, 2]]

condition_is_true = check_condition.override(task_id="condition_is_true")(condition=True)
condition_is_false = check_condition.override(task_id="condition_is_false")(condition=False)

chain(condition_is_true, *ds_true)
chain(condition_is_false, *ds_false)

airflow/example_dags/example_short_circuit_operator.py[source]

cond_true = ShortCircuitOperator(
    task_id="condition_is_True",
    python_callable=lambda: True,
)

cond_false = ShortCircuitOperator(
    task_id="condition_is_False",
    python_callable=lambda: False,
)

ds_true = [EmptyOperator(task_id=f"true_{i}") for i in [1, 2]]
ds_false = [EmptyOperator(task_id=f"false_{i}") for i in [1, 2]]

chain(cond_true, *ds_true)
chain(cond_false, *ds_false)

The “short-circuiting” can be configured to either respect or ignore the trigger rule defined for downstream tasks. If ignore_downstream_trigger_rules is set to True, the default configuration, all downstream tasks are skipped without considering the trigger_rule defined for tasks. If this parameter is set to False, the direct downstream tasks are skipped but the specified trigger_rule for other subsequent downstream tasks is respected. In this short-circuiting configuration, the operator assumes the direct downstream tasks were purposely meant to be skipped but perhaps not other subsequent tasks. This configuration is especially useful if only part of a pipeline should be short-circuited rather than all tasks which follow the short-circuiting task.

In the example below, notice that the “short_circuit” task is configured to respect downstream trigger rules. This means that while the tasks that follow the “short_circuit” task will be skipped, since the decorated function returns False, “task_7” will still execute as it is set to execute when upstream tasks have completed running regardless of status (i.e. the TriggerRule.ALL_DONE trigger rule).

airflow/example_dags/example_short_circuit_decorator.py[source]

[task_1, task_2, task_3, task_4, task_5, task_6] = [
    EmptyOperator(task_id=f"task_{i}") for i in range(1, 7)
]

task_7 = EmptyOperator(task_id="task_7", trigger_rule=TriggerRule.ALL_DONE)

short_circuit = check_condition.override(task_id="short_circuit", ignore_downstream_trigger_rules=False)(
    condition=False
)

chain(task_1, [task_2, short_circuit], [task_3, task_4], [task_5, task_6], task_7)

airflow/example_dags/example_short_circuit_operator.py[source]

[task_1, task_2, task_3, task_4, task_5, task_6] = [
    EmptyOperator(task_id=f"task_{i}") for i in range(1, 7)
]

task_7 = EmptyOperator(task_id="task_7", trigger_rule=TriggerRule.ALL_DONE)

short_circuit = ShortCircuitOperator(
    task_id="short_circuit", ignore_downstream_trigger_rules=False, python_callable=lambda: False
)

chain(task_1, [task_2, short_circuit], [task_3, task_4], [task_5, task_6], task_7)

Passing in arguments and Templating

Argument passing and templating options are the same as with PythonOperator.
