Alibaba Cloud AnalyticDB Spark Operators

Overview

The Airflow integration with Alibaba Cloud AnalyticDB Spark provides several operators for developing Spark batch and SQL applications.

Develop Spark batch applications

Purpose

This example DAG uses AnalyticDBSparkBatchOperator to submit the Spark Pi and Spark logistic regression applications.

Defining tasks

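The following code submits the Spark Pi application and then the Spark logistic regression application. The settings shared by both tasks (cluster ID, resource group, and region) are supplied once through default_args.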

tests/system/alibaba/example_adb_spark_batch.py

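from __future__ import annotations

from datetime import datetime

from airflow.models.dag import DAG
from airflow.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkBatchOperator

DAG_ID = "adb_spark_batch_dag"

with DAG(
    dag_id=DAG_ID,
    start_date=datetime(2021, 1, 1),
    schedule=None,
    # Applied to every operator in this DAG; replace the placeholders with your own values.
    default_args={"cluster_id": "your cluster", "rg_name": "your resource group", "region": "your region"},
    max_active_runs=1,
    catchup=False,
) as dag:
    # Submit the Spark Pi example from the examples JAR already present on the cluster.
    spark_pi = AnalyticDBSparkBatchOperator(
        task_id="task1",
        file="local:///tmp/spark-examples.jar",
        class_name="org.apache.spark.examples.SparkPi",
    )

    # Submit the Spark logistic regression example from the same JAR.
    spark_lr = AnalyticDBSparkBatchOperator(
        task_id="task2",
        file="local:///tmp/spark-examples.jar",
        class_name="org.apache.spark.examples.SparkLR",
    )

    # Run the logistic regression job only after Spark Pi has finished.
    spark_pi >> spark_lr

    # The lines below are Airflow system-test scaffolding (tests_common is not
    # shipped with the provider package); leave them out of your own DAGs.
    from tests_common.test_utils.watcher import watcher

    # This test needs watcher in order to properly mark success/failure
    # when "tearDown" task with trigger rule is part of the DAG
    list(dag.tasks) >> watcher()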
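The DAG above relies on two general Airflow behaviours: values in `default_args` are applied to each operator unless the operator receives its own value, and `a >> b` declares that `b` runs after `a`. A list on the left-hand side makes every task in it upstream, which is what `list(dag.tasks) >> watcher()` depends on. The sketch below is a simplified, stdlib-only model of those two behaviours, not Airflow's implementation; `Task` and `make_task` are hypothetical names, and details such as how Airflow matches `default_args` keys to operator parameters are omitted.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass
class Task:
    """Hypothetical stand-in for an Airflow operator (not Airflow's real class)."""

    task_id: str
    params: dict[str, Any]
    downstream: list[str] = field(default_factory=list)

    def __rshift__(self, other: Task) -> Task:
        # a >> b: record b as downstream of a; returning b lets a >> b >> c chain.
        self.downstream.append(other.task_id)
        return other

    def __rrshift__(self, others: list[Task]) -> Task:
        # [t1, t2] >> task: list has no >>, so Python calls task.__rrshift__(list).
        for upstream in others:
            upstream >> self
        return self


def make_task(task_id: str, default_args: dict[str, Any], **kwargs: Any) -> Task:
    # Explicit keyword arguments override values inherited from default_args.
    return Task(task_id=task_id, params={**default_args, **kwargs})


default_args = {"cluster_id": "your cluster", "rg_name": "your resource group", "region": "your region"}
spark_pi = make_task("task1", default_args, class_name="org.apache.spark.examples.SparkPi")
spark_lr = make_task("task2", default_args, class_name="org.apache.spark.examples.SparkLR")
watcher = make_task("watcher", {})

spark_pi >> spark_lr
[spark_pi, spark_lr] >> watcher

print(spark_pi.params["cluster_id"])  # your cluster
print(spark_pi.downstream)  # ['task2', 'watcher']
print(spark_lr.downstream)  # ['watcher']
```

In this model both Spark tasks inherit `cluster_id`, `rg_name`, and `region` from a single dictionary, and the watcher ends up downstream of every other task.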
