Papermill
Apache Airflow supports integration with Papermill. Papermill is a tool for parameterizing and executing Jupyter Notebooks. Perhaps you have a financial report that you wish to run with different values on the first or last day of a month, or at the beginning or end of the year. Using parameters in your notebook together with the PapermillOperator makes this a breeze.
Usage
Creating a notebook
To parameterize your notebook, designate a cell with the tag "parameters". Papermill looks for this cell and treats it as the defaults for the parameters passed in at execution time. Papermill then adds a new cell tagged "injected-parameters" containing the input parameters, which override the defaults from the "parameters" cell. If no cell is tagged "parameters", the injected cell is inserted at the top of the notebook.
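The lookup described above can be sketched against plain notebook JSON. The notebook dict and the find_parameters_cell helper below are illustrative stand-ins, not papermill's actual implementation:

```python
# Minimal notebook JSON with one cell tagged "parameters" (illustrative only).
nb = {
    "nbformat": 4,
    "nbformat_minor": 5,
    "metadata": {},
    "cells": [
        {"cell_type": "markdown", "metadata": {}, "source": ["# Financial report"]},
        {
            "cell_type": "code",
            "metadata": {"tags": ["parameters"]},  # the tag papermill looks for
            "source": ['msgs = "default message"\n'],
            "outputs": [],
            "execution_count": None,
        },
    ],
}


def find_parameters_cell(notebook):
    """Return the index of the first cell tagged 'parameters', or None."""
    for i, cell in enumerate(notebook["cells"]):
        if "parameters" in cell.get("metadata", {}).get("tags", []):
            return i
    return None


# Papermill places its injected-parameters cell right after this one.
print(find_parameters_cell(nb))  # → 1
```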
Make sure that you save your notebook somewhere that Airflow can access it. Papermill supports S3, GCS, Azure, and local file systems. HDFS is not supported.
Example DAG
Use the PapermillOperator to execute a Jupyter notebook:
run_this = PapermillOperator(
    task_id="run_example_notebook",
    input_nb="/tmp/hello_world.ipynb",
    output_nb="/tmp/out-{{ execution_date }}.ipynb",
    parameters={"msgs": "Ran from Airflow at {{ execution_date }}!"},
)
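The {{ execution_date }} fields above are Jinja templates that Airflow renders before the task runs. As a rough, hand-rolled stand-in for that substitution (not Airflow's actual template engine), assuming a hypothetical logical date of 2024-01-01:

```python
from datetime import datetime

execution_date = datetime(2024, 1, 1)  # hypothetical logical date

# Substitute the template field the way Airflow would before running the task.
output_nb = "/tmp/out-{{ execution_date }}.ipynb".replace(
    "{{ execution_date }}", execution_date.isoformat()
)
print(output_nb)  # → /tmp/out-2024-01-01T00:00:00.ipynb
```

Because the rendered date is baked into the output path, each DAG run writes its executed notebook to a distinct file.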
Example DAG to verify the message in the notebook:
import os

import scrapbook as sb

from airflow import DAG
from airflow.decorators import task
from airflow.lineage import AUTO
from airflow.providers.papermill.operators.papermill import PapermillOperator

# SCHEDULE_INTERVAL, START_DATE and DAGRUN_TIMEOUT are defined elsewhere in the example.


@task
def check_notebook(inlets, execution_date):
    """
    Verify the message in the notebook.
    """
    notebook = sb.read_notebook(inlets[0].url)
    message = notebook.scraps["message"]
    print(f"Message in notebook {message} for {execution_date}")
    if message.data != f"Ran from Airflow at {execution_date}!":
        return False
    return True


with DAG(
    dag_id="example_papermill_operator_verify",
    schedule=SCHEDULE_INTERVAL,
    start_date=START_DATE,
    dagrun_timeout=DAGRUN_TIMEOUT,
    catchup=False,
) as dag:
    run_this = PapermillOperator(
        task_id="run_example_notebook",
        input_nb=os.path.join(os.path.dirname(os.path.realpath(__file__)), "input_notebook.ipynb"),
        output_nb="/tmp/out-{{ execution_date }}.ipynb",
        parameters={"msgs": "Ran from Airflow at {{ execution_date }}!"},
    )

    run_this >> check_notebook(inlets=AUTO, execution_date="{{ execution_date }}")
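For the check above to find a scrap named "message", the notebook itself must glue one (e.g. with scrapbook's sb.glue("message", msgs)). The helper below is a stdlib-only sketch of how such a scrap could be read back out of the executed notebook's JSON; the media-type string is an assumption about scrapbook's storage format, and read_scrap is a hypothetical helper, not part of scrapbook's API:

```python
# Assumed media type under which scrapbook stores JSON-encoded scraps;
# verify against your scrapbook version.
SCRAP_MIME = "application/scrapbook.scrap.json+json"


def read_scrap(notebook_json, name):
    """Pull a named scrap out of raw notebook JSON; a rough stand-in for
    sb.read_notebook(...).scraps[name]."""
    for cell in notebook_json.get("cells", []):
        for output in cell.get("outputs", []):
            payload = output.get("data", {}).get(SCRAP_MIME)
            if payload and payload.get("name") == name:
                return payload.get("data")
    return None


# Synthetic executed-notebook JSON containing one glued scrap (illustrative only).
executed = {
    "cells": [
        {
            "cell_type": "code",
            "outputs": [
                {
                    "output_type": "display_data",
                    "data": {
                        SCRAP_MIME: {
                            "name": "message",
                            "data": "Ran from Airflow at 2024-01-01!",
                        }
                    },
                }
            ],
        }
    ]
}

print(read_scrap(executed, "message"))  # → Ran from Airflow at 2024-01-01!
```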
Example DAG to verify the message in the notebook using a remote Jupyter kernel:
import os

import scrapbook as sb

from airflow import DAG
from airflow.decorators import task
from airflow.providers.papermill.operators.papermill import PapermillOperator

# START_DATE and DAGRUN_TIMEOUT are defined elsewhere in the example.


@task
def check_notebook(output_notebook, execution_date):
    """
    Verify the message in the notebook.
    """
    notebook = sb.read_notebook(output_notebook)
    message = notebook.scraps["message"]
    print(f"Message in notebook {message} for {execution_date}")
    if message.data != f"Ran from Airflow at {execution_date}!":
        return False
    return True


with DAG(
    dag_id="example_papermill_operator_remote_verify",
    schedule="@once",
    start_date=START_DATE,
    dagrun_timeout=DAGRUN_TIMEOUT,
    catchup=False,
) as dag:
    run_this = PapermillOperator(
        task_id="run_example_notebook",
        input_nb=os.path.join(os.path.dirname(os.path.realpath(__file__)), "input_notebook.ipynb"),
        output_nb="/tmp/out-{{ execution_date }}.ipynb",
        parameters={"msgs": "Ran from Airflow at {{ execution_date }}!"},
        kernel_conn_id="jupyter_kernel_default",
    )

    run_this >> check_notebook(
        output_notebook="/tmp/out-{{ execution_date }}.ipynb",
        execution_date="{{ execution_date }}",
    )