SQL Operators

These operators perform various queries against a SQL database, including column- and table-level data quality checks.

Execute SQL query

Use the SQLExecuteQueryOperator to run a SQL query against different databases. The parameters of the operator are:

  • sql - a single string, a list of strings, or a string pointing to a template file with the SQL to be executed;

  • autocommit (optional) - if True, each command is automatically committed (default: False);

  • parameters (optional) - the parameters to render the SQL query with;

  • handler (optional) - the function that will be applied to the cursor; if it is None, results won't be returned (default: fetch_all_handler);

  • split_statements (optional) - whether to split a single SQL string into statements and run them separately (default: False);

  • return_last (optional) - only used when split_statements is True; if True, only the result of the last statement is returned, otherwise the results of all statements are returned (default: True).

The example below shows how to instantiate the SQLExecuteQueryOperator task.

tests/system/providers/common/sql/example_sql_execute_query.py[source]

execute_query = SQLExecuteQueryOperator(
    task_id="execute_query",
    sql=f"SELECT 1; SELECT * FROM {AIRFLOW_DB_METADATA_TABLE} LIMIT 1;",
    split_statements=True,
    return_last=False,
)
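
If you need to pass bind parameters or a custom handler, a minimal sketch along the following lines can be used. This is not part of the example DAG above; the connection ID, table name, and the %(...)s placeholder style are assumptions that depend on your database driver, and fetch_one_handler is available in recent versions of the common.sql provider.

from airflow.providers.common.sql.hooks.sql import fetch_one_handler
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

# Hypothetical connection ID and table name; the %(name)s placeholder style
# depends on the underlying database driver.
parameterized_query = SQLExecuteQueryOperator(
    task_id="parameterized_query",
    conn_id="my_database",
    sql="SELECT * FROM my_table WHERE created_at >= %(cutoff)s LIMIT 10",
    parameters={"cutoff": "2023-01-01"},
    handler=fetch_one_handler,  # return only the first row instead of all rows
)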

Check SQL Table Columns

Use the SQLColumnCheckOperator to run data quality checks against columns of a given table. As well as a connection ID and table, a column_mapping describing the relationship between columns and tests to run must be supplied. An example column mapping is a set of three nested dictionaries and looks like:

column_mapping = {
    "col_name": {
        "null_check": {"equal_to": 0, "partition_clause": "other_col LIKE 'this'"},
        "min": {
            "greater_than": 5,
            "leq_to": 10,
            "tolerance": 0.2,
        },
        "max": {"less_than": 1000, "geq_to": 10, "tolerance": 0.01},
    }
}

Here, col_name is the name of the column to run checks on, and each entry in its dictionary is a check. The valid checks are:

  • null_check: checks the number of NULL values in the column

  • distinct_check: checks the COUNT of values in the column that are distinct

  • unique_check: checks the number of distinct values in a column against the number of rows

  • min: checks the minimum value in the column

  • max: checks the maximum value in the column

Each entry in the check’s dictionary is either a condition for success of the check, the tolerance, or a partition clause. The conditions for success are:

  • greater_than

  • geq_to

  • less_than

  • leq_to

  • equal_to

When specifying conditions, equal_to is not compatible with the other conditions. A lower-bound and an upper-bound condition may be specified in the same check. The tolerance is the percentage by which the result may be out of bounds and still be considered successful.

A partition clause may be given at the operator level as a parameter, where it partitions all checks; at the column level in the column mapping, where it partitions all checks for that column; or at the check level for a column, where it partitions just that check (see the sketch following the example below).

A database may also be specified if not using the database from the supplied connection.

The accept_none argument, True by default, converts None values returned by the query to 0s, allowing empty tables to return valid integers.

The below example demonstrates how to instantiate the SQLColumnCheckOperator task.

tests/system/providers/common/sql/example_sql_column_table_check.py[source]

column_check = SQLColumnCheckOperator(
    task_id="column_check",
    table=AIRFLOW_DB_METADATA_TABLE,
    column_mapping={
        "id": {
            "null_check": {
                "equal_to": 0,
                "tolerance": 0,
            },
            "distinct_check": {
                "equal_to": 1,
            },
        }
    },
)
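
To illustrate the partitioning and accept_none options described above, here is a minimal, hedged sketch that is not part of the example DAG; the connection ID, table, and column names are hypothetical.

from airflow.providers.common.sql.operators.sql import SQLColumnCheckOperator

# Hypothetical connection ID, table, and column names.
partitioned_column_check = SQLColumnCheckOperator(
    task_id="partitioned_column_check",
    conn_id="my_database",
    table="my_table",
    column_mapping={
        "amount": {
            "null_check": {"equal_to": 0},
            # Check-level partition clause: applies only to this min check.
            "min": {"greater_than": 0, "partition_clause": "status = 'active'"},
        },
    },
    # Operator-level partition clause: applies to all checks on all columns.
    partition_clause="created_at >= '2023-01-01'",
    # accept_none is True by default; NULL results (e.g. from an empty table)
    # are treated as 0.
    accept_none=True,
)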

Check SQL Table Values

Use the SQLTableCheckOperator to run data quality checks against a given table. As well as a connection ID and table, a checks dictionary describing the relationship between the table and tests to run must be supplied. An example checks argument is a set of two nested dictionaries and looks like:

checks = {
    "row_count_check": {
        "check_statement": "COUNT(*) = 1000",
    },
    "column_sum_check": {
        "check_statement": "col_a + col_b < col_c",
        "partition_clause": "col_a IS NOT NULL",
    },
}

The first set of keys are the check names, which are referenced in the templated query the operator builds. The dictionary under each check name must include a check_statement key whose value is a SQL expression that resolves to a boolean (this can be any string or int that resolves to a boolean via airflow.operators.sql.parse_boolean). The other possible key is partition_clause, a check-level statement that partitions the data in the table using a WHERE clause for that check. This is compatible with the operator-level partition_clause parameter, which filters across all checks.

The below example demonstrates how to instantiate the SQLTableCheckOperator task.

tests/system/providers/common/sql/example_sql_column_table_check.py[source]

row_count_check = SQLTableCheckOperator(
    task_id="row_count_check",
    table=AIRFLOW_DB_METADATA_TABLE,
    checks={
        "row_count_check": {
            "check_statement": "COUNT(*) = 1",
        }
    },
)
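
A further hypothetical sketch, not part of the example DAG, shows a check-level partition_clause alongside the operator-level partition_clause parameter; the connection ID, table, and column names are assumed.

from airflow.providers.common.sql.operators.sql import SQLTableCheckOperator

# Hypothetical connection ID, table, and column names.
table_checks = SQLTableCheckOperator(
    task_id="table_checks",
    conn_id="my_database",
    table="my_table",
    checks={
        "row_count_check": {"check_statement": "COUNT(*) >= 1000"},
        "column_sum_check": {
            "check_statement": "col_a + col_b < col_c",
            # Check-level partition clause: applies only to this check.
            "partition_clause": "col_a IS NOT NULL",
        },
    },
    # Operator-level partition clause: applies to all checks.
    partition_clause="created_at >= '2023-01-01'",
)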
