TeradataOperator¶
The purpose of TeradataOperator is to define tasks that interact with a Teradata database.
To execute arbitrary SQL in a Teradata database, use the TeradataOperator.
Common Database Operations with TeradataOperator¶
Creating a Teradata database table¶
An example usage of the TeradataOperator is as follows:
create_table = TeradataOperator(
    task_id="create_table",
    sql=r"""
        CREATE TABLE Country (
            country_id INTEGER,
            name CHAR(25),
            continent CHAR(25)
        );
    """,
)
You can also execute SQL commands from an external file. The file must be in the same directory as the DAG file. This keeps the SQL queries separate from the Python code and makes them easier to maintain.
create_table_from_external_file = TeradataOperator(
    task_id="create_table_from_external_file",
    sql="create_table.sql",
    dag=dag,
)
Your dags/create_table.sql should look like this:
-- create Users table
CREATE TABLE Users, FALLBACK (
    username varchar(50),
    description varchar(256)
);
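Since the SQL file is expected to sit next to the DAG file, a quick check can confirm that layout before the task runs. The following is a minimal sketch using only the standard library; the helper name `sql_file_beside_dag` is hypothetical and is not part of Airflow or the Teradata provider.

```python
from pathlib import Path


def sql_file_beside_dag(dag_file: str, sql_name: str) -> Path:
    """Return the path of `sql_name` located in the same directory as `dag_file`.

    Hypothetical helper for illustration only; raises FileNotFoundError
    when the SQL file is not found beside the DAG file.
    """
    candidate = Path(dag_file).resolve().parent / sql_name
    if not candidate.is_file():
        raise FileNotFoundError(f"{sql_name} not found beside {dag_file}")
    return candidate
```

Inside a DAG file, it could be called as `sql_file_beside_dag(__file__, "create_table.sql")`.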
Inserting data into a Teradata database table¶
We can then create a TeradataOperator task that populates the Users table.
populate_table = TeradataOperator(
    task_id="populate_table",
    sql=r"""
        INSERT INTO Users (username, description)
            VALUES ( 'Danny', 'Musician');
        INSERT INTO Users (username, description)
            VALUES ( 'Simone', 'Chef');
        INSERT INTO Users (username, description)
            VALUES ( 'Lily', 'Florist');
        INSERT INTO Users (username, description)
            VALUES ( 'Tim', 'Pet shop owner');
    """,
)
Fetching records from your Teradata database table¶
Fetching records from your Teradata database table can be as simple as:
get_all_countries = TeradataOperator(
    task_id="get_all_countries",
    sql=r"SELECT * FROM Country;",
)
Passing Parameters into TeradataOperator¶
TeradataOperator accepts a params argument, which makes it possible to inject values into your SQL statements through Jinja templating at runtime.
To find the countries on the Asian continent:
get_countries_from_continent = TeradataOperator(
    task_id="get_countries_from_continent",
    sql=r"SELECT * FROM Country WHERE {{ params.column }}='{{ params.value }}';",
    params={"column": "continent", "value": "Asia"},
)
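To make the substitution concrete, the sketch below shows the SQL text that results once the `{{ params.… }}` markers are filled in. Airflow performs this step with Jinja; the `render_params` function here is a simplified, hypothetical stand-in built on the standard library so that it runs without Airflow installed.

```python
import re


def render_params(sql: str, params: dict) -> str:
    """Replace each `{{ params.name }}` marker with the matching value.

    A simplified stand-in for Jinja rendering, for illustration only.
    """
    pattern = re.compile(r"\{\{\s*params\.(\w+)\s*\}\}")
    return pattern.sub(lambda match: str(params[match.group(1)]), sql)


template = "SELECT * FROM Country WHERE {{ params.column }}='{{ params.value }}';"
rendered = render_params(template, {"column": "continent", "value": "Asia"})
print(rendered)  # SELECT * FROM Country WHERE continent='Asia';
```

Because the values are substituted as plain text, this style suits trusted, DAG-defined values rather than untrusted input.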
Dropping a Teradata database table¶
We can then create a TeradataOperator task that drops the Users table.
drop_users_table = TeradataOperator(
    task_id="drop_users_table",
    sql=r"DROP TABLE Users;",
    dag=dag,
)
The complete TeradataOperator DAG¶
When we put everything together, our DAG should look like this:
import datetime
import os

from airflow import DAG
from airflow.providers.teradata.operators.teradata import TeradataOperator

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "example_teradata"
CONN_ID = "teradata_default"

with DAG(
    dag_id=DAG_ID,
    start_date=datetime.datetime(2020, 2, 2),
    schedule="@once",
    catchup=False,
    # Every task in this DAG uses the same Teradata connection.
    default_args={"teradata_conn_id": CONN_ID},
) as dag:
    create_table = TeradataOperator(
        task_id="create_table",
        sql=r"""
            CREATE TABLE Country (
                country_id INTEGER,
                name CHAR(25),
                continent CHAR(25)
            );
        """,
    )
    # Creates the Users table from dags/create_table.sql.
    create_table_from_external_file = TeradataOperator(
        task_id="create_table_from_external_file",
        sql="create_table.sql",
        dag=dag,
    )
    populate_table = TeradataOperator(
        task_id="populate_table",
        sql=r"""
            INSERT INTO Users (username, description)
                VALUES ( 'Danny', 'Musician');
            INSERT INTO Users (username, description)
                VALUES ( 'Simone', 'Chef');
            INSERT INTO Users (username, description)
                VALUES ( 'Lily', 'Florist');
            INSERT INTO Users (username, description)
                VALUES ( 'Tim', 'Pet shop owner');
        """,
    )
    get_all_countries = TeradataOperator(
        task_id="get_all_countries",
        sql=r"SELECT * FROM Country;",
    )
    get_countries_from_continent = TeradataOperator(
        task_id="get_countries_from_continent",
        sql=r"SELECT * FROM Country WHERE {{ params.column }}='{{ params.value }}';",
        params={"column": "continent", "value": "Asia"},
    )
    drop_country_table = TeradataOperator(
        task_id="drop_country_table",
        sql=r"DROP TABLE Country;",
        dag=dag,
    )
    drop_users_table = TeradataOperator(
        task_id="drop_users_table",
        sql=r"DROP TABLE Users;",
        dag=dag,
    )
    create_schema = TeradataOperator(
        task_id="create_schema",
        sql=r"CREATE DATABASE airflow_temp AS PERM=10e6;",
    )
    create_table_with_schema = TeradataOperator(
        task_id="create_table_with_schema",
        sql=r"""
            CREATE TABLE schema_table (
                country_id INTEGER,
                name CHAR(25),
                continent CHAR(25)
            );
        """,
        schema="airflow_temp",
    )
    drop_schema_table = TeradataOperator(
        task_id="drop_schema_table",
        sql=r"DROP TABLE schema_table;",
        dag=dag,
        schema="airflow_temp",
    )
    drop_schema = TeradataOperator(
        task_id="drop_schema",
        sql=r"DROP DATABASE airflow_temp;",
        dag=dag,
    )

    # Run the tasks one after another in the order listed.
    (
        create_table
        >> create_table_from_external_file
        >> populate_table
        >> get_all_countries
        >> get_countries_from_continent
        >> drop_country_table
        >> drop_users_table
        >> create_schema
        >> create_table_with_schema
        >> drop_schema_table
        >> drop_schema
    )
TeradataStoredProcedureOperator¶
The purpose of TeradataStoredProcedureOperator is to define tasks that execute Teradata stored procedures.
Execute a Stored Procedure in a Teradata database¶
To execute a stored procedure in a Teradata database, use the TeradataStoredProcedureOperator.
Assume a stored procedure exists in the database that looks like this:
REPLACE PROCEDURE TEST_PROCEDURE (
    IN val_in INTEGER,
    INOUT val_in_out INTEGER,
    OUT val_out INTEGER,
    OUT value_str_out varchar(100)
)
BEGIN
    set val_out = val_in * 2;
    set val_in_out = val_in_out * 4;
    set value_str_out = 'string output';
END;
/
This stored procedure takes an integer argument, val_in, as input. It operates with a single inout argument, val_in_out, which serves as both input and output. Additionally, it returns an integer argument, val_out, and a string argument, value_str_out.
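As a worked example of the arithmetic, the sketch below models the procedure body in plain Python with val_in = 3 and val_in_out = 1, the input values used by the examples in this section. The function name `model_test_procedure` is hypothetical, and how the operator surfaces the output values is not modeled here.

```python
def model_test_procedure(val_in: int, val_in_out: int) -> tuple:
    """Mirror the three SET statements of TEST_PROCEDURE in plain Python."""
    val_out = val_in * 2              # set val_out = val_in * 2;
    val_in_out = val_in_out * 4       # set val_in_out = val_in_out * 4;
    value_str_out = "string output"   # set value_str_out = 'string output';
    return val_in_out, val_out, value_str_out


print(model_test_procedure(3, 1))  # (4, 6, 'string output')
```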
This stored procedure can be invoked using TeradataStoredProcedureOperator in several ways.
One approach involves passing parameters positionally as a list, with output parameters specified as Python data types:
opr_sp_types = TeradataStoredProcedureOperator(
    task_id="opr_sp_types",
    procedure="TEST_PROCEDURE",
    parameters=[3, 1, int, str],
)
Alternatively, parameters can be passed positionally as a list, with output parameters designated as placeholders:
opr_sp_place_holder = TeradataStoredProcedureOperator(
    task_id="opr_sp_place_holder",
    procedure="TEST_PROCEDURE",
    parameters=[3, 1, "?", "?"],
)
Another method entails passing parameters positionally as a dictionary:
opr_sp_dict = TeradataStoredProcedureOperator(
    task_id="opr_sp_dict",
    procedure="TEST_PROCEDURE",
    parameters={"val_in": 3, "val_in_out": 1, "val_out": int, "str_out": str},
)
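The three calling conventions above describe the same call: two input values followed by two output slots. The sketch below illustrates that equivalence in plain Python. It is not the provider's implementation; the `normalize` function and the `OUT` sentinel are hypothetical, and treating a dictionary as positional (insertion order, keys ignored) is an assumption based on the description above.

```python
OUT = object()  # hypothetical sentinel marking an output-only slot


def normalize(parameters):
    """Flatten list- or dict-style parameters into one positional list.

    Python types (int, str, ...) and the "?" placeholder both become OUT.
    """
    if isinstance(parameters, dict):
        values = list(parameters.values())  # relies on insertion order
    else:
        values = list(parameters)
    return [OUT if isinstance(v, type) or v == "?" else v for v in values]


by_types = normalize([3, 1, int, str])
by_placeholders = normalize([3, 1, "?", "?"])
by_dict = normalize({"val_in": 3, "val_in_out": 1, "val_out": int, "str_out": str})
print(by_types == by_placeholders == by_dict)  # True
```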
Assume a stored procedure exists in the database that looks like this:
REPLACE PROCEDURE GetTimestampOutParameter (OUT out_timestamp TIMESTAMP)
BEGIN
    -- Assign current timestamp to the OUT parameter
    SET out_timestamp = CURRENT_TIMESTAMP;
END;
/
This stored procedure returns a single timestamp argument, out_timestamp, and can be called through TeradataStoredProcedureOperator with parameters passed positionally as a list:
opr_sp_timestamp = TeradataStoredProcedureOperator(
    task_id="opr_sp_timestamp",
    procedure="GetTimestampOutParameter",
    parameters=["?"],
)
Assume a stored procedure exists in the database that looks like this:
REPLACE PROCEDURE TEST_PROCEDURE (IN val_in INTEGER, OUT val_out INTEGER)
BEGIN
    DECLARE cur1 CURSOR WITH RETURN FOR SELECT * from DBC.DBCINFO ORDER BY 1 ;
    DECLARE cur2 CURSOR WITH RETURN FOR SELECT infodata, infokey from DBC.DBCINFO order by 1 ;
    open cur1 ;
    open cur2 ;
    set val_out = val_in * 2;
END;
/
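This stored procedure takes a single integer argument, val_in, as input and produces a single integer argument, val_out.
Additionally, it yields two cursors representing the outputs of select queries.
A procedure of this kind can be invoked using TeradataStoredProcedureOperator with parameters passed positionally as a list. The task below uses TeradataOperator to create a similar procedure, examplestoredproc, which also takes an INOUT argument and declares its two cursors with DYNAMIC RESULT SETS 2. The complete DAG in the next section calls it through the opr_sp_param_dr task with parameters=[3, 2, int]: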
create_sp_param_dr = TeradataOperator(
    task_id="create_sp_param_dr",
    sql=r"""replace procedure examplestoredproc (in p1 integer, inout p2 integer, out p3 integer)
        dynamic result sets 2
        begin
            declare cur1 cursor with return for select * from dbc.dbcinfo order by 1 ;
            declare cur2 cursor with return for select infodata, infokey from dbc.dbcinfo order by 1 ;
            open cur1 ;
            open cur2 ;
            set p2 = p1 + p2 ;
            set p3 = p1 * p2 ;
        end ;
    """,
)
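The order of the two SET statements in examplestoredproc matters, because p3 is computed from the already updated p2. The sketch below models only that arithmetic in plain Python for p1 = 3 and p2 = 2, the inputs passed by the opr_sp_param_dr task in the complete DAG. The function name `model_examplestoredproc` is hypothetical, and the two dynamic result sets are not modeled.

```python
def model_examplestoredproc(p1: int, p2: int) -> tuple:
    """Mirror the SET statements of examplestoredproc in plain Python."""
    p2 = p1 + p2  # set p2 = p1 + p2 ;
    p3 = p1 * p2  # set p3 = p1 * p2 ; (uses the updated p2)
    return p2, p3


print(model_examplestoredproc(3, 2))  # (5, 15)
```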
The complete TeradataStoredProcedureOperator DAG¶
When we put everything together, our DAG should look like this:
from datetime import datetime

from airflow import DAG
from airflow.providers.teradata.operators.teradata import (
    TeradataOperator,
    TeradataStoredProcedureOperator,
)

CONN_ID = "teradata_sp_call"
DAG_ID = "example_teradata_call_sp"

with DAG(
    dag_id=DAG_ID,
    max_active_runs=1,
    max_active_tasks=3,
    catchup=False,
    # Every task in this DAG uses the same Teradata connection.
    default_args={"teradata_conn_id": CONN_ID},
    schedule="@once",
    start_date=datetime(2023, 1, 1),
) as dag:
    # Create the procedure with IN, INOUT, and OUT parameters.
    create_sp_in_inout = TeradataOperator(
        task_id="create_sp_in_inout",
        sql=r"""REPLACE PROCEDURE TEST_PROCEDURE (
                IN val_in INTEGER,
                INOUT val_in_out INTEGER,
                OUT val_out INTEGER,
                OUT value_str_out varchar(100)
            )
            BEGIN
                set val_out = val_in * 2;
                set val_in_out = val_in_out * 4;
                set value_str_out = 'string output';
            END;
        """,
    )
    # Output parameters given as Python types.
    opr_sp_types = TeradataStoredProcedureOperator(
        task_id="opr_sp_types",
        procedure="TEST_PROCEDURE",
        parameters=[3, 1, int, str],
    )
    # Output parameters given as "?" placeholders.
    opr_sp_place_holder = TeradataStoredProcedureOperator(
        task_id="opr_sp_place_holder",
        procedure="TEST_PROCEDURE",
        parameters=[3, 1, "?", "?"],
    )
    # Parameters given as a dictionary.
    opr_sp_dict = TeradataStoredProcedureOperator(
        task_id="opr_sp_dict",
        procedure="TEST_PROCEDURE",
        parameters={"val_in": 3, "val_in_out": 1, "val_out": int, "str_out": str},
    )
    create_sp_timestamp = TeradataOperator(
        task_id="create_sp_timestamp",
        sql=r"""REPLACE PROCEDURE GetTimestampOutParameter (OUT out_timestamp TIMESTAMP)
            BEGIN
                -- Assign current timestamp to the OUT parameter
                SET out_timestamp = CURRENT_TIMESTAMP;
            END;
        """,
    )
    opr_sp_timestamp = TeradataStoredProcedureOperator(
        task_id="opr_sp_timestamp",
        procedure="GetTimestampOutParameter",
        parameters=["?"],
    )
    # Create a procedure that also returns two dynamic result sets.
    create_sp_param_dr = TeradataOperator(
        task_id="create_sp_param_dr",
        sql=r"""replace procedure examplestoredproc (in p1 integer, inout p2 integer, out p3 integer)
            dynamic result sets 2
            begin
                declare cur1 cursor with return for select * from dbc.dbcinfo order by 1 ;
                declare cur2 cursor with return for select infodata, infokey from dbc.dbcinfo order by 1 ;
                open cur1 ;
                open cur2 ;
                set p2 = p1 + p2 ;
                set p3 = p1 * p2 ;
            end ;
        """,
    )
    opr_sp_param_dr = TeradataStoredProcedureOperator(
        task_id="opr_sp_param_dr",
        procedure="examplestoredproc",
        parameters=[3, 2, int],
    )
    drop_sp = TeradataOperator(
        task_id="drop_sp",
        sql=r"drop procedure examplestoredproc;",
    )
    drop_sp_test = TeradataOperator(
        task_id="drop_sp_test",
        sql=r"drop procedure TEST_PROCEDURE;",
    )
    drop_sp_timestamp = TeradataOperator(
        task_id="drop_sp_timestamp",
        sql=r"drop procedure GetTimestampOutParameter;",
    )

    # Run the tasks one after another in the order listed.
    (
        create_sp_in_inout
        >> opr_sp_types
        >> opr_sp_dict
        >> opr_sp_place_holder
        >> create_sp_param_dr
        >> opr_sp_param_dr
        >> drop_sp
        >> drop_sp_test
        >> create_sp_timestamp
        >> opr_sp_timestamp
        >> drop_sp_timestamp
    )