BashOperator

Use the BashOperator to execute commands in a Bash shell. The Bash command or script to execute is determined by:

  1. The bash_command argument when using BashOperator, or

  2. If using the TaskFlow decorator, @task.bash, a non-empty string value returned from the decorated callable.

Tip

The @task.bash decorator is recommended over the classic BashOperator to execute Bash commands.

airflow/example_dags/example_bash_decorator.py[source]

@task.bash
def run_after_loop() -> str:
    return "echo https://airflow.apache.org/"

run_this = run_after_loop()

airflow/example_dags/example_bash_operator.py[source]

run_this = BashOperator(
    task_id="run_after_loop",
    bash_command="echo https://airflow.apache.org/",
)

Templating

You can use Jinja templates to parameterize the Bash command.

airflow/example_dags/example_bash_decorator.py[source]

@task.bash
def also_run_this() -> str:
    return 'echo "ti_key={{ task_instance_key_str }}"'

also_this = also_run_this()

airflow/example_dags/example_bash_operator.py[source]

also_run_this = BashOperator(
    task_id="also_run_this",
    bash_command='echo "ti_key={{ task_instance_key_str }}"',
)

Using the @task.bash TaskFlow decorator allows you to return a formatted string and take advantage of having all execution context variables directly accessible to decorated tasks.

airflow/example_dags/example_bash_decorator.py[source]

@task.bash
def also_run_this_again(task_instance_key_str) -> str:
    return f'echo "ti_key={task_instance_key_str}"'

also_this_again = also_run_this_again()

You are encouraged to take advantage of this approach as it fits nicely into the overall TaskFlow paradigm.

Caution

Care should be taken with “user” input when using Jinja templates in the Bash command as escaping and sanitization of the Bash command is not performed.

This applies mostly to using dag_run.conf, as that can be submitted via users in the Web UI. Most of the default template variables are not at risk.

For example, do not do:

@task.bash
def bash_task() -> str:
    return 'echo "Here is the message: \'{{ dag_run.conf["message"] if dag_run.conf else "" }}\'"'


# Or directly accessing `dag_run.conf`
@task.bash
def bash_task(dag_run) -> str:
    message = dag_run.conf["message"] if dag_run.conf else ""
    return f'echo "here is the message: {message}"'
bash_task = BashOperator(
    task_id="bash_task",
    bash_command='echo "Here is the message: \'{{ dag_run.conf["message"] if dag_run.conf else "" }}\'"',
)

Instead, you should pass this via the env kwarg and use double-quotes inside the Bash command.

@task.bash(env={"message": '{{ dag_run.conf["message"] if dag_run.conf else "" }}'})
def bash_task() -> str:
    return "echo \"here is the message: '$message'\""
bash_task = BashOperator(
    task_id="bash_task",
    bash_command="echo \"here is the message: '$message'\"",
    env={"message": '{{ dag_run.conf["message"] if dag_run.conf else "" }}'},
)

Skipping

In general a non-zero exit code produces an AirflowException and thus a task failure. In cases where it is desirable to instead have the task end in a skipped state, you can exit with code 99 (or with another exit code if you pass skip_on_exit_code).

airflow/example_dags/example_bash_decorator.py[source]

@task.bash
def this_will_skip() -> str:
    return 'echo "hello world"; exit 99;'

this_skips = this_will_skip()

airflow/example_dags/example_bash_operator.py[source]

this_will_skip = BashOperator(
    task_id="this_will_skip",
    bash_command='echo "hello world"; exit 99;',
    dag=dag,
)

Output processor

The output_processor parameter allows you to specify a lambda function that processes the output of the bash script before it is pushed as an XCom. This feature is particularly useful for manipulating the script’s output directly within the BashOperator, without the need for additional operators or tasks.

For example, consider a scenario where the output of the bash script is a JSON string. With the output_processor, you can transform this string into a JSON object before storing it in XCom. This simplifies the workflow and ensures that downstream tasks receive the processed data in the desired format.

Here’s how you can use the result_processor with the BashOperator:

@task.bash(output_processor=lambda output: json.loads(output))
def bash_task() -> str:
    return """
        jq -c '.[] | select(.lastModified > "{{ data_interval_start | ts_zulu }}" or .created > "{{ data_interval_start | ts_zulu }}")' \\
        example.json
    """
