Connect to Trino using SQLExecuteQueryOperator

Use the SQLExecuteQueryOperator to execute SQL commands against a Trino query engine.

Warning

TrinoOperator is deprecated in favor of SQLExecuteQueryOperator. If you are still using TrinoOperator, migrate as soon as possible.

Using the Operator

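Use the conn_id argument to select the Airflow connection that points to your Trino instance. (trino_conn_id was the argument name on the deprecated TrinoOperator; SQLExecuteQueryOperator takes conn_id.)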
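The connection that the operator refers to by ID has to exist in Airflow before the task runs. One way to define it is an environment variable named AIRFLOW_CONN_<CONN_ID in upper case> that holds a connection URI. The sketch below builds such a name and value; the user, host, port, and schema are hypothetical placeholders, and "trino_default" is assumed as the connection ID.

```python
import os
from urllib.parse import quote

# Hypothetical connection details, for illustration only; replace with your own.
CONN_ID = "trino_default"  # assumed connection ID
USER = "airflow_user"
HOST = "trino.example.com"
PORT = 8080
SCHEMA_NAME = "cities"

# Airflow connection URIs have the form <conn-type>://<login>@<host>:<port>/<schema>.
uri = f"trino://{quote(USER)}@{HOST}:{PORT}/{quote(SCHEMA_NAME)}"

# Airflow looks up connections in variables named AIRFLOW_CONN_<CONN_ID>, upper-cased.
env_name = f"AIRFLOW_CONN_{CONN_ID.upper()}"
os.environ[env_name] = uri

print(f"{env_name}={uri}")
```

In a real deployment the variable would normally be exported in the environment of the scheduler and workers rather than set from Python; the snippet only shows how the name and value are formed.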

An example usage of the SQLExecuteQueryOperator to connect to Trino is as follows:

tests/system/trino/example_trino.py


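from datetime import datetime

from airflow import models
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

# Placeholder names (assumed for illustration); the example file defines its own values.
SCHEMA = "hive.cities"
TABLE = "city"
TABLE1 = "city1"
TABLE2 = "city2"

with models.DAG(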
    dag_id="example_trino",
    schedule="@once",  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:
    trino_create_schema = SQLExecuteQueryOperator(
        task_id="trino_create_schema",
        sql=f"CREATE SCHEMA IF NOT EXISTS {SCHEMA} WITH (location = 's3://irisbkt/cities/');",
        handler=list,
    )
    trino_create_table = SQLExecuteQueryOperator(
        task_id="trino_create_table",
        sql=f"""CREATE TABLE IF NOT EXISTS {SCHEMA}.{TABLE}(
        cityid bigint,
        cityname varchar
        )""",
        handler=list,
    )
    trino_insert = SQLExecuteQueryOperator(
        task_id="trino_insert",
        sql=f"""INSERT INTO {SCHEMA}.{TABLE} VALUES (1, 'San Francisco');""",
        handler=list,
    )
    trino_multiple_queries = SQLExecuteQueryOperator(
        task_id="trino_multiple_queries",
        sql=f"""CREATE TABLE IF NOT EXISTS {SCHEMA}.{TABLE1}(cityid bigint,cityname varchar);
        INSERT INTO {SCHEMA}.{TABLE1} VALUES (2, 'San Jose');
        CREATE TABLE IF NOT EXISTS {SCHEMA}.{TABLE2}(cityid bigint,cityname varchar);
        INSERT INTO {SCHEMA}.{TABLE2} VALUES (3, 'San Diego');""",
        handler=list,
    )
    trino_templated_query = SQLExecuteQueryOperator(
        task_id="trino_templated_query",
        sql="SELECT * FROM {{ params.SCHEMA }}.{{ params.TABLE }}",
        handler=list,
        params={"SCHEMA": SCHEMA, "TABLE": TABLE1},
    )
    trino_parameterized_query = SQLExecuteQueryOperator(
        task_id="trino_parameterized_query",
        sql=f"select * from {SCHEMA}.{TABLE2} where cityname = ?",
        parameters=("San Diego",),
        handler=list,
    )

    (
        trino_create_schema
        >> trino_create_table
        >> trino_insert
        >> trino_multiple_queries
        >> trino_templated_query
        >> trino_parameterized_query
    )

Note

This operator can run any syntactically correct Trino query. Multiple queries can be passed either as a list of statements or as a single string in which the statements are separated by semicolons.
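As a minimal sketch of the two forms, the snippet below builds the same pair of statements once as a list and once as a single semicolon-separated string. The schema and table names are hypothetical, and the operator call is shown only as a comment, with "trino_default" assumed as the connection ID.

```python
# Hypothetical schema and table names, used only for illustration.
SCHEMA = "hive.cities"
TABLE = "city_demo"

# Form 1: a list with one statement per element.
sql_as_list = [
    f"CREATE TABLE IF NOT EXISTS {SCHEMA}.{TABLE}(cityid bigint, cityname varchar)",
    f"INSERT INTO {SCHEMA}.{TABLE} VALUES (4, 'San Mateo')",
]

# Form 2: the same statements joined into one string, separated by semicolons.
sql_as_string = ";\n".join(sql_as_list) + ";"

# Either value can be given as the operator's `sql` argument, for example:
# SQLExecuteQueryOperator(
#     task_id="trino_list_queries",
#     conn_id="trino_default",  # assumed connection ID
#     sql=sql_as_list,
#     handler=list,
# )
print(sql_as_string)
```

The list form keeps each statement separate in the DAG file, which can make longer sequences easier to read and review than one multi-line string.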
