PySpark Decorator

Python callable wrapped within the @task.pyspark decorator is injected with a SparkSession and SparkContext object if available.

Parameters

The following parameters can be passed to the decorator:

conn_id: str

The connection ID to use for connecting to the Spark cluster. If not specified, the spark master is set to local[*].

config_kwargs: dict

The kwargs used for initializing the SparkConf object. This overrides the spark configuration options set in the connection.

Example

The following example shows how to use the @task.pyspark decorator. Note that the spark and sc objects are injected into the function.

tests/system/apache/spark/example_pyspark.py[source]

@task.pyspark(conn_id="spark-local")
def spark_task(spark: SparkSession, sc: SparkContext) -> pd.DataFrame:
    df = spark.createDataFrame(
        [
            (1, "John Doe", 21),
            (2, "Jane Doe", 22),
            (3, "Joe Bloggs", 23),
        ],
        ["id", "name", "age"],
    )
    df.show()

    return df.toPandas()

Spark Connect

In Apache Spark 3.4, Spark Connect introduced a decoupled client-server architecture that allows remote connectivity to Spark clusters using the DataFrame API. Using Spark Connect is the preferred way in Airflow to make use of the PySpark decorator, because it does not require to run the Spark driver on the same host as Airflow. To make use of Spark Connect, you prepend your host url with sc://. For example, sc://spark-cluster:15002.

Authentication

Spark Connect does not have built-in authentication. The gRPC HTTP/2 interface however allows the use of authentication to communicate with the Spark Connect server through authenticating proxies. To make use of authentication make sure to create a Spark Connect connection and set the right credentials.

Was this entry helpful?