dbt Cloud Operators

These operators can execute dbt Cloud jobs, poll for status of a currently-executing job, and download run artifacts locally.

Each of the operators can be tied to a specific dbt Cloud Account in two ways:

  • Explicitly provide the Account ID (via the account_id parameter) to the operator.

  • Or, specify the dbt Cloud Account in the Airflow Connection. The operators will fall back to using this automatically if the Account ID is not passed to the operator.
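The fallback order described above can be sketched as a small pure-Python function. The name resolve_account_id and its two arguments are hypothetical, chosen for illustration; this is not the provider's internal code, only a sketch of the documented precedence: an explicit account_id wins, and the connection's account is used otherwise. Raising an error when neither is available is an assumption of the sketch.

```python
from typing import Optional


def resolve_account_id(
    explicit_account_id: Optional[int],
    connection_default_account_id: Optional[int],
) -> int:
    """Hypothetical helper: prefer the operator's account_id, else the connection's."""
    # An account_id passed directly to the operator always wins.
    if explicit_account_id is not None:
        return explicit_account_id
    # Otherwise fall back to the account configured on the Airflow Connection.
    if connection_default_account_id is not None:
        return connection_default_account_id
    # Assumption: with neither source set, there is nothing to fall back to.
    raise ValueError("Could not determine the dbt Cloud account ID.")


print(resolve_account_id(106277, 999))  # explicit value wins
print(resolve_account_id(None, 999))    # falls back to the connection
```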

Trigger a dbt Cloud Job

Use the DbtCloudRunJobOperator to trigger a run of a dbt Cloud job. By default, the operator checks the status of the run every check_interval seconds until the run terminates with a successful status or until its execution time reaches timeout seconds. This behavior is controlled by the wait_for_termination parameter. Alternatively, set wait_for_termination to False to wait asynchronously (typically paired with the DbtCloudJobRunSensor). Setting wait_for_termination to False is a good approach for long-running dbt Cloud jobs.
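The default synchronous wait can be pictured as a simple polling loop. This is a minimal sketch, not the operator's implementation: wait_for_run is a hypothetical function, the injectable sleep and clock arguments exist only so the loop can be exercised without real waiting, and a failed or cancelled run simply returns False here. It assumes the numeric run statuses used by the dbt Cloud v2 API (1 queued, 2 starting, 3 running, 10 success, 20 error, 30 cancelled).

```python
import time
from typing import Callable

# Numeric run statuses reported by the dbt Cloud v2 API.
QUEUED, STARTING, RUNNING = 1, 2, 3
SUCCESS, ERROR, CANCELLED = 10, 20, 30
TERMINAL_STATUSES = {SUCCESS, ERROR, CANCELLED}


def wait_for_run(
    get_status: Callable[[], int],
    check_interval: float,
    timeout: float,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> bool:
    """Check get_status() every check_interval seconds until the run ends or timeout elapses.

    Returns True only for a successful run; errored or cancelled runs return False.
    """
    deadline = clock() + timeout
    while True:
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status == SUCCESS
        if clock() >= deadline:
            raise TimeoutError(f"Run did not reach a terminal status within {timeout} seconds.")
        sleep(check_interval)


# Simulated run: queued, running, running, then success.
statuses = iter([QUEUED, RUNNING, RUNNING, SUCCESS])
fake_now = [0.0]


def fake_sleep(seconds: float) -> None:
    fake_now[0] += seconds  # advance a fake clock instead of really sleeping


print(wait_for_run(lambda: next(statuses), check_interval=10, timeout=300,
                   sleep=fake_sleep, clock=lambda: fake_now[0]))
```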

The deferrable parameter, together with wait_for_termination, controls whether the job status is polled on the worker or the wait is deferred to the Triggerer. When wait_for_termination is True and deferrable is False, we submit the job and poll for its status on the worker. This keeps the worker slot occupied until the job execution is done. When wait_for_termination is True and deferrable is True, we submit the job and defer using the Triggerer. This releases the worker slot, saving resources while the job is running.

When wait_for_termination is False and deferrable is False, we just submit the job and can only track the job status with the DbtCloudJobRunSensor.
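The combinations of wait_for_termination and deferrable described above amount to a small decision table. The function below is a hypothetical summary for illustration only, not provider code. The text does not describe wait_for_termination=False with deferrable=True; the sketch assumes deferrable has no effect when the operator does not wait.

```python
def waiting_strategy(wait_for_termination: bool, deferrable: bool) -> str:
    """Hypothetical summary of how the two flags combine."""
    if not wait_for_termination:
        # Submit only; track the run separately, e.g. with DbtCloudJobRunSensor.
        # Assumption: deferrable is ignored when the operator does not wait.
        return "submit only"
    if deferrable:
        # Submit, then hand the wait to the Triggerer and free the worker slot.
        return "submit, then defer to Triggerer"
    # Submit, then poll on the worker, holding the slot until the run finishes.
    return "submit, then poll on worker"


for wait in (True, False):
    for defer in (False, True):
        print(f"wait_for_termination={wait}, deferrable={defer}: {waiting_strategy(wait, defer)}")
```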

When retry_from_failure is True and the job's previous run failed, we retry that run from the point of failure; otherwise, we trigger a new run. For more information on the retry logic, reference the API documentation.
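The retry_from_failure rule can be written as a two-branch decision. This sketch is hypothetical: next_action is an illustrative name, and it assumes that "failed" means the previous run ended with the dbt Cloud error status 20 (a cancelled run is not treated as failed here). It does not show whether Airflow or dbt Cloud actually makes the decision.

```python
from typing import Optional

ERROR = 20  # dbt Cloud's numeric status for a run that errored


def next_action(retry_from_failure: bool, last_run_status: Optional[int]) -> str:
    """Hypothetical sketch of the retry_from_failure decision."""
    if retry_from_failure and last_run_status == ERROR:
        # The previous run failed: resume it from the point of failure.
        return "retry from point of failure"
    # Flag disabled, no previous run, or previous run did not fail: start fresh.
    return "trigger new run"


print(next_action(True, ERROR))
print(next_action(False, ERROR))
```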

While schema_override and steps_override are explicit, optional parameters for the DbtCloudRunJobOperator, custom run configurations can also be passed to the operator using the additional_run_config dictionary. This parameter can be used to initialize additional runtime configurations or overrides for the job run such as threads_override, generate_docs_override, git_branch, etc. For a complete list of the other configurations that can be used at runtime, reference the API documentation.
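One way to picture how the explicit overrides and additional_run_config combine is as a single request body. The function build_run_payload is hypothetical, as is the assumption that the body carries a cause string next to the overrides; my_model is a placeholder. In this sketch, keys from additional_run_config are merged last, so they would win over an explicit override with the same key; that precedence is a choice made here, not documented behavior.

```python
from typing import Any, Dict, List, Optional


def build_run_payload(
    cause: str,
    steps_override: Optional[List[str]] = None,
    schema_override: Optional[str] = None,
    additional_run_config: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Hypothetical sketch: merge explicit overrides and extra runtime config into one body."""
    payload: Dict[str, Any] = {"cause": cause}
    # Explicit operator parameters are only included when they are set.
    if steps_override is not None:
        payload["steps_override"] = steps_override
    if schema_override is not None:
        payload["schema_override"] = schema_override
    # Other runtime configuration (threads_override, git_branch, etc.) is merged as-is.
    payload.update(additional_run_config or {})
    return payload


print(build_run_payload(
    "Triggered by Airflow",
    steps_override=["dbt run --select my_model"],
    additional_run_config={"threads_override": 8, "generate_docs_override": True},
))
```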

The examples below demonstrate how to instantiate DbtCloudRunJobOperator tasks with synchronous and asynchronous waiting for run termination, respectively. Note that the account_id for the operators is referenced within the default_args of the example DAG.

tests/system/dbt/cloud/example_dbt_cloud.py

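from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator

# Trigger job 48617 and wait on the worker, checking every 10 seconds
# and failing if the run has not finished within 300 seconds.
trigger_job_run1 = DbtCloudRunJobOperator(
    task_id="trigger_job_run1",
    job_id=48617,
    check_interval=10,
    timeout=300,
)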

This next example also shows how to pass in custom runtime configuration (in this case for threads_override) via the additional_run_config dictionary.

tests/system/dbt/cloud/example_dbt_cloud.py

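# Submit job 48617 without waiting for it to finish, overriding the
# job's thread count for this run via additional_run_config.
trigger_job_run2 = DbtCloudRunJobOperator(
    task_id="trigger_job_run2",
    job_id=48617,
    wait_for_termination=False,
    additional_run_config={"threads_override": 8},
)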

Poll for status of a dbt Cloud Job run

Use the DbtCloudJobRunSensor to periodically retrieve the status of a dbt Cloud job run and check whether the run has succeeded. This sensor provides all of the same functionality available with the BaseSensorOperator.

In the example below, the run_id value comes from the output of a previous DbtCloudRunJobOperator task via the .output property exposed for all operators. Note also that the account_id for the task is referenced within the default_args of the example DAG.

tests/system/dbt/cloud/example_dbt_cloud.py

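from airflow.providers.dbt.cloud.sensors.dbt import DbtCloudJobRunSensor

# Wait for the run submitted by trigger_job_run2; give up after 20 seconds.
job_run_sensor = DbtCloudJobRunSensor(
    task_id="job_run_sensor", run_id=trigger_job_run2.output, timeout=20
)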
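Each sensor check boils down to mapping the run's current status to "done", "not yet", or "failed". The sketch below is a hypothetical poke function, not the provider's sensor code. It follows the BaseSensorOperator convention that returning True ends the wait and returning False means check again after the poke interval, and it assumes that an errored (20) or cancelled (30) run should fail the sensor rather than wait until timeout. RunFailedError is likewise a hypothetical name.

```python
SUCCESS, ERROR, CANCELLED = 10, 20, 30


class RunFailedError(Exception):
    """Hypothetical error raised when the watched run ends unsuccessfully."""


def poke(status: int) -> bool:
    """Sketch of one sensor check: True ends the wait, False means check again later."""
    if status == SUCCESS:
        return True
    if status in (ERROR, CANCELLED):
        # A run that errored or was cancelled will never succeed, so stop waiting.
        raise RunFailedError(f"dbt Cloud run ended with status {status}")
    # Queued (1), starting (2) or running (3): keep poking.
    return False


for current in (1, 3, SUCCESS):
    print(current, poke(current))
```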

You can also poll for the status of the job run asynchronously using deferrable mode. In this mode, worker slots are freed up while the sensor is waiting.

tests/system/dbt/cloud/example_dbt_cloud.py

job_run_sensor_deferred = DbtCloudJobRunSensor(
    task_id="job_run_sensor_deferred", run_id=trigger_job_run2.output, timeout=20, deferrable=True
)
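Deferrable mode frees the worker because the waiting moves to Airflow's Triggerer, which runs triggers as asyncio coroutines in an event loop. The simulation below illustrates only that idea: watch_run and main are hypothetical, the status sequences are made up, and nothing here calls dbt Cloud or uses the provider's actual trigger class. Because each wait is an await on asyncio.sleep, one loop can track several runs at once without blocking on any of them.

```python
import asyncio
from typing import Dict, List

SUCCESS, ERROR, CANCELLED = 10, 20, 30
TERMINAL_STATUSES = {SUCCESS, ERROR, CANCELLED}


async def watch_run(run_id: int, statuses: List[int], poll_interval: float) -> Dict[str, int]:
    """Poll one simulated run without blocking the event loop between checks."""
    for status in statuses:
        if status in TERMINAL_STATUSES:
            return {"run_id": run_id, "status": status}
        # Awaiting hands control back to the loop, so other runs are checked meanwhile.
        await asyncio.sleep(poll_interval)
    raise TimeoutError(f"run {run_id} never reached a terminal status")


async def main() -> List[Dict[str, int]]:
    # Two simulated runs watched concurrently by a single event loop.
    results = await asyncio.gather(
        watch_run(1, [1, 3, 10], poll_interval=0.01),
        watch_run(2, [3, 3, 3, 20], poll_interval=0.01),
    )
    return list(results)


print(asyncio.run(main()))
```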

Download run artifacts

Use the DbtCloudGetJobRunArtifactOperator to download dbt-generated artifacts for a dbt Cloud job run. The specified path value should be rooted at the target/ directory. Typical artifacts include manifest.json, catalog.json, and run_results.json, but other artifacts such as raw SQL of models or sources.json can also be downloaded.

For more information on dbt Cloud artifacts, reference this documentation.

tests/system/dbt/cloud/example_dbt_cloud.py

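from airflow.providers.dbt.cloud.operators.dbt import DbtCloudGetJobRunArtifactOperator

# Download target/run_results.json from the run started by trigger_job_run1.
get_run_results_artifact = DbtCloudGetJobRunArtifactOperator(
    task_id="get_run_results_artifact", run_id=trigger_job_run1.output, path="run_results.json"
)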
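Because the path is rooted at target/, a value such as run_results.json refers to target/run_results.json. The hypothetical helper below normalizes a path into that form by dropping an optional leading target/ component and rejecting absolute paths. It illustrates the convention only; whether the operator itself accepts a target/ prefix is not stated here, and the compiled-SQL path in the usage uses placeholder project and model names.

```python
from pathlib import PurePosixPath


def artifact_path(path: str) -> str:
    """Hypothetical helper: express an artifact path relative to dbt's target/ directory."""
    p = PurePosixPath(path)
    if p.is_absolute():
        raise ValueError("Artifact paths must be relative to target/, not absolute.")
    parts = p.parts
    # Drop a leading "target" component, since paths are already rooted there.
    if parts and parts[0] == "target":
        parts = parts[1:]
    if not parts:
        raise ValueError("Artifact path must name a file under target/.")
    return str(PurePosixPath(*parts))


print(artifact_path("run_results.json"))
print(artifact_path("target/manifest.json"))
print(artifact_path("compiled/my_project/models/my_model.sql"))
```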

List jobs

Use the DbtCloudListJobsOperator to list all jobs tied to a specified dbt Cloud account. The account_id must be supplied either through the connection or as a parameter to the task.

If a project_id is supplied, only jobs belonging to that project are retrieved.

For more information on listing dbt Cloud jobs, reference this documentation.

tests/system/dbt/cloud/example_dbt_cloud.py

from airflow.providers.dbt.cloud.operators.dbt import DbtCloudListJobsOperator
list_dbt_jobs = DbtCloudListJobsOperator(task_id="list_dbt_jobs", account_id=106277, project_id=160645)
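The effect of project_id can be sketched as a filter over job records. This is an in-memory illustration only: filter_job_ids is hypothetical, the job records are made up (apart from reusing IDs from the examples above), and the sketch does not show how the operator queries the dbt Cloud API.

```python
from typing import Any, Dict, List, Optional


def filter_job_ids(jobs: List[Dict[str, Any]], project_id: Optional[int] = None) -> List[int]:
    """Hypothetical sketch: keep job IDs, optionally only those in one project."""
    return [
        job["id"]
        for job in jobs
        # With no project_id, every job in the account is kept.
        if project_id is None or job["project_id"] == project_id
    ]


jobs = [  # made-up job records for illustration
    {"id": 48617, "project_id": 160645},
    {"id": 50001, "project_id": 160645},
    {"id": 60002, "project_id": 999999},
]
print(filter_job_ids(jobs))                     # all three IDs
print(filter_job_ids(jobs, project_id=160645))  # only the first two
```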
