DatabricksSqlOperator

Use the DatabricksSqlOperator to execute SQL on a Databricks SQL warehouse or a Databricks cluster.

Using the Operator

The operator executes the given SQL queries against the configured warehouse. The only required parameters are:

  • sql - SQL queries to execute. There are 3 ways of specifying SQL queries:

    1. Simple string with SQL statement.

    2. List of strings representing SQL statements.

    3. Name of a file containing SQL queries. The file must have a .sql extension, and each query in it must end with a semicolon followed by a new line (;<new_line>).

  • One of sql_warehouse_name (name of Databricks SQL warehouse to use) or http_path (HTTP path for Databricks SQL warehouse or Databricks cluster).

Other parameters are optional and can be found in the class documentation.
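
If you need to target a specific warehouse or cluster by its HTTP path rather than by warehouse name, pass http_path instead of sql_warehouse_name. Below is a minimal sketch, assuming a connection named databricks_default and a placeholder HTTP path:

    # Minimal sketch: the HTTP path below is a placeholder; replace it with the
    # HTTP path of your SQL warehouse or cluster.
    select_via_http_path = DatabricksSqlOperator(
        databricks_conn_id="databricks_default",
        http_path="/sql/1.0/warehouses/<warehouse-id>",
        task_id="select_data_via_http_path",
        sql="select * from default.my_airflow_table",
    )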

Examples

Selecting data

An example usage of the DatabricksSqlOperator to select data from a table is as follows:

tests/system/databricks/example_databricks_sql.py[source]

    # Example of using the Databricks SQL Operator to select data.
    select = DatabricksSqlOperator(
        databricks_conn_id=connection_id,
        sql_endpoint_name=sql_endpoint_name,
        task_id="select_data",
        sql="select * from default.my_airflow_table",
    )

Selecting data into a file

An example usage of the DatabricksSqlOperator to select data from a table and store it in a file is as follows:

tests/system/databricks/example_databricks_sql.py[source]

    # Example of using the Databricks SQL Operator to select data into a file with JSONL format.
    select_into_file = DatabricksSqlOperator(
        databricks_conn_id=connection_id,
        sql_endpoint_name=sql_endpoint_name,
        task_id="select_data_into_file",
        sql="select * from default.my_airflow_table",
        output_path="/tmp/1.jsonl",
        output_format="jsonl",
    )
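
Besides JSONL, the output format can be changed via the output_format parameter. A minimal sketch (assuming CSV output is supported by your provider version) that writes the same data to a CSV file:

    # Minimal sketch: write the query results to a local CSV file instead of JSONL.
    select_into_csv = DatabricksSqlOperator(
        databricks_conn_id=connection_id,
        sql_endpoint_name=sql_endpoint_name,
        task_id="select_data_into_csv",
        sql="select * from default.my_airflow_table",
        output_path="/tmp/1.csv",
        output_format="csv",
    )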

Executing multiple statements

An example usage of the DatabricksSqlOperator to execute multiple SQL statements is as follows:

tests/system/databricks/example_databricks_sql.py[source]

    # Example of using the Databricks SQL Operator to perform multiple operations.
    create = DatabricksSqlOperator(
        databricks_conn_id=connection_id,
        sql_endpoint_name=sql_endpoint_name,
        task_id="create_and_populate_table",
        sql=[
            "drop table if exists default.my_airflow_table",
            "create table default.my_airflow_table(id int, v string)",
            "insert into default.my_airflow_table values (1, 'test 1'), (2, 'test 2')",
        ],
    )

Executing multiple statements from a file

An example usage of the DatabricksSqlOperator to execute SQL statements from a file is as follows:

tests/system/databricks/example_databricks_sql.py[source]

    # Example of using the Databricks SQL Operator to execute statements from a file.
    # The SQL statements should be in a file named test.sql.
    create_file = DatabricksSqlOperator(
        databricks_conn_id=connection_id,
        sql_endpoint_name=sql_endpoint_name,
        task_id="create_and_populate_from_file",
        sql="test.sql",
    )
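
The contents of test.sql are not shown above; a minimal sketch of what such a file could contain, mirroring the statements from the previous example, with each query ending in a semicolon followed by a new line:

    drop table if exists default.my_airflow_table;
    create table default.my_airflow_table(id int, v string);
    insert into default.my_airflow_table values (1, 'test 1'), (2, 'test 2');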

DatabricksSqlSensor

Use the DatabricksSqlSensor to run the sensor for a table accessible via a Databricks SQL warehouse or interactive cluster.

Using the Sensor

The sensor executes the SQL statement supplied by the user and succeeds once the query returns data. The only required parameters are:

  • sql - SQL query to execute for the sensor.

  • One of sql_warehouse_name (name of Databricks SQL warehouse to use) or http_path (HTTP path for Databricks SQL warehouse or Databricks cluster).

Other parameters are optional and can be found in the class documentation.

Examples

Configuring the Databricks connection to be used with the Sensor:

tests/system/databricks/example_databricks_sensors.py[source]

    # Connection string setup for Databricks workspace.
    connection_id = "databricks_default"
    sql_warehouse_name = "Starter Warehouse"

Poking the specific table with the SQL statement:

tests/system/databricks/example_databricks_sensors.py[source]

    # Example of using the Databricks SQL Sensor to check existence of data in a table.
    sql_sensor = DatabricksSqlSensor(
        databricks_conn_id=connection_id,
        sql_warehouse_name=sql_warehouse_name,
        catalog="hive_metastore",
        task_id="sql_sensor_task",
        sql="select * from hive_metastore.temp.sample_table_3 limit 1",
        timeout=60 * 2,
    )

DatabricksPartitionSensor

Sensors are a special type of Operator that are designed to do exactly one thing - wait for something to occur. It can be time-based, or waiting for a file, or an external event, but all they do is wait until something happens, and then succeed so their downstream tasks can run.

The Databricks Partition Sensor checks whether a partition with the given value(s) exists in a table; if not, it waits until that partition value arrives. The waiting time and the interval between checks can be configured with the timeout and poke_interval parameters respectively.

Use the DatabricksPartitionSensor to run the sensor for a table accessible via a Databricks SQL warehouse or interactive cluster.

Using the Sensor

The sensor accepts the table name and the partition name(s) and value(s) from the user, and generates a SQL query to check whether the specified partition name(s) and value(s) exist in the specified table.

The required parameters are:

  • table_name (name of the table for partition check).

  • partitions (name of the partitions to check).

  • partition_operator (comparison operator for partitions, used for a range or limit of values, such as partition_name >= partition_value). Databricks comparison operators are supported.

  • One of sql_warehouse_name (name of Databricks SQL warehouse to use) or http_path (HTTP path for Databricks SQL warehouse or Databricks cluster).

Other parameters are optional and can be found in the class documentation.

Examples

Configuring the Databricks connection to be used with the Sensor:

tests/system/databricks/example_databricks_sensors.py[source]

    # Connection string setup for Databricks workspace.
    connection_id = "databricks_default"
    sql_warehouse_name = "Starter Warehouse"

Poking the specific table for the existence of data in the specified partition(s):

tests/system/databricks/example_databricks_sensors.py[source]

    # Example of using the Databricks Partition Sensor to check the presence
    # of the specified partition(s) in a table.
    partition_sensor = DatabricksPartitionSensor(
        databricks_conn_id=connection_id,
        sql_warehouse_name=sql_warehouse_name,
        catalog="hive_metastore",
        task_id="partition_sensor_task",
        table_name="sample_table_2",
        schema="temp",
        partitions={"date": "2023-01-03", "name": ["abc", "def"]},
        partition_operator="=",
        timeout=60 * 2,
    )
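
A variant of the same check using a range comparison and an explicit poke_interval; a minimal sketch that assumes the same connection, warehouse, and table as above:

    # Minimal sketch (assumed names): wait until partitions with date >= 2023-01-03
    # exist, re-checking every minute for up to ten minutes.
    partition_range_sensor = DatabricksPartitionSensor(
        databricks_conn_id=connection_id,
        sql_warehouse_name=sql_warehouse_name,
        catalog="hive_metastore",
        task_id="partition_range_sensor_task",
        table_name="sample_table_2",
        schema="temp",
        partitions={"date": "2023-01-03"},
        partition_operator=">=",
        poke_interval=60,
        timeout=60 * 10,
    )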
