BashOperator¶
Use the BashOperator to execute commands in a Bash shell. The Bash command or script to execute is
determined by:
- The bash_command argument when using BashOperator, or
- If using the TaskFlow decorator, @task.bash, a non-empty string value returned from the decorated callable.
Tip
The @task.bash decorator is recommended over the classic BashOperator to execute Bash commands.
@task.bash
def run_after_loop() -> str:
    return "echo https://airflow.apache.org/"


run_this = run_after_loop()

run_this = BashOperator(
    task_id="run_after_loop",
    bash_command="echo https://airflow.apache.org/",
)
Templating¶
You can use Jinja templates to parameterize the Bash command.
@task.bash
def also_run_this() -> str:
    return 'echo "ti_key={{ task_instance_key_str }}"'


also_this = also_run_this()

also_run_this = BashOperator(
    task_id="also_run_this",
    bash_command='echo "ti_key={{ task_instance_key_str }}"',
)
Using the @task.bash TaskFlow decorator allows you to return a formatted string and take advantage of
having all execution context variables directly accessible to decorated tasks.
@task.bash
def also_run_this_again(task_instance_key_str) -> str:
    return f'echo "ti_key={task_instance_key_str}"'


also_this_again = also_run_this_again()
You are encouraged to take advantage of this approach as it fits nicely into the overall TaskFlow paradigm.
Caution
Take care with “user” input when using Jinja templates in the Bash command, because the command is not escaped or sanitized.
This applies mostly to dag_run.conf, as that can be submitted by users in the Web UI. Most of
the default template variables are not at risk.
For example, do not do:
@task.bash
def bash_task() -> str:
    return 'echo "Here is the message: \'{{ dag_run.conf["message"] if dag_run.conf else "" }}\'"'


# Or directly accessing `dag_run.conf`
@task.bash
def bash_task(dag_run) -> str:
    message = dag_run.conf["message"] if dag_run.conf else ""
    return f'echo "here is the message: {message}"'

bash_task = BashOperator(
    task_id="bash_task",
    bash_command='echo "Here is the message: \'{{ dag_run.conf["message"] if dag_run.conf else "" }}\'"',
)
Instead, you should pass this via the env kwarg and use double-quotes inside the Bash command.
@task.bash(env={"message": '{{ dag_run.conf["message"] if dag_run.conf else "" }}'})
def bash_task() -> str:
    return "echo \"here is the message: '$message'\""

bash_task = BashOperator(
    task_id="bash_task",
    bash_command="echo \"here is the message: '$message'\"",
    env={"message": '{{ dag_run.conf["message"] if dag_run.conf else "" }}'},
)
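The difference between the two approaches can be reproduced outside Airflow. The following is a minimal sketch, assuming a bash executable is on the PATH; run_bash is a hypothetical helper and the hostile message value is invented for illustration. It is not how Airflow itself launches the command.

```python
from __future__ import annotations

import os
import subprocess


def run_bash(command: str, extra_env: dict[str, str] | None = None) -> str:
    """Run a command with `bash -c` and return its standard output."""
    env = {**os.environ, **(extra_env or {})}
    completed = subprocess.run(
        ["bash", "-c", command],
        env=env,
        capture_output=True,
        text=True,
        check=False,
    )
    return completed.stdout


# Hypothetical hostile value, standing in for dag_run.conf["message"].
message = '"; echo INJECTED; echo "'

# Unsafe: the value becomes part of the command text, so Bash parses it as code.
unsafe_output = run_bash(f'echo "here is the message: {message}"')

# Safer: the value travels in the environment and Bash expands it as data.
safe_output = run_bash(
    "echo \"here is the message: '$message'\"",
    {"message": message},
)

print(unsafe_output.splitlines())
print(safe_output.splitlines())
```

In the unsafe case the injected echo runs as its own command. In the env case the whole value is printed as one literal string.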
Skipping¶
In general, a non-zero exit code produces an AirflowException and thus a task failure. In cases where it is
desirable to instead have the task end in a skipped state, you can exit with code 99 (or with another
exit code if you pass skip_on_exit_code).
@task.bash
def this_will_skip() -> str:
    return 'echo "hello world"; exit 99;'


this_skips = this_will_skip()

this_will_skip = BashOperator(
    task_id="this_will_skip",
    bash_command='echo "hello world"; exit 99;',
    dag=dag,
)
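The mapping from exit code to task state described in this section can be modelled in a few lines. This is only an illustrative sketch, assuming bash is on the PATH. classify_exit is a hypothetical helper, not part of Airflow, and it accepts a single integer skip code for simplicity.

```python
import subprocess


def classify_exit(command: str, skip_on_exit_code: int = 99) -> str:
    """Map a Bash exit code to the task state described above."""
    exit_code = subprocess.run(
        ["bash", "-c", command],
        capture_output=True,
        check=False,
    ).returncode
    if exit_code == 0:
        return "success"
    if exit_code == skip_on_exit_code:
        return "skipped"
    # Any other non-zero exit code counts as a failure.
    return "failed"


print(classify_exit('echo "hello world"; exit 99;'))
print(classify_exit("exit 1"))
print(classify_exit("exit 3", skip_on_exit_code=3))
```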
Output processor¶
The output_processor parameter allows you to specify a lambda function that processes the output of the bash script
before it is pushed as an XCom. This feature is particularly useful for manipulating the script’s output directly within
the BashOperator, without the need for additional operators or tasks.
For example, consider a scenario where the output of the bash script is a JSON string. With the output_processor,
you can transform this string into a JSON object before storing it in XCom. This simplifies the workflow and ensures
that downstream tasks receive the processed data in the desired format.
Here’s how you can use the output_processor with the BashOperator:
import json


@task.bash(output_processor=lambda output: json.loads(output))
def bash_task() -> str:
    return """
        jq -c '.[] | select(.lastModified > "{{ data_interval_start | ts_zulu }}" or .created > "{{ data_interval_start | ts_zulu }}")' \\
        example.json
    """

bash_task = BashOperator(
    task_id="filter_today_changes",
    bash_command="""
        jq -c '.[] | select(.lastModified > "{{ data_interval_start | ts_zulu }}" or .created > "{{ data_interval_start | ts_zulu }}")' \\
        example.json
    """,
    output_processor=lambda output: json.loads(output),
)
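To see what the processor changes, the sketch below mimics the flow outside Airflow. It assumes bash is on the PATH and that only the last line of standard output is handed to the processor. run_and_process is a hypothetical stand-in, and the JSON payload is invented for illustration.

```python
import json
import subprocess
from typing import Any, Callable


def run_and_process(
    command: str,
    output_processor: Callable[[str], Any] = lambda output: output,
) -> Any:
    """Run a Bash command and pass its last output line through the processor."""
    completed = subprocess.run(
        ["bash", "-c", command],
        capture_output=True,
        text=True,
        check=True,
    )
    lines = completed.stdout.splitlines()
    last_line = lines[-1] if lines else ""
    return output_processor(last_line)


command = """echo '{"id": 7, "status": "changed"}'"""

raw = run_and_process(command)  # stays a plain string
parsed = run_and_process(command, output_processor=json.loads)  # becomes a dict

print(type(raw).__name__, type(parsed).__name__)
```

Without a processor a downstream consumer receives a string and has to parse it itself. With json.loads it receives a dictionary directly.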
Executing commands from files¶
Both the BashOperator and the @task.bash TaskFlow decorator enable you to execute Bash commands stored
in files. The files must have a .sh or .bash extension.
With Jinja template¶
You can execute a Bash script that contains Jinja templates. When you do so, Airflow
loads the content of your file, renders the templates, and writes the rendered script
into a temporary file. By default, the file is placed in a temporary directory
(under /tmp). You can change this location with the cwd parameter.
Caution
Airflow must have write access to /tmp or the cwd directory to be
able to write the temporary file to the disk.
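The load, render, write, and execute sequence described above can be sketched with the standard library. This is a rough illustration, not Airflow's implementation: a small regular expression stands in for Jinja and only handles simple {{ name }} placeholders. render_placeholders and run_templated_script are hypothetical helpers, the ds value is invented, and bash is assumed to be on the PATH.

```python
from __future__ import annotations

import re
import subprocess
import tempfile
from pathlib import Path


def render_placeholders(text: str, context: dict[str, object]) -> str:
    """Stand-in for Jinja: replace each {{ name }} with context[name]."""
    return re.sub(
        r"\{\{\s*(\w+)\s*\}\}",
        lambda match: str(context[match.group(1)]),
        text,
    )


def run_templated_script(
    script_path: Path,
    context: dict[str, object],
    cwd: Path | None = None,
) -> str:
    """Render a script, write it to a temporary directory, and run it."""
    rendered = render_placeholders(script_path.read_text(), context)
    # dir=None uses the system default temporary location, typically /tmp.
    # This step fails if the process cannot write there.
    with tempfile.TemporaryDirectory(dir=cwd) as tmp_dir:
        rendered_path = Path(tmp_dir) / script_path.name
        rendered_path.write_text(rendered)
        completed = subprocess.run(
            ["bash", str(rendered_path)],
            capture_output=True,
            text=True,
            check=True,
        )
    return completed.stdout


with tempfile.TemporaryDirectory() as scripts_dir:
    script = Path(scripts_dir) / "test.sh"
    script.write_text('echo "run for {{ ds }}"\n')
    output = run_templated_script(script, {"ds": "2024-01-01"})

print(output)
```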
To execute a bash script, place it in a location relative to the directory containing
the DAG file. So if your DAG file is in /usr/local/airflow/dags/test_dag.py, you can
move your test.sh file to any location under /usr/local/airflow/dags/ (for example,
/usr/local/airflow/dags/scripts/test.sh) and pass the relative path to bash_command
as shown below:
@task.bash
def bash_example():
    # "scripts" folder is under "/usr/local/airflow/dags"
    return "scripts/test.sh"

t2 = BashOperator(
    task_id="bash_example",
    # "scripts" folder is under "/usr/local/airflow/dags"
    bash_command="scripts/test.sh",
)
Creating a separate folder for Bash scripts may be desirable for many reasons, such as separating your script’s logic from pipeline code, allowing for proper code highlighting in files composed in different languages, and general flexibility in structuring pipelines.
It is also possible to set template_searchpath in the DAG constructor call so that it points to any
folder location.
@dag(..., template_searchpath="/opt/scripts")
def example_bash_dag():
    @task.bash
    def bash_example():
        return "test.sh "

with DAG("example_bash_dag", ..., template_searchpath="/opt/scripts"):
    t2 = BashOperator(
        task_id="bash_example",
        bash_command="test.sh ",
    )
Without Jinja template¶
If your script doesn’t contain any Jinja templates, disable Airflow’s rendering by adding a space after the script name.
@task.bash
def run_command_from_script() -> str:
    return "$AIRFLOW_HOME/scripts/example.sh "


run_script = run_command_from_script()

run_script = BashOperator(
    task_id="run_command_from_script",
    bash_command="$AIRFLOW_HOME/scripts/example.sh ",
)
Jinja template not found¶
If you encounter a “Template not found” exception when trying to execute a Bash script, add a space after the script name. Airflow treats a command that ends in .sh or .bash as the path to a Jinja template file and tries to load it, which fails if the file cannot be found as a template.
@task.bash
def bash_example():
    # This fails with 'Jinja template not found' error
    # return "/home/batcher/test.sh"
    # This works (has a space after)
    return "/home/batcher/test.sh "

BashOperator(
    task_id="bash_example",
    # This fails with 'Jinja template not found' error
    # bash_command="/home/batcher/test.sh",
    # This works (has a space after)
    bash_command="/home/batcher/test.sh ",
)
However, if you want to use templating in your Bash script, do not add the space; see the “With Jinja template” section instead.
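The trailing-space rule comes down to a suffix check on the command string. The sketch below models that rule only, under the assumption that .sh and .bash are the extensions involved, as stated earlier on this page. is_loaded_as_template_file is a hypothetical helper, not an Airflow function.

```python
# Extensions that mark a command as a script file, per the text above.
TEMPLATE_EXTENSIONS = (".sh", ".bash")


def is_loaded_as_template_file(bash_command: str) -> bool:
    """Return True when the command string ends in a script extension."""
    return bash_command.endswith(TEMPLATE_EXTENSIONS)


# Ends in ".sh": treated as a template file to load and render.
print(is_loaded_as_template_file("scripts/test.sh"))
# Ends in a space: passed to Bash unchanged.
print(is_loaded_as_template_file("/home/batcher/test.sh "))
# An ordinary inline command is not a file path either.
print(is_loaded_as_template_file("echo https://airflow.apache.org/"))
```

Bash ignores the trailing space when it runs the command, so the script still executes normally.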
Enriching Bash with Python¶
The @task.bash TaskFlow decorator allows you to combine Bash and Python within a single task.
Using Python conditionals, other function calls, etc. within a @task.bash task can help define, augment,
or even build the Bash command(s) to execute.
For example, use conditional logic to determine task behavior:
@task.bash
def sleep_in(day: str) -> str:
    if day in (WeekDay.SATURDAY, WeekDay.SUNDAY):
        return f"sleep {60 * 60}"
    else:
        raise AirflowSkipException("No sleeping in today!")


sleep_in(day="{{ dag_run.logical_date.strftime('%A').lower() }}")
Or call a function to help build a Bash command:
def _get_files_in_cwd() -> list[str]:
    from pathlib import Path

    dir_contents = Path.cwd().glob("airflow/example_dags/*.py")
    files = [str(elem) for elem in dir_contents if elem.is_file()]

    return files


@task.bash
def get_file_stats() -> str:
    from shlex import join

    files = _get_files_in_cwd()
    cmd = join(["stat", *files])

    return cmd


get_file_stats()
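The get_file_stats task above builds its command with shlex.join rather than plain string concatenation. A short standalone sketch shows why that matters when a file name contains a space. The file names here are hypothetical.

```python
from shlex import join, split

# Hypothetical file names; the first one contains a space.
files = ["dags/example one.py", "dags/example_two.py"]

# shlex.join quotes each argument that needs it.
cmd = join(["stat", *files])

# Naive concatenation leaves the space unquoted.
naive_cmd = " ".join(["stat", *files])

print(cmd)
print(split(cmd))        # three arguments, as intended
print(split(naive_cmd))  # four arguments: the first file name is split in two
```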
There are numerous possibilities with this type of pre-execution enrichment.