TeradataOperator

The purpose of TeradataOperator is to define tasks that interact with a Teradata database.

To execute arbitrary SQL in a Teradata database, use the TeradataOperator.

Common Database Operations with TeradataOperator

Creating a Teradata database table

An example usage of the TeradataOperator is as follows:

tests/system/providers/teradata/example_teradata.py

create_table = TeradataOperator(
    task_id="create_table",
    sql=r"""
    CREATE TABLE Country (
        country_id INTEGER,
        name CHAR(25),
        continent CHAR(25)
    );
    """,
)

You can also keep the SQL statements in an external file. The file must be in the same directory as the DAG file. This keeps the SQL queries separate from the Python code and makes them easier to maintain.

tests/system/providers/teradata/example_teradata.py

    create_table_from_external_file = TeradataOperator(
        task_id="create_table_from_external_file",
        sql="create_table.sql",
        dag=dag,
    )

Your dags/create_table.sql should look like this:

-- create Users table
CREATE TABLE Users, FALLBACK (
    username    varchar(50),
    description varchar(256)
);
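
Because the SQL file has to sit next to the DAG file, a misplaced file is an easy mistake to make. The following is a minimal sketch, using only the standard library, of a pre-flight check for that layout. The helper name sql_file_beside_dag is hypothetical and is not part of Airflow or the Teradata provider; the temporary folder merely stands in for a dags/ directory.

```python
import tempfile
from pathlib import Path


def sql_file_beside_dag(dag_file: str, sql_name: str) -> Path:
    """Return the path of sql_name in the DAG file's folder, or raise if absent.

    Hypothetical helper for illustration; not an Airflow API.
    """
    candidate = Path(dag_file).resolve().parent / sql_name
    if not candidate.is_file():
        raise FileNotFoundError(f"{sql_name} not found next to {dag_file}")
    return candidate


# Demonstration in a throwaway folder standing in for the dags/ directory.
with tempfile.TemporaryDirectory() as dags_dir:
    dag_file = Path(dags_dir) / "example_teradata.py"
    dag_file.write_text("# DAG definition would live here\n")
    sql_file = Path(dags_dir) / "create_table.sql"
    sql_file.write_text("CREATE TABLE Users (username varchar(50));\n")

    found = sql_file_beside_dag(str(dag_file), "create_table.sql")
    found_name = found.name
    print(found_name)
```

A check like this could run in a unit test for the DAG folder, so that a missing SQL file is caught before the task is scheduled.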

Inserting data into a Teradata database table

We can then create a TeradataOperator task that populates the Users table.

tests/system/providers/teradata/example_teradata.py

    populate_table = TeradataOperator(
        task_id="populate_table",
        sql=r"""
        INSERT INTO Users (username, description)
            VALUES ( 'Danny', 'Musician');
        INSERT INTO Users (username, description)
            VALUES ( 'Simone', 'Chef');
        INSERT INTO Users (username, description)
            VALUES ( 'Lily', 'Florist');
        INSERT INTO Users (username, description)
            VALUES ( 'Tim', 'Pet shop owner');
        """,
    )
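
When the rows come from Python data rather than hand-written SQL, the statement text can be generated before it is handed to the operator. The sketch below is one possible approach under stated assumptions: quote_literal and build_insert_sql are hypothetical helpers, not provider APIs, and the table and column names are inserted verbatim, so they should only ever come from trusted code.

```python
from typing import Iterable, Sequence


def quote_literal(value: str) -> str:
    """Wrap a string in single quotes, doubling any embedded single quote."""
    return "'" + value.replace("'", "''") + "'"


def build_insert_sql(
    table: str, columns: Sequence[str], rows: Iterable[Sequence[str]]
) -> str:
    """Return one INSERT statement per row, separated by newlines.

    Hypothetical helper: table and column names are not escaped.
    """
    column_list = ", ".join(columns)
    statements = []
    for row in rows:
        values = ", ".join(quote_literal(v) for v in row)
        statements.append(f"INSERT INTO {table} ({column_list}) VALUES ({values});")
    return "\n".join(statements)


users = [
    ("Danny", "Musician"),
    ("Simone", "Chef"),
    ("Lily", "Florist"),
    ("Tim", "Pet shop owner"),
]
populate_sql = build_insert_sql("Users", ["username", "description"], users)
print(populate_sql)
```

The resulting populate_sql string could then be passed as the sql argument of the populate_table task in place of the literal text.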

Fetching records from your Teradata database table

Fetching records from your Teradata database table can be as simple as:

tests/system/providers/teradata/example_teradata.py

    get_all_countries = TeradataOperator(
        task_id="get_all_countries",
        sql=r"""
        SELECT * FROM Country;
        """,
    )

Passing Parameters into TeradataOperator

TeradataOperator accepts a params argument, which makes it possible to inject values into your SQL statements at runtime through Jinja templating.

To find the countries on the Asian continent:

tests/system/providers/teradata/example_teradata.py

    get_countries_from_continent = TeradataOperator(
        task_id="get_countries_from_continent",
        sql=r"""
        SELECT * FROM Country WHERE {{ params.column }}='{{ params.value }}';
        """,
        params={"column": "continent", "value": "Asia"},
    )
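
To see what the database actually receives, it helps to look at the SQL after the placeholders are filled in. The sketch below is a simplified stand-in for the template rendering step: it uses a regular expression instead of Jinja and only understands the {{ params.name }} form, and render_params is a hypothetical name, not an Airflow function.

```python
import re

# Simplified stand-in for Jinja rendering: handles only "{{ params.<name> }}".
_PLACEHOLDER = re.compile(r"\{\{\s*params\.(\w+)\s*\}\}")


def render_params(sql: str, params: dict) -> str:
    """Replace each {{ params.<name> }} placeholder with str(params[name])."""
    return _PLACEHOLDER.sub(lambda match: str(params[match.group(1)]), sql)


sql = "SELECT * FROM Country WHERE {{ params.column }}='{{ params.value }}';"
rendered = render_params(sql, {"column": "continent", "value": "Asia"})
print(rendered)  # SELECT * FROM Country WHERE continent='Asia';
```

The values are substituted as plain text before the statement is sent to the database, so params should only carry values from trusted sources.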

Dropping a Teradata database table

We can then create a TeradataOperator task that drops the Users table.

tests/system/providers/teradata/example_teradata.py

    drop_users_table = TeradataOperator(
        task_id="drop_users_table",
        sql=r"""
        DROP TABLE Users;
        """,
        dag=dag,
    )

The complete Teradata Operator DAG

When we put everything together, our DAG should look like this:

tests/system/providers/teradata/example_teradata.py



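import datetime
import os

from airflow import DAG
from airflow.providers.teradata.operators.teradata import TeradataOperator

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")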
DAG_ID = "example_teradata"

with DAG(
    dag_id=DAG_ID,
    start_date=datetime.datetime(2020, 2, 2),
    schedule="@once",
    catchup=False,
    default_args={"conn_id": "teradata_default"},
) as dag:
    create_table = TeradataOperator(
        task_id="create_table",
        sql=r"""
        CREATE TABLE Country (
            country_id INTEGER,
            name CHAR(25),
            continent CHAR(25)
        );
        """,
    )
    create_table_from_external_file = TeradataOperator(
        task_id="create_table_from_external_file",
        sql="create_table.sql",
        dag=dag,
    )
    populate_table = TeradataOperator(
        task_id="populate_table",
        sql=r"""
        INSERT INTO Users (username, description)
            VALUES ( 'Danny', 'Musician');
        INSERT INTO Users (username, description)
            VALUES ( 'Simone', 'Chef');
        INSERT INTO Users (username, description)
            VALUES ( 'Lily', 'Florist');
        INSERT INTO Users (username, description)
            VALUES ( 'Tim', 'Pet shop owner');
        """,
    )
    get_all_countries = TeradataOperator(
        task_id="get_all_countries",
        sql=r"""
        SELECT * FROM Country;
        """,
    )
    get_countries_from_continent = TeradataOperator(
        task_id="get_countries_from_continent",
        sql=r"""
        SELECT * FROM Country WHERE {{ params.column }}='{{ params.value }}';
        """,
        params={"column": "continent", "value": "Asia"},
    )
    drop_country_table = TeradataOperator(
        task_id="drop_country_table",
        sql=r"""
        DROP TABLE Country;
        """,
        dag=dag,
    )
    drop_users_table = TeradataOperator(
        task_id="drop_users_table",
        sql=r"""
        DROP TABLE Users;
        """,
        dag=dag,
    )
    create_schema = TeradataOperator(
        task_id="create_schema",
        sql=r"""
        CREATE DATABASE airflow_temp AS PERM=10e6;
        """,
    )
    create_table_with_schema = TeradataOperator(
        task_id="create_table_with_schema",
        sql=r"""
        CREATE TABLE schema_table (
           country_id INTEGER,
           name CHAR(25),
           continent CHAR(25)
        );
        """,
        schema="airflow_temp",
    )
    drop_schema_table = TeradataOperator(
        task_id="drop_schema_table",
        sql=r"""
        DROP TABLE schema_table;
        """,
        dag=dag,
        schema="airflow_temp",
    )
    drop_schema = TeradataOperator(
        task_id="drop_schema",
        sql=r"""
        DROP DATABASE airflow_temp;
        """,
        dag=dag,
    )
    (
        create_table
        >> create_table_from_external_file
        >> populate_table
        >> get_all_countries
        >> get_countries_from_continent
        >> drop_country_table
        >> drop_users_table
        >> create_schema
        >> create_table_with_schema
        >> drop_schema_table
        >> drop_schema
    )
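
The chain of >> operators at the end of the DAG is what fixes the execution order. As a rough illustration of why a >> b >> c yields a strictly linear sequence, the toy class below records a downstream link and returns the right-hand task so that the chain continues from it. The Task class is a hypothetical stand-in written for this sketch, not Airflow's implementation.

```python
class Task:
    """Toy stand-in for an Airflow task; it only records downstream links."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self.downstream: list = []

    def __rshift__(self, other: "Task") -> "Task":
        # Record the dependency, then return the right-hand task so that
        # a >> b >> c continues the chain from b.
        self.downstream.append(other)
        return other


create_table = Task("create_table")
populate_table = Task("populate_table")
drop_users_table = Task("drop_users_table")

create_table >> populate_table >> drop_users_table

# Walk the chain from the first task to recover the execution order.
order = []
task = create_table
while task is not None:
    order.append(task.task_id)
    task = task.downstream[0] if task.downstream else None
print(order)  # ['create_table', 'populate_table', 'drop_users_table']
```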
