AzureBlobStorageToTeradataOperator

Use the AzureBlobStorageToTeradataOperator to transfer CSV, JSON, and Parquet data from Azure Blob Storage to a Teradata table. The operator uses the Teradata READ_NOS feature to read the data directly from the object store, and it creates a permanent table in the database with a CREATE TABLE AS statement of the following form:

CREATE MULTISET TABLE multiset_table_name AS (
  SELECT *
  FROM (
    LOCATION='YOUR-OBJECT-STORE-URI'
    AUTHORIZATION=authorization_object
  ) AS d
) WITH DATA;
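
As a rough illustration of how the statement above is parameterized, the following sketch assembles it from a table name, an object store location, and an optional authorization object. The helper name build_create_table_sql and its arguments are hypothetical and exist only for this example; the operator's internal implementation may differ. Values are interpolated without escaping, so the sketch is only suitable for trusted input.

```python
from typing import Optional


def build_create_table_sql(
    table_name: str,
    location: str,
    authorization_object: Optional[str] = None,
) -> str:
    """Assemble a CREATE TABLE AS statement in the shape shown above.

    Hypothetical helper for illustration; not part of the Teradata provider.
    """
    lines = [
        f"CREATE MULTISET TABLE {table_name} AS (",
        "  SELECT *",
        "  FROM (",
        f"    LOCATION='{location}'",
    ]
    # Assumption: the AUTHORIZATION clause is omitted for public object storage.
    if authorization_object:
        lines.append(f"    AUTHORIZATION={authorization_object}")
    lines += ["  ) AS d", ") WITH DATA;"]
    return "\n".join(lines)


if __name__ == "__main__":
    print(
        build_create_table_sql(
            "example_blob_teradata_csv",
            "/az/airflowteradata.blob.core.windows.net/csvdata/",
            authorization_object="azure_authorization",
        )
    )
```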

The operator loads data from both public and private object storage. For private object storage, access can be granted through either a Teradata Authorization database object or the Object Store Login and Object Store Key defined in the Azure Blob Storage connection in Airflow. For public object storage, no authorization or access credentials are required.

  • The Teradata Authorization database object access type is used with the teradata_authorization_name parameter of AzureBlobStorageToTeradataOperator

  • The Object Store Login and Object Store Key access type is used with the azure_conn_id parameter of AzureBlobStorageToTeradataOperator

https://docs.teradata.com/r/Enterprise_IntelliFlex_VMware/Teradata-VantageTM-Native-Object-Store-Getting-Started-Guide-17.20/Setting-Up-Access/Setting-Access-Privileges

Note

The Teradata Authorization database object takes precedence if both access types are defined.
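
The precedence described in this note can be sketched as a small decision function. This is a minimal illustration, assuming the parameter names used in this guide; the function resolve_access_type, its return labels, and the error raised when nothing is configured are hypothetical and do not come from the provider.

```python
from typing import Optional


def resolve_access_type(
    public_bucket: bool = False,
    teradata_authorization_name: Optional[str] = None,
    azure_conn_id: Optional[str] = None,
) -> str:
    """Return a label for the access type that would be used.

    Hypothetical helper for illustration; not part of the Teradata provider.
    """
    # Public object storage needs no authorization or credentials.
    if public_bucket:
        return "none"
    # The authorization object wins when both access types are defined.
    if teradata_authorization_name:
        return "authorization_object"
    # Otherwise fall back to the login and key stored in the Airflow connection.
    if azure_conn_id:
        return "connection_credentials"
    # Assumption of this sketch: private storage without any access type is an error.
    raise ValueError("private object storage requires an access type")


if __name__ == "__main__":
    print(
        resolve_access_type(
            teradata_authorization_name="azure_authorization",
            azure_conn_id="wasb_default",
        )
    )
```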

Transferring data from public Azure Blob Storage to Teradata

The following example uses the AzureBlobStorageToTeradataOperator to transfer CSV data from public Azure Blob Storage to a Teradata table:

tests/system/teradata/example_azure_blob_to_teradata_transfer.py

    transfer_data_csv = AzureBlobStorageToTeradataOperator(
        task_id="transfer_data_blob_to_teradata_csv",
        blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/CSVDATA/09380000/2018/06/",
        public_bucket=True,
        teradata_table="example_blob_teradata_csv",
        teradata_conn_id="teradata_default",
        azure_conn_id="wasb_default",
        trigger_rule="all_done",
    )
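
The blob_source_key values in this guide appear to follow the pattern /az/<storage-account>.blob.core.windows.net/<container>/<path>/. Under that assumption, which is inferred from the examples rather than stated by the provider, a small helper can build the key from its parts. The function name build_blob_source_key and its arguments are hypothetical.

```python
def build_blob_source_key(storage_account: str, container: str, prefix: str = "") -> str:
    """Build a blob_source_key in the shape used by the examples in this guide.

    Hypothetical helper for illustration; not part of the Teradata provider.
    """
    key = f"/az/{storage_account}.blob.core.windows.net/{container}/"
    # Normalize the optional path so the key always ends with a single slash.
    prefix = prefix.strip("/")
    if prefix:
        key += prefix + "/"
    return key


if __name__ == "__main__":
    print(build_blob_source_key("airflowteradata", "csvdata"))
```

The result can be passed as the blob_source_key argument of AzureBlobStorageToTeradataOperator.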

Transferring data from private Azure Blob Storage to Teradata with Azure connection

The following example uses the AzureBlobStorageToTeradataOperator to transfer CSV data from private Azure Blob Storage to Teradata, with the Azure credentials defined in an Airflow Azure connection:

tests/system/teradata/example_azure_blob_to_teradata_transfer.py

    transfer_key_data_csv = AzureBlobStorageToTeradataOperator(
        task_id="transfer_key_data_blob_to_teradata_csv",
        blob_source_key="/az/airflowteradata.blob.core.windows.net/csvdata/",
        teradata_table="example_blob_teradata_csv",
        azure_conn_id="wasb_default",
        teradata_conn_id="teradata_default",
        trigger_rule="all_done",
    )

Transferring data from private Azure Blob Storage to Teradata with Teradata Authorization Object

A Teradata authorization database object controls who can access an external object store. The authorization object must exist in the Teradata database before it can be used to transfer data from Azure Blob Storage to Teradata. Refer to Authentication for External Object Stores in Teradata.

The following example uses the AzureBlobStorageToTeradataOperator to transfer CSV data from private Azure Blob Storage to Teradata with an authorization database object defined in Teradata:

tests/system/teradata/example_azure_blob_to_teradata_transfer.py

    transfer_auth_data_csv = AzureBlobStorageToTeradataOperator(
        task_id="transfer_auth_data_blob_to_teradata_csv",
        blob_source_key="/az/airflowteradata.blob.core.windows.net/csvdata/",
        teradata_table="example_blob_teradata_csv",
        teradata_authorization_name="azure_authorization",
        teradata_conn_id="teradata_default",
        trigger_rule="all_done",
    )

Transferring data in CSV format from Azure Blob Storage to Teradata

The following example uses the AzureBlobStorageToTeradataOperator to transfer CSV data from Azure Blob Storage to a Teradata table:

tests/system/teradata/example_azure_blob_to_teradata_transfer.py

    transfer_data_csv = AzureBlobStorageToTeradataOperator(
        task_id="transfer_data_blob_to_teradata_csv",
        blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/CSVDATA/09380000/2018/06/",
        public_bucket=True,
        teradata_table="example_blob_teradata_csv",
        teradata_conn_id="teradata_default",
        azure_conn_id="wasb_default",
        trigger_rule="all_done",
    )

Transferring data in JSON format from Azure Blob Storage to Teradata

The following example uses the AzureBlobStorageToTeradataOperator to transfer JSON data from Azure Blob Storage to a Teradata table:

tests/system/teradata/example_azure_blob_to_teradata_transfer.py

    transfer_data_json = AzureBlobStorageToTeradataOperator(
        task_id="transfer_data_blob_to_teradata_json",
        blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/JSONDATA/09380000/2018/06/",
        teradata_table="example_blob_teradata_json",
        public_bucket=True,
        teradata_conn_id="teradata_default",
        azure_conn_id="wasb_default",
        trigger_rule="all_done",
    )

Transferring data in PARQUET format from Azure Blob Storage to Teradata

The following example uses the AzureBlobStorageToTeradataOperator to transfer PARQUET data from Azure Blob Storage to a Teradata table:

tests/system/teradata/example_azure_blob_to_teradata_transfer.py

    transfer_data_parquet = AzureBlobStorageToTeradataOperator(
        task_id="transfer_data_blob_to_teradata_parquet",
        blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/PARQUETDATA/09394500/2018/06/",
        teradata_table="example_blob_teradata_parquet",
        public_bucket=True,
        teradata_conn_id="teradata_default",
        trigger_rule="all_done",
    )

The complete AzureBlobStorageToTeradataOperator DAG

When we put everything together, our DAG should look like this:

tests/system/teradata/example_azure_blob_to_teradata_transfer.py


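import datetime
import os

from airflow import DAG
from airflow.providers.teradata.operators.teradata import TeradataOperator
from airflow.providers.teradata.transfers.azure_blob_to_teradata import AzureBlobStorageToTeradataOperator

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")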
DAG_ID = "example_azure_blob_to_teradata_transfer_operator"
CONN_ID = "teradata_default"

with DAG(
    dag_id=DAG_ID,
    start_date=datetime.datetime(2020, 2, 2),
    schedule="@once",
    catchup=False,
    default_args={"teradata_conn_id": CONN_ID},
) as dag:
    transfer_data_csv = AzureBlobStorageToTeradataOperator(
        task_id="transfer_data_blob_to_teradata_csv",
        blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/CSVDATA/09380000/2018/06/",
        public_bucket=True,
        teradata_table="example_blob_teradata_csv",
        teradata_conn_id="teradata_default",
        azure_conn_id="wasb_default",
        trigger_rule="all_done",
    )
    read_data_table_csv = TeradataOperator(
        task_id="read_data_table_csv",
        sql="SELECT count(1) from example_blob_teradata_csv;",
    )
    drop_table_csv = TeradataOperator(
        task_id="drop_table_csv",
        sql="DROP TABLE example_blob_teradata_csv;",
    )
    transfer_key_data_csv = AzureBlobStorageToTeradataOperator(
        task_id="transfer_key_data_blob_to_teradata_csv",
        blob_source_key="/az/airflowteradata.blob.core.windows.net/csvdata/",
        teradata_table="example_blob_teradata_csv",
        azure_conn_id="wasb_default",
        teradata_conn_id="teradata_default",
        trigger_rule="all_done",
    )
    read_key_data_table_csv = TeradataOperator(
        task_id="read_key_data_table_csv",
        conn_id=CONN_ID,
        sql="SELECT count(1) from example_blob_teradata_csv;",
    )
    drop_key_table_csv = TeradataOperator(
        task_id="drop_key_table_csv",
        conn_id=CONN_ID,
        sql="DROP TABLE example_blob_teradata_csv;",
    )
    create_azure_authorization = TeradataOperator(
        task_id="create_azure_authorization",
        conn_id=CONN_ID,
        sql="CREATE AUTHORIZATION azure_authorization USER '{{ var.value.get('AZURE_BLOB_ACCOUNTNAME') }}' PASSWORD '{{ var.value.get('AZURE_BLOB_ACCOUNT_SECRET_KEY') }}' ",
    )
    transfer_auth_data_csv = AzureBlobStorageToTeradataOperator(
        task_id="transfer_auth_data_blob_to_teradata_csv",
        blob_source_key="/az/airflowteradata.blob.core.windows.net/csvdata/",
        teradata_table="example_blob_teradata_csv",
        teradata_authorization_name="azure_authorization",
        teradata_conn_id="teradata_default",
        trigger_rule="all_done",
    )
    read_auth_data_table_csv = TeradataOperator(
        task_id="read_auth_data_table_csv",
        conn_id=CONN_ID,
        sql="SELECT count(1) from example_blob_teradata_csv;",
    )
    drop_auth_table_csv = TeradataOperator(
        task_id="drop_auth_table_csv",
        conn_id=CONN_ID,
        sql="DROP TABLE example_blob_teradata_csv;",
    )
    drop_auth = TeradataOperator(
        task_id="drop_auth",
        conn_id=CONN_ID,
        sql="DROP AUTHORIZATION azure_authorization;",
    )
    transfer_data_json = AzureBlobStorageToTeradataOperator(
        task_id="transfer_data_blob_to_teradata_json",
        blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/JSONDATA/09380000/2018/06/",
        teradata_table="example_blob_teradata_json",
        public_bucket=True,
        teradata_conn_id="teradata_default",
        azure_conn_id="wasb_default",
        trigger_rule="all_done",
    )
    read_data_table_json = TeradataOperator(
        task_id="read_data_table_json",
        sql="SELECT count(1) from example_blob_teradata_json;",
    )
    drop_table_json = TeradataOperator(
        task_id="drop_table_json",
        sql="DROP TABLE example_blob_teradata_json;",
    )
    transfer_data_parquet = AzureBlobStorageToTeradataOperator(
        task_id="transfer_data_blob_to_teradata_parquet",
        blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/PARQUETDATA/09394500/2018/06/",
        teradata_table="example_blob_teradata_parquet",
        public_bucket=True,
        teradata_conn_id="teradata_default",
        trigger_rule="all_done",
    )
    read_data_table_parquet = TeradataOperator(
        task_id="read_data_table_parquet",
        sql="SELECT count(1) from example_blob_teradata_parquet;",
    )
    drop_table_parquet = TeradataOperator(
        task_id="drop_table_parquet",
        sql="DROP TABLE example_blob_teradata_parquet;",
    )

    (
        transfer_data_csv
        >> transfer_data_json
        >> transfer_data_parquet
        >> read_data_table_csv
        >> read_data_table_json
        >> read_data_table_parquet
        >> drop_table_csv
        >> drop_table_json
        >> drop_table_parquet
        >> transfer_key_data_csv
        >> read_key_data_table_csv
        >> drop_key_table_csv
        >> create_azure_authorization
        >> transfer_auth_data_csv
        >> read_auth_data_table_csv
        >> drop_auth_table_csv
        >> drop_auth
    )
