MsSqlOperator

The MsSqlOperator defines tasks that interact with an MSSQL database.

Use the MsSqlOperator to execute SQL commands in an MSSQL database.

Common Database Operations with MsSqlOperator

To run an SQL request with the MsSqlOperator, two parameters are required: sql and mssql_conn_id. Both are passed on to the MSSQL hook object, which interacts directly with the MSSQL database.
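
The hand-off described above can be pictured with a small model. This is a simplified sketch, not Airflow's implementation: RecordingHook and SketchSqlOperator are hypothetical stand-ins that only show how the connection ID and the SQL text travel from the operator to the hook.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class RecordingHook:
    """Hypothetical stand-in for the MSSQL hook: records SQL instead of running it."""

    conn_id: str
    executed: List[str] = field(default_factory=list)

    def run(self, sql: str) -> None:
        # A real hook would open the connection named by conn_id and execute sql.
        self.executed.append(sql)


@dataclass
class SketchSqlOperator:
    """Hypothetical, heavily simplified model of an SQL operator."""

    task_id: str
    mssql_conn_id: str
    sql: str

    def execute(self) -> RecordingHook:
        # The operator builds a hook from the connection ID, then hands it the SQL.
        hook = RecordingHook(conn_id=self.mssql_conn_id)
        hook.run(self.sql)
        return hook


if __name__ == "__main__":
    task = SketchSqlOperator(
        task_id="get_all_countries",
        mssql_conn_id="airflow_mssql",
        sql="SELECT * FROM Country;",
    )
    hook = task.execute()
    print(hook.conn_id, hook.executed)
```

The point of the split is that the operator only describes what to run and where, while the hook owns the connection handling.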

Creating an MSSQL database table

The code snippets below are based on Airflow 2.2.

An example usage of the MsSqlOperator is as follows:

tests/system/providers/microsoft/mssql/example_mssql.py


    # Example of creating a task to create a table in MsSql

    create_table_mssql_task = MsSqlOperator(
        task_id="create_country_table",
        mssql_conn_id="airflow_mssql",
        sql=r"""
        CREATE TABLE Country (
            country_id INT NOT NULL IDENTITY(1,1) PRIMARY KEY,
            name TEXT,
            continent TEXT
        );
        """,
        dag=dag,
    )

You can also execute SQL commands from an external file. The SQL file must sit in the same folder as the DAG file (here, dags/), since the path given in sql is resolved relative to it. This way you can easily maintain the SQL queries separately from the code.

    # Example of creating a task that calls an sql command from an external file.
    create_table_mssql_from_external_file = MsSqlOperator(
        task_id="create_table_from_external_file",
        mssql_conn_id="airflow_mssql",
        sql="create_table.sql",
        dag=dag,
    )

Your dags/create_table.sql should look like this:
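
As a minimal sketch, assuming the Users table holds the username and description columns that the INSERT statements in the next section rely on, the file could be generated as follows. The user_id column is an assumption modeled on the Country table, and write_sql_file is a hypothetical helper. The Python wrapper exists only to keep the example self-contained; in practice you would simply save the SQL text as dags/create_table.sql.

```python
import tempfile
from pathlib import Path

# Assumed contents of create_table.sql. The column list is inferred from the
# INSERT statements used later; user_id mirrors the Country table's key.
CREATE_USERS_SQL = """\
-- create Users table
CREATE TABLE Users (
    user_id INT NOT NULL IDENTITY(1,1) PRIMARY KEY,
    username TEXT,
    description TEXT
);
"""


def write_sql_file(dags_folder: Path) -> Path:
    """Write create_table.sql into dags_folder and return its path."""
    sql_path = dags_folder / "create_table.sql"
    sql_path.write_text(CREATE_USERS_SQL, encoding="utf-8")
    return sql_path


if __name__ == "__main__":
    # A temporary directory stands in for the real dags/ folder here.
    with tempfile.TemporaryDirectory() as tmp:
        path = write_sql_file(Path(tmp))
        print(path.read_text(encoding="utf-8"))
```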

Inserting data into an MSSQL database table

We can then create an MsSqlOperator task that populates the Users table.

    populate_user_table = MsSqlOperator(
        task_id="populate_user_table",
        mssql_conn_id="airflow_mssql",
        sql=r"""
                INSERT INTO Users (username, description)
                VALUES ( 'Danny', 'Musician');
                INSERT INTO Users (username, description)
                VALUES ( 'Simone', 'Chef');
                INSERT INTO Users (username, description)
                VALUES ( 'Lily', 'Florist');
                INSERT INTO Users (username, description)
                VALUES ( 'Tim', 'Pet shop owner');
                """,
    )

Fetching records from your MSSQL database table

Fetching records from your MSSQL database table can be as simple as:

    get_all_countries = MsSqlOperator(
        task_id="get_all_countries",
        mssql_conn_id="airflow_mssql",
        sql=r"""SELECT * FROM Country;""",
    )

Passing Parameters into MsSqlOperator

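MsSqlOperator accepts a params argument, which makes it possible to inject values into your SQL requests dynamically at runtime. The values are substituted into the SQL text by Jinja templating before the statement is executed, so they are not bound query parameters.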

To find the countries on the Asian continent:

    get_countries_from_continent = MsSqlOperator(
        task_id="get_countries_from_continent",
        mssql_conn_id="airflow_mssql",
        sql=r"""SELECT * FROM Country where {{ params.column }}='{{ params.value }}';""",
        params={"column": "CONVERT(VARCHAR, continent)", "value": "Asia"},
    )
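
To make the effect of the template concrete, the sketch below shows the SQL text that results once the placeholders are filled in. It is an approximation: render_params is a hypothetical helper that imitates only the {{ params.name }} substitution with a regular expression, whereas Airflow itself renders the template with Jinja.

```python
import re
from typing import Mapping

# Matches placeholders of the form {{ params.name }}.
_PLACEHOLDER = re.compile(r"\{\{\s*params\.(\w+)\s*\}\}")


def render_params(template: str, params: Mapping[str, str]) -> str:
    """Substitute {{ params.<key> }} placeholders with plain text.

    Simplified stand-in for Jinja rendering: no escaping or quoting is applied.
    """
    return _PLACEHOLDER.sub(lambda match: str(params[match.group(1)]), template)


if __name__ == "__main__":
    sql = "SELECT * FROM Country where {{ params.column }}='{{ params.value }}';"
    params = {"column": "CONVERT(VARCHAR, continent)", "value": "Asia"}
    print(render_params(sql, params))
    # prints: SELECT * FROM Country where CONVERT(VARCHAR, continent)='Asia';
```

Because the substitution is purely textual, a value containing a quote character would change the structure of the statement, so params should only carry trusted values.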

The complete MSSQL Operator DAG

When we put everything together, our DAG should look like this (the schedule argument requires Airflow 2.4 or later; earlier releases use schedule_interval):

import os
from datetime import datetime

import pytest

from airflow import DAG

try:
    from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook
    from airflow.providers.microsoft.mssql.operators.mssql import MsSqlOperator
except ImportError:
    pytest.skip("MSSQL provider not available", allow_module_level=True)

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "example_mssql"


with DAG(
    DAG_ID,
    schedule="@daily",
    start_date=datetime(2021, 10, 1),
    tags=["example"],
    catchup=False,
) as dag:

    # Example of creating a task to create a table in MsSql

    create_table_mssql_task = MsSqlOperator(
        task_id="create_country_table",
        mssql_conn_id="airflow_mssql",
        sql=r"""
        CREATE TABLE Country (
            country_id INT NOT NULL IDENTITY(1,1) PRIMARY KEY,
            name TEXT,
            continent TEXT
        );
        """,
        dag=dag,
    )

    @dag.task(task_id="insert_mssql_task")
    def insert_mssql_hook():
        mssql_hook = MsSqlHook(mssql_conn_id="airflow_mssql", schema="airflow")

        rows = [
            ("India", "Asia"),
            ("Germany", "Europe"),
            ("Argentina", "South America"),
            ("Ghana", "Africa"),
            ("Japan", "Asia"),
            ("Namibia", "Africa"),
        ]
        target_fields = ["name", "continent"]
        mssql_hook.insert_rows(table="Country", rows=rows, target_fields=target_fields)

    # Example of creating a task that calls an sql command from an external file.
    create_table_mssql_from_external_file = MsSqlOperator(
        task_id="create_table_from_external_file",
        mssql_conn_id="airflow_mssql",
        sql="create_table.sql",
        dag=dag,
    )
    populate_user_table = MsSqlOperator(
        task_id="populate_user_table",
        mssql_conn_id="airflow_mssql",
        sql=r"""
                INSERT INTO Users (username, description)
                VALUES ( 'Danny', 'Musician');
                INSERT INTO Users (username, description)
                VALUES ( 'Simone', 'Chef');
                INSERT INTO Users (username, description)
                VALUES ( 'Lily', 'Florist');
                INSERT INTO Users (username, description)
                VALUES ( 'Tim', 'Pet shop owner');
                """,
    )
    get_all_countries = MsSqlOperator(
        task_id="get_all_countries",
        mssql_conn_id="airflow_mssql",
        sql=r"""SELECT * FROM Country;""",
    )
    get_all_description = MsSqlOperator(
        task_id="get_all_description",
        mssql_conn_id="airflow_mssql",
        sql=r"""SELECT description FROM Users;""",
    )
    get_countries_from_continent = MsSqlOperator(
        task_id="get_countries_from_continent",
        mssql_conn_id="airflow_mssql",
        sql=r"""SELECT * FROM Country where {{ params.column }}='{{ params.value }}';""",
        params={"column": "CONVERT(VARCHAR, continent)", "value": "Asia"},
    )
    (
        create_table_mssql_task
        >> insert_mssql_hook()
        >> create_table_mssql_from_external_file
        >> populate_user_table
        >> get_all_countries
        >> get_all_description
        >> get_countries_from_continent
    )
