airflow.providers.databricks.hooks.databricks

Databricks hook.

This hook enables submitting and running jobs on the Databricks platform. Internally the operators talk to the api/2.1/jobs/run-now endpoint (https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsRunNow) or the api/2.1/jobs/runs/submit endpoint.

Module Contents

Classes

RunLifeCycleState

Enum for the run life cycle state concept of Databricks runs.

RunState

Utility class for the run state concept of Databricks runs.

ClusterState

Utility class for the cluster state concept of a Databricks cluster.

DatabricksHook

Interact with Databricks.

Attributes

GET_CLUSTER_ENDPOINT

RESTART_CLUSTER_ENDPOINT

START_CLUSTER_ENDPOINT

TERMINATE_CLUSTER_ENDPOINT

CREATE_ENDPOINT

RESET_ENDPOINT

UPDATE_ENDPOINT

RUN_NOW_ENDPOINT

SUBMIT_RUN_ENDPOINT

GET_RUN_ENDPOINT

CANCEL_RUN_ENDPOINT

DELETE_RUN_ENDPOINT

REPAIR_RUN_ENDPOINT

OUTPUT_RUNS_JOB_ENDPOINT

CANCEL_ALL_RUNS_ENDPOINT

INSTALL_LIBS_ENDPOINT

UNINSTALL_LIBS_ENDPOINT

LIST_JOBS_ENDPOINT

LIST_PIPELINES_ENDPOINT

WORKSPACE_GET_STATUS_ENDPOINT

SPARK_VERSIONS_ENDPOINT

airflow.providers.databricks.hooks.databricks.GET_CLUSTER_ENDPOINT = ('GET', 'api/2.0/clusters/get')[source]
airflow.providers.databricks.hooks.databricks.RESTART_CLUSTER_ENDPOINT = ('POST', 'api/2.0/clusters/restart')[source]
airflow.providers.databricks.hooks.databricks.START_CLUSTER_ENDPOINT = ('POST', 'api/2.0/clusters/start')[source]
airflow.providers.databricks.hooks.databricks.TERMINATE_CLUSTER_ENDPOINT = ('POST', 'api/2.0/clusters/delete')[source]
airflow.providers.databricks.hooks.databricks.CREATE_ENDPOINT = ('POST', 'api/2.1/jobs/create')[source]
airflow.providers.databricks.hooks.databricks.RESET_ENDPOINT = ('POST', 'api/2.1/jobs/reset')[source]
airflow.providers.databricks.hooks.databricks.UPDATE_ENDPOINT = ('POST', 'api/2.1/jobs/update')[source]
airflow.providers.databricks.hooks.databricks.RUN_NOW_ENDPOINT = ('POST', 'api/2.1/jobs/run-now')[source]
airflow.providers.databricks.hooks.databricks.SUBMIT_RUN_ENDPOINT = ('POST', 'api/2.1/jobs/runs/submit')[source]
airflow.providers.databricks.hooks.databricks.GET_RUN_ENDPOINT = ('GET', 'api/2.1/jobs/runs/get')[source]
airflow.providers.databricks.hooks.databricks.CANCEL_RUN_ENDPOINT = ('POST', 'api/2.1/jobs/runs/cancel')[source]
airflow.providers.databricks.hooks.databricks.DELETE_RUN_ENDPOINT = ('POST', 'api/2.1/jobs/runs/delete')[source]
airflow.providers.databricks.hooks.databricks.REPAIR_RUN_ENDPOINT = ('POST', 'api/2.1/jobs/runs/repair')[source]
airflow.providers.databricks.hooks.databricks.OUTPUT_RUNS_JOB_ENDPOINT = ('GET', 'api/2.1/jobs/runs/get-output')[source]
airflow.providers.databricks.hooks.databricks.CANCEL_ALL_RUNS_ENDPOINT = ('POST', 'api/2.1/jobs/runs/cancel-all')[source]
airflow.providers.databricks.hooks.databricks.INSTALL_LIBS_ENDPOINT = ('POST', 'api/2.0/libraries/install')[source]
airflow.providers.databricks.hooks.databricks.UNINSTALL_LIBS_ENDPOINT = ('POST', 'api/2.0/libraries/uninstall')[source]
airflow.providers.databricks.hooks.databricks.LIST_JOBS_ENDPOINT = ('GET', 'api/2.1/jobs/list')[source]
airflow.providers.databricks.hooks.databricks.LIST_PIPELINES_ENDPOINT = ('GET', 'api/2.0/pipelines')[source]
airflow.providers.databricks.hooks.databricks.WORKSPACE_GET_STATUS_ENDPOINT = ('GET', 'api/2.0/workspace/get-status')[source]
airflow.providers.databricks.hooks.databricks.SPARK_VERSIONS_ENDPOINT = ('GET', 'api/2.0/clusters/spark-versions')[source]
class airflow.providers.databricks.hooks.databricks.RunLifeCycleState[source]

Bases: enum.Enum

Enum for the run life cycle state concept of Databricks runs.

See more information at: https://docs.databricks.com/api/azure/workspace/jobs/listruns#runs-state-life_cycle_state

BLOCKED = 'BLOCKED'[source]
INTERNAL_ERROR = 'INTERNAL_ERROR'[source]
PENDING = 'PENDING'[source]
QUEUED = 'QUEUED'[source]
RUNNING = 'RUNNING'[source]
SKIPPED = 'SKIPPED'[source]
TERMINATED = 'TERMINATED'[source]
TERMINATING = 'TERMINATING'[source]
WAITING_FOR_RETRY = 'WAITING_FOR_RETRY'[source]
class airflow.providers.databricks.hooks.databricks.RunState(life_cycle_state, result_state='', state_message='', *args, **kwargs)[source]

Utility class for the run state concept of Databricks runs.

property is_terminal: bool[source]

True if the current state is a terminal state.

property is_successful: bool[source]

True if the result state is SUCCESS.

RUN_LIFE_CYCLE_STATES = ['PENDING', 'RUNNING', 'TERMINATING', 'TERMINATED', 'SKIPPED', 'INTERNAL_ERROR', 'QUEUED'][source]
__eq__(other)[source]

Return self==value.

__repr__()[source]

Return repr(self).

to_json()[source]
classmethod from_json(data)[source]
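
For illustration, a RunState can be constructed directly from the fields returned by the Jobs API; a minimal sketch, with example state values:

    from airflow.providers.databricks.hooks.databricks import RunState

    # A finished, successful run: TERMINATED is a terminal lifecycle state.
    state = RunState(life_cycle_state="TERMINATED", result_state="SUCCESS")
    assert state.is_terminal
    assert state.is_successful

    # Round-trip through JSON, e.g. when passing state between tasks.
    assert RunState.from_json(state.to_json()) == state
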
class airflow.providers.databricks.hooks.databricks.ClusterState(state='', state_message='', *args, **kwargs)[source]

Utility class for the cluster state concept of a Databricks cluster.

property is_terminal: bool[source]

True if the current state is a terminal state.

property is_running: bool[source]

True if the current state is running.

CLUSTER_LIFE_CYCLE_STATES = ['PENDING', 'RUNNING', 'RESTARTING', 'RESIZING', 'TERMINATING', 'TERMINATED', 'ERROR', 'UNKNOWN'][source]
__eq__(other)[source]

Return self==value.

__repr__()[source]

Return repr(self).

to_json()[source]
classmethod from_json(data)[source]
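
Similarly, a short ClusterState sketch; the state value below is illustrative:

    from airflow.providers.databricks.hooks.databricks import ClusterState

    # A cluster reported as RUNNING is up and not in a terminal state.
    state = ClusterState(state="RUNNING")
    assert state.is_running
    assert not state.is_terminal
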
class airflow.providers.databricks.hooks.databricks.DatabricksHook(databricks_conn_id=BaseDatabricksHook.default_conn_name, timeout_seconds=180, retry_limit=3, retry_delay=1.0, retry_args=None, caller='DatabricksHook')[source]

Bases: airflow.providers.databricks.hooks.databricks_base.BaseDatabricksHook

Interact with Databricks.

Parameters
  • databricks_conn_id (str) – Reference to the Databricks connection.

  • timeout_seconds (int) – The amount of time in seconds the requests library will wait before timing-out.

  • retry_limit (int) – The number of times to retry the connection in case of service outages.

  • retry_delay (float) – The number of seconds to wait between retries (it might be a floating point number).

  • retry_args (dict[Any, Any] | None) – An optional dictionary with arguments passed to tenacity.Retrying class.
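
A minimal construction sketch; the connection ID and retry policy are illustrative, and retry_args, when given, is passed through to tenacity.Retrying:

    import tenacity

    from airflow.providers.databricks.hooks.databricks import DatabricksHook

    hook = DatabricksHook(
        databricks_conn_id="databricks_default",  # an existing Airflow connection
        timeout_seconds=120,
        retry_args=dict(
            stop=tenacity.stop_after_attempt(5),
            wait=tenacity.wait_exponential(multiplier=1, max=10),
        ),
    )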

hook_name = 'Databricks'[source]
create_job(json)[source]

Call the api/2.1/jobs/create endpoint.

Parameters

json (dict) – The data used in the body of the request to the create endpoint.

Returns

the job_id as an int

Return type

int
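
A sketch of a Jobs API 2.1 create payload; the job name, notebook path and cluster settings are illustrative:

    from airflow.providers.databricks.hooks.databricks import DatabricksHook

    hook = DatabricksHook()
    job_id = hook.create_job(
        json={
            "name": "nightly_etl",  # illustrative job name
            "tasks": [
                {
                    "task_key": "main",
                    "notebook_task": {"notebook_path": "/Shared/etl"},
                    "new_cluster": {
                        "spark_version": "13.3.x-scala2.12",
                        "node_type_id": "i3.xlarge",
                        "num_workers": 2,
                    },
                }
            ],
        }
    )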

reset_job(job_id, json)[source]

Call the api/2.1/jobs/reset endpoint.

Parameters
  • job_id (str) – The id of the job to reset.

  • json (dict) – The data used in the new_settings of the request to the reset endpoint.

update_job(job_id, json)[source]

Call the api/2.1/jobs/update endpoint.

Parameters
  • job_id (str) – The id of the job to update.

  • json (dict) – The data used in the new_settings of the request to the update endpoint.

run_now(json)[source]

Call the api/2.1/jobs/run-now endpoint.

Parameters

json (dict) – The data used in the body of the request to the run-now endpoint.

Returns

the run_id as an int

Return type

int
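
A sketch of triggering an existing job; the job_id and notebook_params are illustrative:

    from airflow.providers.databricks.hooks.databricks import DatabricksHook

    hook = DatabricksHook()
    # notebook_params are made available to the job's notebook tasks.
    run_id = hook.run_now(
        json={"job_id": 42, "notebook_params": {"run_date": "2024-01-01"}}
    )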

submit_run(json)[source]

Call the api/2.1/jobs/runs/submit endpoint.

Parameters

json (dict) – The data used in the body of the request to the submit endpoint.

Returns

the run_id as an int

Return type

int
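
A sketch of a one-time run that does not require a pre-existing job; all values are illustrative:

    from airflow.providers.databricks.hooks.databricks import DatabricksHook

    hook = DatabricksHook()
    run_id = hook.submit_run(
        json={
            "run_name": "adhoc_run",
            "new_cluster": {
                "spark_version": "13.3.x-scala2.12",
                "node_type_id": "i3.xlarge",
                "num_workers": 1,
            },
            "notebook_task": {"notebook_path": "/Shared/adhoc"},
        }
    )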

list_jobs(limit=25, expand_tasks=False, job_name=None, page_token=None, include_user_names=False)[source]

List the jobs in the Databricks Job Service.

Parameters
  • limit (int) – The limit/batch size used to retrieve jobs.

  • expand_tasks (bool) – Whether to include task and cluster details in the response.

  • job_name (str | None) – Optional name of a job to search.

  • page_token (str | None) – The optional page token pointing at the first job to return.

  • include_user_names (bool) – Whether to include user names in the response.

Returns

A list of jobs.

Return type

list[dict[str, Any]]
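
A sketch of looking up jobs by name; the job name is illustrative, and each entry is assumed to carry a job_id field as in the Jobs 2.1 list response:

    from airflow.providers.databricks.hooks.databricks import DatabricksHook

    hook = DatabricksHook()
    jobs = hook.list_jobs(job_name="nightly_etl")  # pagination is handled internally
    job_ids = [job["job_id"] for job in jobs]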

find_job_id_by_name(job_name)[source]

Find job id by its name; if there are multiple jobs with the same name, raise AirflowException.

Parameters

job_name (str) – The name of the job to look up.

Returns

The job_id as an int or None if no job was found.

Return type

int | None

list_pipelines(batch_size=25, pipeline_name=None, notebook_path=None)[source]

List the pipelines in Databricks Delta Live Tables.

Parameters
  • batch_size (int) – The limit/batch size used to retrieve pipelines.

  • pipeline_name (str | None) – Optional name of a pipeline to search. Cannot be combined with path.

  • notebook_path (str | None) – Optional notebook of a pipeline to search. Cannot be combined with name.

Returns

A list of pipelines.

Return type

list[dict[str, Any]]

find_pipeline_id_by_name(pipeline_name)[source]

Find pipeline id by its name; if there are multiple pipelines with the same name, raise AirflowException.

Parameters

pipeline_name (str) – The name of the pipeline to look up.

Returns

The pipeline_id as a GUID string or None if no pipeline was found.

Return type

str | None

get_run_page_url(run_id)[source]

Retrieve run_page_url.

Parameters

run_id (int) – id of the run

Returns

URL of the run page

Return type

str

async a_get_run_page_url(run_id)[source]

Async version of get_run_page_url().

Parameters

run_id (int) – id of the run

Returns

URL of the run page

Return type

str

get_job_id(run_id)[source]

Retrieve job_id from run_id.

Parameters

run_id (int) – id of the run

Returns

Job id for given Databricks run

Return type

int

get_run_state(run_id)[source]

Retrieve run state of the run.

Please note that any Airflow task that returns the RunState from this method (thereby pushing it to XCom) will fail unless you have enabled xcom pickling. This can be done using the following environment variable: AIRFLOW__CORE__ENABLE_XCOM_PICKLING

If you do not want to enable xcom pickling, use the get_run_state_str method to get a string describing state, or get_run_state_lifecycle, get_run_state_result, or get_run_state_message to get individual components of the run state.

Parameters

run_id (int) – id of the run

Returns

state of the run

Return type

RunState
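
A sketch of polling a run until it reaches a terminal state, keeping only plain strings rather than the RunState object; the run ID and poll interval are illustrative:

    import time

    from airflow.providers.databricks.hooks.databricks import DatabricksHook

    hook = DatabricksHook()
    run_id = 1234  # illustrative run ID
    while True:
        state = hook.get_run_state(run_id)
        if state.is_terminal:
            break
        time.sleep(30)  # illustrative poll interval
    if not state.is_successful:
        raise RuntimeError(f"Databricks run failed: {state.state_message}")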

async a_get_run_state(run_id)[source]

Async version of get_run_state().

Parameters

run_id (int) – id of the run

Returns

state of the run

Return type

RunState

get_run(run_id)[source]

Retrieve run information.

Parameters

run_id (int) – id of the run

Returns

information about the run

Return type

dict[str, Any]

async a_get_run(run_id)[source]

Async version of get_run().

Parameters

run_id (int) – id of the run

Returns

information about the run

Return type

dict[str, Any]

get_run_state_str(run_id)[source]

Return the string representation of RunState.

Parameters

run_id (int) – id of the run

Returns

string describing run state

Return type

str

get_run_state_lifecycle(run_id)[source]

Return the lifecycle state of the run.

Parameters

run_id (int) – id of the run

Returns

string with lifecycle state

Return type

str

get_run_state_result(run_id)[source]

Return the resulting state of the run.

Parameters

run_id (int) – id of the run

Returns

string with resulting state

Return type

str

get_run_state_message(run_id)[source]

Return the state message for the run.

Parameters

run_id (int) – id of the run

Returns

string with state message

Return type

str

get_run_output(run_id)[source]

Retrieve run output of the run.

Parameters

run_id (int) – id of the run

Returns

output of the run

Return type

dict
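
A sketch of reading a notebook task's exit value; per the Jobs API, a value passed to dbutils.notebook.exit() appears under notebook_output. The run ID is illustrative:

    from airflow.providers.databricks.hooks.databricks import DatabricksHook

    hook = DatabricksHook()
    output = hook.get_run_output(run_id=1234)  # illustrative run ID
    result = output.get("notebook_output", {}).get("result")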

async a_get_run_output(run_id)[source]

Async version of get_run_output().

Parameters

run_id (int) – id of the run

Returns

output of the run

Return type

dict

cancel_run(run_id)[source]

Cancel the run.

Parameters

run_id (int) – id of the run

cancel_all_runs(job_id)[source]

Cancel all active runs of a job asynchronously.

Parameters

job_id (int) – The canonical identifier of the job to cancel all runs of

delete_run(run_id)[source]

Delete a non-active run.

Parameters

run_id (int) – id of the run

repair_run(json)[source]

Re-run one or more tasks.

Parameters

json (dict) – The data used in the body of the request to the repair endpoint.
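
A sketch of a repair payload per the api/2.1/jobs/runs/repair endpoint; the run ID and task key are illustrative:

    from airflow.providers.databricks.hooks.databricks import DatabricksHook

    hook = DatabricksHook()
    run_id = 1234  # illustrative run ID
    payload = {"run_id": run_id, "rerun_tasks": ["main"]}  # task_keys to re-run
    # If the run was repaired before, the API expects the latest repair ID:
    latest_repair_id = hook.get_latest_repair_id(run_id)
    if latest_repair_id is not None:
        payload["latest_repair_id"] = latest_repair_id
    hook.repair_run(json=payload)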

get_latest_repair_id(run_id)[source]

Get the latest repair ID for run_id if one exists; otherwise return None.

get_cluster_state(cluster_id)[source]

Retrieve the state of the cluster.

Parameters

cluster_id (str) – id of the cluster

Returns

state of the cluster

Return type

ClusterState
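
A sketch of checking a cluster before use; the cluster ID is illustrative:

    from airflow.providers.databricks.hooks.databricks import DatabricksHook

    hook = DatabricksHook()
    state = hook.get_cluster_state(cluster_id="1234-567890-abcde123")
    if not state.is_running:
        # Start the cluster via the api/2.0/clusters/start endpoint.
        hook.start_cluster(json={"cluster_id": "1234-567890-abcde123"})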

async a_get_cluster_state(cluster_id)[source]

Async version of get_cluster_state().

Parameters

cluster_id (str) – id of the cluster

Returns

state of the cluster

Return type

ClusterState

restart_cluster(json)[source]

Restart the cluster.

Parameters

json (dict) – json dictionary containing cluster specification.

start_cluster(json)[source]

Start the cluster.

Parameters

json (dict) – json dictionary containing cluster specification.

terminate_cluster(json)[source]

Terminate the cluster.

Parameters

json (dict) – json dictionary containing cluster specification.

install(json)[source]

Install libraries on the cluster.

Utility function to call the api/2.0/libraries/install endpoint.

Parameters

json (dict) – json dictionary containing cluster_id and an array of libraries to install.
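
A sketch of a Libraries API 2.0 payload; the cluster ID and libraries are illustrative:

    from airflow.providers.databricks.hooks.databricks import DatabricksHook

    hook = DatabricksHook()
    hook.install(
        json={
            "cluster_id": "1234-567890-abcde123",
            "libraries": [
                {"pypi": {"package": "simplejson==3.19.1"}},
                {"jar": "dbfs:/mnt/libs/my-lib.jar"},
            ],
        }
    )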

uninstall(json)[source]

Uninstall libraries on the cluster.

Utility function to call the api/2.0/libraries/uninstall endpoint.

Parameters

json (dict) – json dictionary containing cluster_id and an array of libraries to uninstall.

update_repo(repo_id, json)[source]

Update the given Databricks repo.

Parameters
  • repo_id (str) – ID of the Databricks repo

  • json (dict[str, Any]) – payload

Returns

metadata from update

Return type

dict
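
A sketch of switching an existing repo to another branch; the repo ID and branch are illustrative:

    from airflow.providers.databricks.hooks.databricks import DatabricksHook

    hook = DatabricksHook()
    metadata = hook.update_repo(repo_id="123456", json={"branch": "main"})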

delete_repo(repo_id)[source]

Delete the given Databricks repo.

Parameters

repo_id (str) – ID of the Databricks repo

create_repo(json)[source]

Create a Databricks repo.

Parameters

json (dict[str, Any]) – payload

Return type

dict

get_repo_by_path(path)[source]

Obtain Repos ID by path.

Parameters

path (str) – path to a repository

Returns

Repos ID if it exists, None if doesn’t.

Return type

str | None

update_job_permission(job_id, json)[source]

Update Databricks job permissions.

Parameters
  • job_id (int) – job id

  • json (dict[str, Any]) – payload

Returns

json containing permission specification

Return type

dict
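
A sketch of a Permissions API payload granting a group read access; the job ID and principal are illustrative:

    from airflow.providers.databricks.hooks.databricks import DatabricksHook

    hook = DatabricksHook()
    hook.update_job_permission(
        job_id=42,  # illustrative job ID
        json={
            "access_control_list": [
                {"group_name": "data-engineers", "permission_level": "CAN_VIEW"}
            ]
        },
    )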

test_connection()[source]

Test Databricks connectivity from the UI.