bash_task = BashOperator(
    task_id="filter_today_changes",
    bash_command="""
        jq -c '.[] | select(.lastModified > "{{ data_interval_start | ts_zulu }}" or .created > "{{ data_interval_start | ts_zulu }}")' \\
        example.json
    """,
    output_processor=lambda output: json.loads(output),
)

Executing commands from files

Both the BashOperator and @task.bash TaskFlow decorator enables you to execute Bash commands stored in files. The files must have a .sh or .bash extension.

With Jinja template

You can execute bash script which contains Jinja templates. When you do so, Airflow loads the content of your file, render the templates, and write the rendered script into a temporary file. By default, the file is placed in a temporary directory (under /tmp). You can change this location with the cwd parameter.

Caution

Airflow must have write access to /tmp or the cwd directory, to be able to write the temporary file to the disk.

To execute a bash script, place it in a location relative to the directory containing the DAG file. So if your DAG file is in /usr/local/airflow/dags/test_dag.py, you can move your test.sh file to any location under /usr/local/airflow/dags/ (Example: /usr/local/airflow/dags/scripts/test.sh) and pass the relative path to bash_command as shown below:

@task.bash
def bash_example():
    # "scripts" folder is under "/usr/local/airflow/dags"
    return "scripts/test.sh"
t2 = BashOperator(
    task_id="bash_example",
    # "scripts" folder is under "/usr/local/airflow/dags"
    bash_command="scripts/test.sh",
)

Creating separate folder for Bash scripts may be desirable for many reasons, like separating your script’s logic and pipeline code, allowing for proper code highlighting in files composed in different languages, and general flexibility in structuring pipelines.

It is also possible to define your template_searchpath as pointing to any folder locations in the DAG constructor call.

@dag(..., template_searchpath="/opt/scripts")
def example_bash_dag():
    @task.bash
    def bash_example():
        return "test.sh "
with DAG("example_bash_dag", ..., template_searchpath="/opt/scripts"):
    t2 = BashOperator(
        task_id="bash_example",
        bash_command="test.sh ",
    )

Without Jinja template

If your script doesn’t contains any Jinja template, disable Airflow’s rendering by adding a space after the script name.

@task.bash
def run_command_from_script() -> str:
    return "$AIRFLOW_HOME/scripts/example.sh "


run_script = run_command_from_script()
run_script = BashOperator(
    task_id="run_command_from_script",
    bash_command="$AIRFLOW_HOME/scripts/example.sh ",
)

Jinja template not found

If you encounter a “Template not found” exception when trying to execute a Bash script, add a space after the script name. This is because Airflow tries to apply a Jinja template to it, which will fail.

@task.bash
def bash_example():
    # This fails with 'Jinja template not found' error
    # return "/home/batcher/test.sh",
    # This works (has a space after)
    return "/home/batcher/test.sh "
BashOperator(
    task_id="bash_example",
    # This fails with 'Jinja template not found' error
    # bash_command="/home/batcher/test.sh",
    # This works (has a space after)
    bash_command="/home/batcher/test.sh ",
)

However, if you want to use templating in your Bash script, do not add the space and instead check the bash script with Jinja template section.

Enriching Bash with Python

The @task.bash TaskFlow decorator allows you to combine both Bash and Python into a powerful combination within a task.

Using Python conditionals, other function calls, etc. within a @task.bash task can help define, augment, or even build the Bash command(s) to execute.

For example, use conditional logic to determine task behavior:

airflow/example_dags/example_bash_decorator.py[source]

@task.bash
def sleep_in(day: str) -> str:
    if day in (WeekDay.SATURDAY, WeekDay.SUNDAY):
        return f"sleep {60 * 60}"
    else:
        raise AirflowSkipException("No sleeping in today!")

sleep_in(day="{{ dag_run.logical_date.strftime('%A').lower() }}")

Or call a function to help build a Bash command:

airflow/example_dags/example_bash_decorator.py[source]

def _get_files_in_cwd() -> list[str]:
    from pathlib import Path

    dir_contents = Path.cwd().glob("airflow/example_dags/*.py")
    files = [str(elem) for elem in dir_contents if elem.is_file()]

    return files

@task.bash
def get_file_stats() -> str:
    from shlex import join

    files = _get_files_in_cwd()
    cmd = join(["stat", *files])

    return cmd

get_file_stats()

There are numerous possibilities with this type of pre-execution enrichment.

Was this entry helpful?