S3ToTeradataOperator

The S3ToTeradataOperator defines tasks that transfer CSV, JSON, and Parquet data from AWS Simple Storage Service (S3) to a Teradata table. It uses the Teradata READ_NOS feature, a table operator in Teradata Vantage that allows users to list and read external files at a specified location. For more details, see READ_NOS Functionality.

Use the S3ToTeradataOperator to transfer data from S3 to Teradata. The operator reads CSV, JSON, or Parquet data directly from the object store and creates a permanent table in the database by combining the READ_NOS and CREATE TABLE AS functionalities.
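As a rough sketch of what this looks like on the Teradata side (the exact SQL the operator emits may differ; the table name, location, and authorization object below are taken from the examples in this guide), the generated statement has this shape:

```sql
CREATE MULTISET TABLE example_s3_teradata_csv AS
(
  SELECT * FROM (
    LOCATION = '/s3/td-usgs-public.s3.amazonaws.com/CSVDATA/09394500/2018/06/'
    AUTHORIZATION = aws_authorization
  ) AS d
) WITH DATA;
```

READ_NOS supplies the foreign-table read over the S3 location, and CREATE TABLE AS ... WITH DATA materializes the result as a permanent table.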

Note

The current version of S3ToTeradataOperator does not support accessing AWS S3 with Security Token Service (STS) temporary credentials; it supports only long-term credentials.
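One way to supply such long-term credentials is an Airflow connection defined via the `AIRFLOW_CONN_AWS_DEFAULT` environment variable, whose value is an `aws://` URI embedding the access key and secret key. The helper below is a minimal sketch (the function name and the credential values are hypothetical, for illustration only):

```python
from urllib.parse import quote_plus


def aws_conn_uri(access_key: str, secret_key: str) -> str:
    """Build an Airflow connection URI carrying long-term AWS credentials.

    Reserved characters in the key or secret must be percent-encoded,
    or the URI parser will misread the credential boundaries.
    """
    return f"aws://{quote_plus(access_key)}:{quote_plus(secret_key)}@"


# Hypothetical credentials for illustration only.
print(aws_conn_uri("AKIAEXAMPLE", "wJalr/Example+Key"))
```

Exporting the result as `AIRFLOW_CONN_AWS_DEFAULT` makes it available to the operator under `aws_conn_id="aws_default"`.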

Transferring data in CSV format from S3 to Teradata

An example of using the S3ToTeradataOperator to transfer CSV data from S3 to a Teradata table:

tests/system/teradata/example_s3_to_teradata_transfer.py[source]

    transfer_data_csv = S3ToTeradataOperator(
        task_id="transfer_data_s3_to_teradata_csv",
        s3_source_key="/s3/td-usgs-public.s3.amazonaws.com/CSVDATA/09394500/2018/06/",
        public_bucket=True,
        teradata_table="example_s3_teradata_csv",
        aws_conn_id="aws_default",
        trigger_rule="all_done",
    )

Transferring data in JSON format from S3 to Teradata

An example of using the S3ToTeradataOperator to transfer JSON data from S3 to a Teradata table:

tests/system/teradata/example_s3_to_teradata_transfer.py[source]

    transfer_data_json = S3ToTeradataOperator(
        task_id="transfer_data_s3_to_teradata_json",
        s3_source_key="/s3/td-usgs-public.s3.amazonaws.com/JSONDATA/09394500/2018/06/",
        public_bucket=True,
        teradata_table="example_s3_teradata_json",
        aws_conn_id="aws_default",
        trigger_rule="all_done",
    )

Transferring data in Parquet format from S3 to Teradata

An example of using the S3ToTeradataOperator to transfer Parquet data from S3 to a Teradata table:

tests/system/teradata/example_s3_to_teradata_transfer.py[source]

    transfer_data_parquet = S3ToTeradataOperator(
        task_id="transfer_data_s3_to_teradata_parquet",
        s3_source_key="/s3/td-usgs-public.s3.amazonaws.com/PARQUETDATA/09394500/2018/06/",
        public_bucket=True,
        teradata_table="example_s3_teradata_parquet",
        aws_conn_id="aws_default",
        trigger_rule="all_done",
    )

The complete S3ToTeradataOperator DAG

When we put everything together, our DAG should look like this:

tests/system/teradata/example_s3_to_teradata_transfer.py[source]



import datetime
import os

from airflow import DAG
from airflow.providers.teradata.operators.teradata import TeradataOperator
from airflow.providers.teradata.transfers.s3_to_teradata import S3ToTeradataOperator

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "example_s3_to_teradata_transfer_operator"
CONN_ID = "teradata_default"

with DAG(
    dag_id=DAG_ID,
    start_date=datetime.datetime(2020, 2, 2),
    schedule="@once",
    catchup=False,
    default_args={"teradata_conn_id": CONN_ID},
) as dag:
    # [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_public_s3_to_teradata_csv]
    transfer_data_csv = S3ToTeradataOperator(
        task_id="transfer_data_s3_to_teradata_csv",
        s3_source_key="/s3/td-usgs-public.s3.amazonaws.com/CSVDATA/09394500/2018/06/",
        public_bucket=True,
        teradata_table="example_s3_teradata_csv",
        aws_conn_id="aws_default",
        trigger_rule="all_done",
    )
    # [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_public_s3_to_teradata_csv]
    # [START s3_to_teradata_transfer_operator_howto_guide_read_data_table_csv]
    read_data_table_csv = TeradataOperator(
        task_id="read_data_table_csv",
        conn_id=CONN_ID,
        sql="SELECT * from example_s3_teradata_csv;",
    )
    # [END s3_to_teradata_transfer_operator_howto_guide_read_data_table_csv]
    # [START s3_to_teradata_transfer_operator_howto_guide_drop_table_csv]
    drop_table_csv = TeradataOperator(
        task_id="drop_table_csv",
        conn_id=CONN_ID,
        sql="DROP TABLE example_s3_teradata_csv;",
    )
    # [END s3_to_teradata_transfer_operator_howto_guide_drop_table_csv]
    # [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_access_s3_to_teradata_csv]
    transfer_key_data_csv = S3ToTeradataOperator(
        task_id="transfer_key_data_s3_to_teradata_key_csv",
        s3_source_key="/s3/airflowteradatatest.s3.ap-southeast-2.amazonaws.com/",
        teradata_table="example_s3_teradata_csv",
        aws_conn_id="aws_default",
        teradata_conn_id="teradata_default",
        trigger_rule="all_done",
    )
    # [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_access_s3_to_teradata_csv]
    # [START s3_to_teradata_transfer_operator_howto_guide_read_data_table_csv]
    read_key_data_table_csv = TeradataOperator(
        task_id="read_key_data_table_csv",
        conn_id=CONN_ID,
        sql="SELECT * from example_s3_teradata_csv;",
    )
    # [END s3_to_teradata_transfer_operator_howto_guide_read_data_table_csv]
    # [START s3_to_teradata_transfer_operator_howto_guide_drop_table_csv]
    drop_key_table_csv = TeradataOperator(
        task_id="drop_key_table_csv",
        conn_id=CONN_ID,
        sql="DROP TABLE example_s3_teradata_csv;",
    )
    # [END s3_to_teradata_transfer_operator_howto_guide_drop_table_csv]
    # [START s3_to_teradata_transfer_operator_howto_guide_create_authorization]
    create_aws_authorization = TeradataOperator(
        task_id="create_aws_authorization",
        conn_id=CONN_ID,
        sql="CREATE AUTHORIZATION aws_authorization USER '{{ var.value.get('AWS_ACCESS_KEY_ID') }}' PASSWORD '{{ var.value.get('AWS_SECRET_ACCESS_KEY') }}' ",
    )
    # [END s3_to_teradata_transfer_operator_howto_guide_create_authorization]
    # [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_authorization_s3_to_teradata_csv]
    transfer_auth_data_csv = S3ToTeradataOperator(
        task_id="transfer_auth_data_s3_to_teradata_auth_csv",
        s3_source_key="/s3/teradata-download.s3.us-east-1.amazonaws.com/DevTools/csv/",
        teradata_table="example_s3_teradata_csv",
        teradata_authorization_name="aws_authorization",
        teradata_conn_id="teradata_default",
        trigger_rule="all_done",
    )
    # [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_authorization_s3_to_teradata_csv]
    # [START s3_to_teradata_transfer_operator_howto_guide_read_data_table_csv]
    read_auth_data_table_csv = TeradataOperator(
        task_id="read_auth_data_table_csv",
        conn_id=CONN_ID,
        sql="SELECT * from example_s3_teradata_csv;",
    )
    # [END s3_to_teradata_transfer_operator_howto_guide_read_data_table_csv]
    # [START s3_to_teradata_transfer_operator_howto_guide_drop_table_csv]
    drop_auth_table_csv = TeradataOperator(
        task_id="drop_auth_table_csv",
        conn_id=CONN_ID,
        sql="DROP TABLE example_s3_teradata_csv;",
    )
    # [END s3_to_teradata_transfer_operator_howto_guide_drop_table_csv]
    # [START s3_to_teradata_transfer_operator_howto_guide_drop_authorization]
    drop_auth = TeradataOperator(
        task_id="drop_auth",
        conn_id=CONN_ID,
        sql="DROP AUTHORIZATION aws_authorization;",
    )
    # [END s3_to_teradata_transfer_operator_howto_guide_drop_authorization]
    # [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_json]
    transfer_data_json = S3ToTeradataOperator(
        task_id="transfer_data_s3_to_teradata_json",
        s3_source_key="/s3/td-usgs-public.s3.amazonaws.com/JSONDATA/09394500/2018/06/",
        public_bucket=True,
        teradata_table="example_s3_teradata_json",
        aws_conn_id="aws_default",
        trigger_rule="all_done",
    )
    # [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_json]
    # [START s3_to_teradata_transfer_operator_howto_guide_read_data_table_json]
    read_data_table_json = TeradataOperator(
        task_id="read_data_table_json",
        sql="SELECT * from example_s3_teradata_json;",
    )
    # [END s3_to_teradata_transfer_operator_howto_guide_read_data_table_json]
    # [START s3_to_teradata_transfer_operator_howto_guide_drop_table_json]
    drop_table_json = TeradataOperator(
        task_id="drop_table_json",
        sql="DROP TABLE example_s3_teradata_json;",
    )
    # [END s3_to_teradata_transfer_operator_howto_guide_drop_table_json]
    # [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_parquet]
    transfer_data_parquet = S3ToTeradataOperator(
        task_id="transfer_data_s3_to_teradata_parquet",
        s3_source_key="/s3/td-usgs-public.s3.amazonaws.com/PARQUETDATA/09394500/2018/06/",
        public_bucket=True,
        teradata_table="example_s3_teradata_parquet",
        aws_conn_id="aws_default",
        trigger_rule="all_done",
    )
    # [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_parquet]
    # [START s3_to_teradata_transfer_operator_howto_guide_read_data_table_parquet]
    read_data_table_parquet = TeradataOperator(
        task_id="read_data_table_parquet",
        sql="SELECT * from example_s3_teradata_parquet;",
    )
    # [END s3_to_teradata_transfer_operator_howto_guide_read_data_table_parquet]
    # [START s3_to_teradata_transfer_operator_howto_guide_drop_table]
    drop_table_parquet = TeradataOperator(
        task_id="drop_table_parquet",
        sql="DROP TABLE example_s3_teradata_parquet;",
    )

    # [END s3_to_teradata_transfer_operator_howto_guide_drop_table]
    (
        transfer_data_csv
        >> transfer_data_json
        >> transfer_data_parquet
        >> read_data_table_csv
        >> read_data_table_json
        >> read_data_table_parquet
        >> drop_table_csv
        >> drop_table_json
        >> drop_table_parquet
        >> transfer_key_data_csv
        >> read_key_data_table_csv
        >> drop_key_table_csv
        >> create_aws_authorization
        >> transfer_auth_data_csv
        >> read_auth_data_table_csv
        >> drop_auth_table_csv
        >> drop_auth
    )
