AzureBlobStorageToTeradataOperator
The purpose of AzureBlobStorageToTeradataOperator is to define tasks that transfer CSV, JSON, and Parquet data from Azure Blob Storage to a Teradata table.
Use the AzureBlobStorageToTeradataOperator to transfer data from Azure Blob Storage to Teradata. This operator leverages the Teradata READ_NOS feature to import data in CSV, JSON, and Parquet formats from Azure Blob Storage into Teradata.
The operator reads data directly from the object store and creates permanent tables in the database by combining READ_NOS with CREATE TABLE AS, using the following SQL statement:
CREATE MULTISET TABLE multiset_table_name AS (
  SELECT *
  FROM (
    LOCATION='YOUR-OBJECT-STORE-URI'
    AUTHORIZATION=authorization_object
  ) AS d
) WITH DATA;
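To make the statement above concrete, here is a minimal sketch of how such a CREATE TABLE AS statement could be assembled in Python. The helper build_read_nos_ctas is hypothetical and not part of the operator's API. It interpolates its arguments verbatim, so they should come from trusted DAG configuration rather than user input.

```python
def build_read_nos_ctas(table: str, location: str, credentials: str = "") -> str:
    """Build a CREATE MULTISET TABLE ... AS (...) WITH DATA statement.

    Hypothetical helper mirroring the statement shown above. `credentials` is an
    optional clause such as "AUTHORIZATION=azure_authorization"; when it is empty
    the clause is left out entirely (as for public containers).
    """
    lines = [
        f"CREATE MULTISET TABLE {table} AS (",
        "    SELECT *",
        "    FROM (",
        f"        LOCATION='{location}'",
    ]
    if credentials:
        # The credentials clause follows LOCATION inside the derived table.
        lines.append(f"        {credentials}")
    lines += [
        "    ) AS d",
        ") WITH DATA;",
    ]
    return "\n".join(lines)


example_sql = build_read_nos_ctas(
    "example_blob_teradata_csv",
    "/az/airflowteradata.blob.core.windows.net/csvdata/",
    "AUTHORIZATION=azure_authorization",
)
print(example_sql)
```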
The operator supports loading data from both public and private object storage. For private object storage, access can be granted either through a Teradata authorization database object or through the Object Store Login and Object Store Key defined in an Azure Blob Storage connection in Airflow. For public object storage, no authorization or access credentials are required.
The Teradata authorization database object access type is selected with the teradata_authorization_name parameter of AzureBlobStorageToTeradataOperator.
The Object Store Access Key ID and Access Key Secret access type is selected with the azure_conn_id parameter of AzureBlobStorageToTeradataOperator.
Note
The Teradata authorization database object takes precedence if both access types are defined.
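The access rules above can be summarized as a small decision function. This is a hypothetical sketch, not the operator's implementation: credentials_clause and its parameters are illustrative names, and the ACCESS_ID/ACCESS_KEY keyword spelling is an assumption to verify against the Teradata READ_NOS documentation.

```python
from typing import Optional


def credentials_clause(
    public_bucket: bool,
    authorization_name: Optional[str] = None,
    access_id: Optional[str] = None,
    access_key: Optional[str] = None,
) -> str:
    """Pick the object store credentials clause for the READ_NOS statement.

    Hypothetical helper; keyword spelling for the key-based clause is assumed.
    """
    if public_bucket:
        # Public object storage needs no authorization or access credentials.
        return ""
    if authorization_name:
        # The Teradata authorization object wins when both access types are set.
        return f"AUTHORIZATION={authorization_name}"
    # Otherwise use the login/key taken from the Azure Blob Storage connection.
    return f"ACCESS_ID='{access_id or ''}' ACCESS_KEY='{access_key or ''}'"
```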
Transferring data from public Azure Blob Storage to Teradata
An example usage of the AzureBlobStorageToTeradataOperator to transfer CSV data from public Azure Blob Storage to a Teradata table is as follows:
transfer_data_csv = AzureBlobStorageToTeradataOperator(
    task_id="transfer_data_blob_to_teradata_csv",
    blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/CSVDATA/09380000/2018/06/",
    public_bucket=True,
    teradata_table="example_blob_teradata_csv",
    teradata_conn_id="teradata_default",
    azure_conn_id="wasb_default",
    trigger_rule="all_done",
)
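The blob_source_key values in these examples appear to follow the pattern /az/<storage-account>.blob.core.windows.net/<container>/<prefix>/. Assuming that pattern, a hypothetical helper such as build_blob_source_key can assemble the key from its parts instead of hard-coding the full string:

```python
def build_blob_source_key(account: str, container: str, prefix: str = "") -> str:
    """Compose a /az/ location in the form used by blob_source_key in this guide.

    Hypothetical helper; the pattern is inferred from the example values.
    """
    parts = [f"{account}.blob.core.windows.net", container.strip("/")]
    if prefix.strip("/"):
        parts.append(prefix.strip("/"))
    # Leading "/az/" and trailing "/" match the example values above.
    return "/az/" + "/".join(parts) + "/"


key = build_blob_source_key("akiaxox5jikeotfww4ul", "td-usgs", "CSVDATA/09380000/2018/06")
```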
Transferring data from private Azure Blob Storage to Teradata with an Azure Blob Storage connection
An example usage of the AzureBlobStorageToTeradataOperator to transfer CSV data from private Azure Blob Storage to Teradata, with the Azure credentials defined in an Azure Blob Storage connection in Airflow:
transfer_key_data_csv = AzureBlobStorageToTeradataOperator(
    task_id="transfer_key_data_blob_to_teradata_csv",
    blob_source_key="/az/airflowteradata.blob.core.windows.net/csvdata/",
    teradata_table="example_blob_teradata_csv",
    azure_conn_id="wasb_default",
    teradata_conn_id="teradata_default",
    trigger_rule="all_done",
)
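The wasb_default connection referenced above can be defined in several ways. One option, sketched here under the assumption that you run Airflow 2.3 or later (which accepts JSON-formatted AIRFLOW_CONN_<CONN_ID> environment variables), is to export the connection in the environment of the scheduler and workers. The account name and key are placeholders; following the access-type description above, the login carries the object store login and the password carries the object store key. Setting os.environ from Python only affects the current process, so this snippet mainly shows the shape of the value you would export.

```python
import json
import os

# Placeholder values: substitute your storage account name and access key.
ACCOUNT_NAME = "airflowteradata"
ACCOUNT_KEY = "<storage-account-key>"

# Airflow looks up connection "wasb_default" in AIRFLOW_CONN_WASB_DEFAULT.
os.environ["AIRFLOW_CONN_WASB_DEFAULT"] = json.dumps(
    {
        "conn_type": "wasb",
        "login": ACCOUNT_NAME,    # object store login (storage account name)
        "password": ACCOUNT_KEY,  # object store key (storage account key)
    }
)
```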
Transferring data from private Azure Blob Storage to Teradata with a Teradata Authorization Object
A Teradata authorization database object controls who can access an external object store. The authorization object must already exist in the Teradata database before it can be used to transfer data from Azure Blob Storage to Teradata. Refer to Authentication for External Object Stores in Teradata.
An example usage of the AzureBlobStorageToTeradataOperator to transfer CSV data from private Azure Blob Storage to Teradata with an authorization database object defined in Teradata:
transfer_auth_data_csv = AzureBlobStorageToTeradataOperator(
    task_id="transfer_auth_data_blob_to_teradata_csv",
    blob_source_key="/az/airflowteradata.blob.core.windows.net/csvdata/",
    teradata_table="example_blob_teradata_csv",
    teradata_authorization_name="azure_authorization",
    teradata_conn_id="teradata_default",
    trigger_rule="all_done",
)
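Because the authorization object must exist before the transfer runs, it is typically created in an earlier task, as the create_azure_authorization task in the complete DAG does. A hypothetical helper like the one below builds the same CREATE AUTHORIZATION statement from an account name and key, doubling any single quotes (the usual SQL string escape) so a value cannot end the literal early. The function names are illustrative, not part of the provider.

```python
def _quote(value: str) -> str:
    # Wrap in single quotes and double any embedded single quote.
    return "'" + value.replace("'", "''") + "'"


def create_authorization_sql(name: str, account_name: str, account_key: str) -> str:
    """Build a CREATE AUTHORIZATION statement (hypothetical helper)."""
    return f"CREATE AUTHORIZATION {name} USER {_quote(account_name)} PASSWORD {_quote(account_key)};"


def drop_authorization_sql(name: str) -> str:
    """Build the matching cleanup statement (hypothetical helper)."""
    return f"DROP AUTHORIZATION {name};"
```

The resulting strings could be passed as the sql argument of TeradataOperator tasks placed before and after the transfer.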
Transferring data in CSV format from Azure Blob Storage to Teradata
An example usage of the AzureBlobStorageToTeradataOperator to transfer CSV data from Azure Blob Storage to a Teradata table is as follows:
transfer_data_csv = AzureBlobStorageToTeradataOperator(
    task_id="transfer_data_blob_to_teradata_csv",
    blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/CSVDATA/09380000/2018/06/",
    public_bucket=True,
    teradata_table="example_blob_teradata_csv",
    teradata_conn_id="teradata_default",
    azure_conn_id="wasb_default",
    trigger_rule="all_done",
)
Transferring data in JSON format from Azure Blob Storage to Teradata
An example usage of the AzureBlobStorageToTeradataOperator to transfer JSON data from Azure Blob Storage to a Teradata table is as follows:
transfer_data_json = AzureBlobStorageToTeradataOperator(
    task_id="transfer_data_blob_to_teradata_json",
    blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/JSONDATA/09380000/2018/06/",
    teradata_table="example_blob_teradata_json",
    public_bucket=True,
    teradata_conn_id="teradata_default",
    azure_conn_id="wasb_default",
    trigger_rule="all_done",
)
Transferring data in Parquet format from Azure Blob Storage to Teradata
An example usage of the AzureBlobStorageToTeradataOperator to transfer Parquet data from Azure Blob Storage to a Teradata table is as follows:
transfer_data_parquet = AzureBlobStorageToTeradataOperator(
    task_id="transfer_data_blob_to_teradata_parquet",
    blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/PARQUETDATA/09394500/2018/06/",
    teradata_table="example_blob_teradata_parquet",
    public_bucket=True,
    teradata_conn_id="teradata_default",
    trigger_rule="all_done",
)
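The CSV, JSON, and Parquet examples differ only in the format name and the source prefix. As a sketch, their keyword arguments can be generated from a single mapping; FORMATS and transfer_kwargs are hypothetical names, and azure_conn_id is included for every format as in the CSV and JSON examples.

```python
BASE = "/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs"

# Format name -> source prefix, taken from the three examples above.
FORMATS = {
    "csv": "CSVDATA/09380000/2018/06",
    "json": "JSONDATA/09380000/2018/06",
    "parquet": "PARQUETDATA/09394500/2018/06",
}


def transfer_kwargs(fmt: str, prefix: str) -> dict:
    """Keyword arguments for one public-container transfer task (hypothetical)."""
    return {
        "task_id": f"transfer_data_blob_to_teradata_{fmt}",
        "blob_source_key": f"{BASE}/{prefix}/",
        "public_bucket": True,
        "teradata_table": f"example_blob_teradata_{fmt}",
        "teradata_conn_id": "teradata_default",
        "azure_conn_id": "wasb_default",
        "trigger_rule": "all_done",
    }


ALL_KWARGS = [transfer_kwargs(fmt, prefix) for fmt, prefix in FORMATS.items()]
```

Inside a DAG, each dictionary could then be passed as AzureBlobStorageToTeradataOperator(**kwargs).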
The complete AzureBlobStorageToTeradataOperator DAG
When we put everything together, our DAG should look like this:
import datetime
import os

from airflow import DAG
from airflow.providers.teradata.operators.teradata import TeradataOperator
from airflow.providers.teradata.transfers.azure_blob_to_teradata import AzureBlobStorageToTeradataOperator

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "example_azure_blob_to_teradata_transfer_operator"
CONN_ID = "teradata_default"

with DAG(
    dag_id=DAG_ID,
    start_date=datetime.datetime(2020, 2, 2),
    schedule="@once",
    catchup=False,
    default_args={"teradata_conn_id": CONN_ID},
) as dag:
    # Public container: CSV, JSON and Parquet transfers.
    transfer_data_csv = AzureBlobStorageToTeradataOperator(
        task_id="transfer_data_blob_to_teradata_csv",
        blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/CSVDATA/09380000/2018/06/",
        public_bucket=True,
        teradata_table="example_blob_teradata_csv",
        teradata_conn_id="teradata_default",
        azure_conn_id="wasb_default",
        trigger_rule="all_done",
    )
    read_data_table_csv = TeradataOperator(
        task_id="read_data_table_csv",
        sql="SELECT count(1) from example_blob_teradata_csv;",
    )
    drop_table_csv = TeradataOperator(
        task_id="drop_table_csv",
        sql="DROP TABLE example_blob_teradata_csv;",
    )
    # Private container accessed with the credentials of the Azure connection.
    transfer_key_data_csv = AzureBlobStorageToTeradataOperator(
        task_id="transfer_key_data_blob_to_teradata_csv",
        blob_source_key="/az/airflowteradata.blob.core.windows.net/csvdata/",
        teradata_table="example_blob_teradata_csv",
        azure_conn_id="wasb_default",
        teradata_conn_id="teradata_default",
        trigger_rule="all_done",
    )
    read_key_data_table_csv = TeradataOperator(
        task_id="read_key_data_table_csv",
        conn_id=CONN_ID,
        sql="SELECT count(1) from example_blob_teradata_csv;",
    )
    drop_key_table_csv = TeradataOperator(
        task_id="drop_key_table_csv",
        conn_id=CONN_ID,
        sql="DROP TABLE example_blob_teradata_csv;",
    )
    # Private container accessed through a Teradata authorization object.
    create_azure_authorization = TeradataOperator(
        task_id="create_azure_authorization",
        conn_id=CONN_ID,
        sql="CREATE AUTHORIZATION azure_authorization USER '{{ var.value.get('AZURE_BLOB_ACCOUNTNAME') }}' PASSWORD '{{ var.value.get('AZURE_BLOB_ACCOUNT_SECRET_KEY') }}' ",
    )
    transfer_auth_data_csv = AzureBlobStorageToTeradataOperator(
        task_id="transfer_auth_data_blob_to_teradata_csv",
        blob_source_key="/az/airflowteradata.blob.core.windows.net/csvdata/",
        teradata_table="example_blob_teradata_csv",
        teradata_authorization_name="azure_authorization",
        teradata_conn_id="teradata_default",
        trigger_rule="all_done",
    )
    read_auth_data_table_csv = TeradataOperator(
        task_id="read_auth_data_table_csv",
        conn_id=CONN_ID,
        sql="SELECT count(1) from example_blob_teradata_csv;",
    )
    drop_auth_table_csv = TeradataOperator(
        task_id="drop_auth_table_csv",
        conn_id=CONN_ID,
        sql="DROP TABLE example_blob_teradata_csv;",
    )
    drop_auth = TeradataOperator(
        task_id="drop_auth",
        conn_id=CONN_ID,
        sql="DROP AUTHORIZATION azure_authorization;",
    )
    transfer_data_json = AzureBlobStorageToTeradataOperator(
        task_id="transfer_data_blob_to_teradata_json",
        blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/JSONDATA/09380000/2018/06/",
        teradata_table="example_blob_teradata_json",
        public_bucket=True,
        teradata_conn_id="teradata_default",
        azure_conn_id="wasb_default",
        trigger_rule="all_done",
    )
    read_data_table_json = TeradataOperator(
        task_id="read_data_table_json",
        sql="SELECT count(1) from example_blob_teradata_json;",
    )
    drop_table_json = TeradataOperator(
        task_id="drop_table_json",
        sql="DROP TABLE example_blob_teradata_json;",
    )
    transfer_data_parquet = AzureBlobStorageToTeradataOperator(
        task_id="transfer_data_blob_to_teradata_parquet",
        blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/PARQUETDATA/09394500/2018/06/",
        teradata_table="example_blob_teradata_parquet",
        public_bucket=True,
        teradata_conn_id="teradata_default",
        trigger_rule="all_done",
    )
    read_data_table_parquet = TeradataOperator(
        task_id="read_data_table_parquet",
        sql="SELECT count(1) from example_blob_teradata_parquet;",
    )
    drop_table_parquet = TeradataOperator(
        task_id="drop_table_parquet",
        sql="DROP TABLE example_blob_teradata_parquet;",
    )
    (
        transfer_data_csv
        >> transfer_data_json
        >> transfer_data_parquet
        >> read_data_table_csv
        >> read_data_table_json
        >> read_data_table_parquet
        >> drop_table_csv
        >> drop_table_json
        >> drop_table_parquet
        >> transfer_key_data_csv
        >> read_key_data_table_csv
        >> drop_key_table_csv
        >> create_azure_authorization
        >> transfer_auth_data_csv
        >> read_auth_data_table_csv
        >> drop_auth_table_csv
        >> drop_auth
    )