TeradataOperator¶
The purpose of TeradataOperator is to define tasks involving interactions with a Teradata database.
To execute arbitrary SQL in a Teradata database, use the TeradataOperator.
Common Database Operations with TeradataOperator¶
Creating a Teradata database table¶
An example usage of the TeradataOperator is as follows:
create_table = TeradataOperator(
task_id="create_table",
sql=r"""
CREATE TABLE Country (
country_id INTEGER,
name CHAR(25),
continent CHAR(25)
);
""",
)
You can also use an external file to execute the SQL commands. The external file must be located in the same directory as the DAG file. This way you can easily maintain the SQL queries separately from the code.
create_table_from_external_file = TeradataOperator(
task_id="create_table_from_external_file",
sql="create_table.sql",
dag=dag,
)
Your dags/create_table.sql
should look like this:
-- create Users table
CREATE TABLE Users, FALLBACK (
username varchar(50),
description varchar(256)
);
Inserting data into a Teradata database table¶
We can then create a TeradataOperator task that populates the Users table.
populate_table = TeradataOperator(
task_id="populate_table",
sql=r"""
INSERT INTO Users (username, description)
VALUES ( 'Danny', 'Musician');
INSERT INTO Users (username, description)
VALUES ( 'Simone', 'Chef');
INSERT INTO Users (username, description)
VALUES ( 'Lily', 'Florist');
INSERT INTO Users (username, description)
VALUES ( 'Tim', 'Pet shop owner');
""",
)
Fetching records from your Teradata database table¶
Fetching records from your Teradata database table can be as simple as:
get_all_countries = TeradataOperator(
task_id="get_all_countries",
sql=r"""
SELECT * FROM Country;
""",
)
Passing Parameters into TeradataOperator¶
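TeradataOperator provides the parameters attribute, which makes it possible to dynamically inject values into your SQL requests at runtime.
The example below uses the params argument, whose values are substituted into the SQL statement through Jinja templating (for example, {{ params.column }}).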
To find the countries in the Asian continent:
get_countries_from_continent = TeradataOperator(
task_id="get_countries_from_continent",
sql=r"""
SELECT * FROM Country WHERE {{ params.column }}='{{ params.value }}';
""",
params={"column": "continent", "value": "Asia"},
)
Dropping a Teradata database table¶
We can then create a TeradataOperator task that drops the Users
table.
drop_users_table = TeradataOperator(
task_id="drop_users_table",
sql=r"""
DROP TABLE Users;
""",
dag=dag,
)
The complete Teradata Operator DAG¶
When we put everything together, our DAG should look like this:
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "example_teradata"
with DAG(
dag_id=DAG_ID,
start_date=datetime.datetime(2020, 2, 2),
schedule="@once",
catchup=False,
default_args={"conn_id": "teradata_default"},
) as dag:
create_table = TeradataOperator(
task_id="create_table",
sql=r"""
CREATE TABLE Country (
country_id INTEGER,
name CHAR(25),
continent CHAR(25)
);
""",
)
create_table_from_external_file = TeradataOperator(
task_id="create_table_from_external_file",
sql="create_table.sql",
dag=dag,
)
populate_table = TeradataOperator(
task_id="populate_table",
sql=r"""
INSERT INTO Users (username, description)
VALUES ( 'Danny', 'Musician');
INSERT INTO Users (username, description)
VALUES ( 'Simone', 'Chef');
INSERT INTO Users (username, description)
VALUES ( 'Lily', 'Florist');
INSERT INTO Users (username, description)
VALUES ( 'Tim', 'Pet shop owner');
""",
)
get_all_countries = TeradataOperator(
task_id="get_all_countries",
sql=r"""
SELECT * FROM Country;
""",
)
get_countries_from_continent = TeradataOperator(
task_id="get_countries_from_continent",
sql=r"""
SELECT * FROM Country WHERE {{ params.column }}='{{ params.value }}';
""",
params={"column": "continent", "value": "Asia"},
)
drop_country_table = TeradataOperator(
task_id="drop_country_table",
sql=r"""
DROP TABLE Country;
""",
dag=dag,
)
drop_users_table = TeradataOperator(
task_id="drop_users_table",
sql=r"""
DROP TABLE Users;
""",
dag=dag,
)
create_schema = TeradataOperator(
task_id="create_schema",
sql=r"""
CREATE DATABASE airflow_temp AS PERM=10e6;
""",
)
create_table_with_schema = TeradataOperator(
task_id="create_table_with_schema",
sql=r"""
CREATE TABLE schema_table (
country_id INTEGER,
name CHAR(25),
continent CHAR(25)
);
""",
schema="airflow_temp",
)
drop_schema_table = TeradataOperator(
task_id="drop_schema_table",
sql=r"""
DROP TABLE schema_table;
""",
dag=dag,
schema="airflow_temp",
)
drop_schema = TeradataOperator(
task_id="drop_schema",
sql=r"""
DROP DATABASE airflow_temp;
""",
dag=dag,
)
(
create_table
>> create_table_from_external_file
>> populate_table
>> get_all_countries
>> get_countries_from_continent
>> drop_country_table
>> drop_users_table
>> create_schema
>> create_table_with_schema
>> drop_schema_table
>> drop_schema
)