Source code for tests.system.teradata.example_teradata_call_sp

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Example Airflow DAG to show Stored Procedure creation and execution on teradata database using
TeradataStoredProcedureOperator.

This DAG assumes Airflow Connection with connection id `teradata_sp_call` already exists in locally. It
shows how to create and execute Stored Procedure as tasks in airflow dags using
TeradataStoredProcedureOperator."""

from __future__ import annotations

from datetime import datetime

import pytest

from airflow import DAG

try:
    from airflow.providers.teradata.operators.teradata import (
        TeradataOperator,
        TeradataStoredProcedureOperator,
    )
except ImportError:
    pytest.skip("Teradata provider apache-airflow-provider-teradata not available", allow_module_level=True)

# [START howto_teradata_operator_for_sp]
[docs]CONN_ID = "teradata_sp_call"
[docs]DAG_ID = "example_teradata_call_sp"
with DAG( dag_id=DAG_ID, max_active_runs=1, max_active_tasks=3, catchup=False, default_args={"teradata_conn_id": CONN_ID}, schedule="@once", start_date=datetime(2023, 1, 1), ) as dag: # [START howto_teradata_stored_procedure_operator_with_in_inout_out] # [START howto_create_teradata_stored_procedure_operator_with_in_inout]
[docs] create_sp_in_inout = TeradataOperator( task_id="create_sp_in_inout", sql=r"""REPLACE PROCEDURE TEST_PROCEDURE ( IN val_in INTEGER, INOUT val_in_out INTEGER, OUT val_out INTEGER, OUT value_str_out varchar(100) ) BEGIN set val_out = val_in * 2; set val_in_out = val_in_out * 4; set value_str_out = 'string output'; END; """, )
# [END howto_create_teradata_stored_procedure_operator_with_in_inout] # [START howto_call_teradata_stored_procedure_operator_with_types] opr_sp_types = TeradataStoredProcedureOperator( task_id="opr_sp_types", procedure="TEST_PROCEDURE", parameters=[3, 1, int, str], ) # [END howto_call_teradata_stored_procedure_operator_with_types] # [START howto_call_teradata_stored_procedure_operator_with_place_holder] opr_sp_place_holder = TeradataStoredProcedureOperator( task_id="opr_sp_place_holder", procedure="TEST_PROCEDURE", parameters=[3, 1, "?", "?"], ) # [END howto_call_teradata_stored_procedure_operator_with_place_holder] # [START howto_call_teradata_stored_procedure_operator_with_dict_input] opr_sp_dict = TeradataStoredProcedureOperator( task_id="opr_sp_dict", procedure="TEST_PROCEDURE", parameters={"val_in": 3, "val_in_out": 1, "val_out": int, "str_out": str}, ) # [END howto_call_teradata_stored_procedure_operator_with_dict_input] # [END howto_teradata_stored_procedure_operator_with_in_inout_out] # [START howto_create_teradata_stored_procedure_operator_timestamp] create_sp_timestamp = TeradataOperator( task_id="create_sp_timestamp", sql=r"""REPLACE PROCEDURE GetTimestampOutParameter (OUT out_timestamp TIMESTAMP) BEGIN -- Assign current timestamp to the OUT parameter SET out_timestamp = CURRENT_TIMESTAMP; END; """, ) # [END howto_create_teradata_stored_procedure_operator_timestamp] # [START howto_call_teradata_stored_procedure_operator_timestamp] opr_sp_timestamp = TeradataStoredProcedureOperator( task_id="opr_sp_timestamp", procedure="GetTimestampOutParameter", parameters=["?"], ) # [END howto_call_teradata_stored_procedure_operator_timestamp] # [START howto_teradata_stored_procedure_operator_with_in_out_dynamic_result] create_sp_param_dr = TeradataOperator( task_id="create_sp_param_dr", sql=r"""replace procedure examplestoredproc (in p1 integer, inout p2 integer, out p3 integer) dynamic result sets 2 begin declare cur1 cursor with return for select * from dbc.dbcinfo order by 1 ; declare cur2 cursor with return for select infodata, infokey from dbc.dbcinfo order by 1 ; open cur1 ; open cur2 ; set p2 = p1 + p2 ; set p3 = p1 * p2 ; end ; """, ) # [END howto_teradata_stored_procedure_operator_with_in_out_dynamic_result] # [START howto_call_teradata_stored_procedure_operator_with_in_out_dynamic_result] opr_sp_param_dr = TeradataStoredProcedureOperator( task_id="opr_sp_param_dr", procedure="examplestoredproc", parameters=[3, 2, int], ) # [END howto_call_teradata_stored_procedure_operator_with_in_out_dynamic_result] # [START howto_teradata_stored_procedure_operator_drop] drop_sp = TeradataOperator( task_id="drop_sp", sql=r"drop procedure examplestoredproc;", ) drop_sp_test = TeradataOperator( task_id="drop_sp_test", sql=r"drop procedure TEST_PROCEDURE;", ) drop_sp_timestamp = TeradataOperator( task_id="drop_sp_timestamp", sql=r"drop procedure GetTimestampOutParameter;", ) # [END howto_teradata_stored_procedure_operator_drop] ( create_sp_in_inout >> opr_sp_types >> opr_sp_dict >> opr_sp_place_holder >> create_sp_param_dr >> opr_sp_param_dr >> drop_sp >> drop_sp_test >> create_sp_timestamp >> opr_sp_timestamp >> drop_sp_timestamp ) # [END howto_teradata_operator_for_sp] from tests_common.test_utils.watcher import watcher # This test needs watcher in order to properly mark success/failure # when "tearDown" task with trigger rule is part of the DAG list(dag.tasks) >> watcher() from tests_common.test_utils.system_tests import get_test_run # noqa: E402 # Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
[docs]test_run = get_test_run(dag)

Was this entry helpful?