Apache Beam Operators¶
Apache Beam is an open source, unified model for defining both batch and streaming data-parallel processing pipelines. Using one of the open source Beam SDKs, you build a program that defines the pipeline. The pipeline is then executed by one of Beam’s supported distributed processing back-ends, which include Apache Flink, Apache Spark, and Google Cloud Dataflow.
Note
This operator requires the gcloud command (Google Cloud SDK) to be installed on the Airflow worker when the Apache Beam pipeline runs on the Dataflow service. For installation instructions, see <https://cloud.google.com/sdk/docs/install>.
Run Python Pipelines in Apache Beam¶
The py_file argument must be specified for BeamRunPythonPipelineOperator as it contains the pipeline to be executed by Beam. The Python file can be stored in a GCS location that Airflow is able to download from, or on the local filesystem (provide the absolute path to it).
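To make the two supported locations concrete, here is a minimal sketch that classifies a py_file value. The helper name describe_pipeline_file is hypothetical and not part of Airflow; the "gs://" prefix check is an assumption about how a GCS location is written.

```python
import posixpath


def describe_pipeline_file(path: str) -> str:
    """Classify a pipeline file location (hypothetical helper, not part of Airflow)."""
    if path.startswith("gs://"):
        # Assumed GCS URI form: the file has to be downloaded before it can run.
        return "gcs"
    if posixpath.isabs(path):
        # Absolute POSIX path: usable directly on the worker's filesystem.
        return "local"
    # Anything else, for example a module name passed together with "-m".
    return "other"
```

A value such as "apache_beam.examples.wordcount" falls into the "other" branch, which matches the module-style invocation with py_options=["-m"] used in the examples on this page.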
The py_interpreter argument specifies the Python version to be used when executing the pipeline; the default is python3. If your Airflow instance is running on Python 2, specify python2 and ensure your py_file is written in Python 2. For best results, use Python 3.
If the py_requirements argument is specified, a temporary Python virtual environment with the specified requirements is created and the pipeline runs inside it.
The py_system_site_packages argument specifies whether all the Python packages from your Airflow instance are accessible within the virtual environment (it applies only if the py_requirements argument is specified). Avoid enabling it unless the Dataflow job requires it.
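As a rough illustration of what py_requirements and py_system_site_packages amount to, the sketch below builds the commands for an equivalent manual setup with the standard library venv module. This is an assumption for illustration only: the operator may create its environment differently, the helper name venv_commands is hypothetical, and the bin/pip location assumes a POSIX layout.

```python
import sys
from typing import List


def venv_commands(
    venv_dir: str,
    requirements: List[str],
    system_site_packages: bool = False,
) -> List[List[str]]:
    """Return the commands for a manual venv setup (hypothetical helper)."""
    create = [sys.executable, "-m", "venv", venv_dir]
    if system_site_packages:
        # Exposes the packages of the parent interpreter inside the environment.
        create.append("--system-site-packages")
    # Install the pinned requirements with the environment's own pip (POSIX layout).
    install = [f"{venv_dir}/bin/pip", "install", *requirements]
    return [create, install]
```

Leaving system_site_packages at False keeps the environment isolated, so the pipeline sees only the requirements listed explicitly.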
Python Pipelines with DirectRunner¶
start_python_pipeline_local_direct_runner = BeamRunPythonPipelineOperator(
    task_id="start_python_pipeline_local_direct_runner",
    py_file="apache_beam.examples.wordcount",
    py_options=["-m"],
    py_requirements=["apache-beam[gcp]==2.59.0"],
    py_interpreter="python3",
    py_system_site_packages=False,
)

start_python_pipeline_direct_runner = BeamRunPythonPipelineOperator(
    task_id="start_python_pipeline_direct_runner",
    py_file=GCS_PYTHON,
    py_options=[],
    pipeline_options={"output": GCS_OUTPUT},
    py_requirements=["apache-beam[gcp]==2.59.0"],
    py_interpreter="python3",
    py_system_site_packages=False,
)
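The pipeline_options dictionary ends up as command-line flags for the pipeline. The sketch below shows one plausible conversion, assuming each entry becomes a --key=value flag, a True value becomes a bare flag, and list values repeat the flag. The function name options_to_args is hypothetical and the exact rules used by the operator may differ.

```python
from typing import Any, Dict, List


def options_to_args(options: Dict[str, Any]) -> List[str]:
    """Turn an options dictionary into command-line flags (illustrative sketch)."""
    args: List[str] = []
    for key, value in options.items():
        if value is None or value is True:
            # Flags without a value, e.g. --streaming.
            args.append(f"--{key}")
        elif value is False:
            # A disabled boolean flag is simply left out.
            continue
        elif isinstance(value, (list, tuple)):
            # Repeat the flag once per item.
            args.extend(f"--{key}={item}" for item in value)
        else:
            args.append(f"--{key}={value}")
    return args
```

Under these assumptions, {"output": "gs://bucket/out"} would be passed to the pipeline as --output=gs://bucket/out.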
You can run this operator in deferrable mode to execute it asynchronously. When the operator knows it has to wait, it frees up the worker and hands off the job of resuming it to a Trigger. As a result, while it is suspended (deferred), it does not take up a worker slot, and your cluster wastes far fewer resources on idle Operators or Sensors:
start_python_pipeline_local_direct_runner = BeamRunPythonPipelineOperator(
    task_id="start_python_pipeline_local_direct_runner",
    py_file="apache_beam.examples.wordcount",
    py_options=["-m"],
    py_requirements=["apache-beam[gcp]==2.59.0"],
    py_interpreter="python3",
    py_system_site_packages=False,
    deferrable=True,
)

start_python_pipeline_direct_runner = BeamRunPythonPipelineOperator(
    task_id="start_python_pipeline_direct_runner",
    py_file=GCS_PYTHON,
    py_options=[],
    pipeline_options={"output": GCS_OUTPUT},
    py_requirements=["apache-beam[gcp]==2.59.0"],
    py_interpreter="python3",
    py_system_site_packages=False,
    deferrable=True,
)
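The idea behind deferral can be sketched with plain asyncio: many waits share one event loop instead of each holding a worker. This is only a conceptual illustration. The coroutine names and the simulated polling are hypothetical and do not use Airflow's Trigger API.

```python
import asyncio
from typing import List


async def wait_for_job(name: str, polls_needed: int, poll_interval: float = 0.01) -> str:
    """Stand-in for a trigger: polls until a simulated job is done."""
    for _ in range(polls_needed):
        # While this wait sleeps, the event loop is free to advance other waits.
        await asyncio.sleep(poll_interval)
    return f"{name}: done"


async def run_triggers() -> List[str]:
    """Wait for several simulated jobs concurrently on a single event loop."""
    return await asyncio.gather(
        wait_for_job("wordcount-a", polls_needed=3),
        wait_for_job("wordcount-b", polls_needed=1),
    )
```

Calling asyncio.run(run_triggers()) waits for both simulated jobs at once, which is the property that lets a deferred operator give up its worker slot while it waits.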
Python Pipelines with DataflowRunner¶
start_python_pipeline_dataflow_runner = BeamRunPythonPipelineOperator(
    task_id="start_python_pipeline_dataflow_runner",
    runner="DataflowRunner",
    py_file=GCS_PYTHON,
    pipeline_options={
        "tempLocation": GCS_TMP,
        "stagingLocation": GCS_STAGING,
        "output": GCS_OUTPUT,
    },
    py_options=[],
    py_requirements=["apache-beam[gcp]==2.59.0"],
    py_interpreter="python3",
    py_system_site_packages=False,
    dataflow_config=DataflowConfiguration(
        job_name="{{task.task_id}}", project_id=GCP_PROJECT_ID, location="us-central1"
    ),
)
start_python_job_dataflow_runner_async = BeamRunPythonPipelineOperator(
    task_id="start_python_job_dataflow_runner_async",
    runner="DataflowRunner",
    py_file=GCS_PYTHON_DATAFLOW_ASYNC,
    pipeline_options={
        "tempLocation": GCS_TMP,
        "stagingLocation": GCS_STAGING,
        "output": GCS_OUTPUT,
    },
    py_options=[],
    py_requirements=["apache-beam[gcp]==2.59.0"],
    py_interpreter="python3",
    py_system_site_packages=False,
    dataflow_config=DataflowConfiguration(
        job_name="{{task.task_id}}",
        project_id=GCP_PROJECT_ID,
        location="us-central1",
        wait_until_finished=False,
    ),
)

wait_for_python_job_dataflow_runner_async_done = DataflowJobStatusSensor(
    task_id="wait-for-python-job-async-done",
    job_id="{{task_instance.xcom_pull('start_python_job_dataflow_runner_async')['dataflow_job_id']}}",
    expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
    project_id=GCP_PROJECT_ID,
    location="us-central1",
)

start_python_job_dataflow_runner_async >> wait_for_python_job_dataflow_runner_async_done
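The asynchronous pattern above splits the work in two: the first task submits the job and publishes its ID, and the sensor then polls until the job reaches an expected status. The sketch below imitates that flow with plain Python. The status source is faked, the job ID is a made-up placeholder, and the function names are hypothetical; a real sensor also sleeps between checks and handles failed jobs.

```python
from typing import Callable, Dict, Iterator, Set


def wait_for_status(
    get_status: Callable[[str], str],
    job_id: str,
    expected: Set[str],
    max_pokes: int,
) -> bool:
    """Poll a job's status until it is one of the expected ones (simplified)."""
    for _ in range(max_pokes):
        if get_status(job_id) in expected:
            return True
        # A real sensor would sleep between pokes; omitted to keep the sketch fast.
    return False


def make_fake_status_source() -> Callable[[str], str]:
    """Return a fake status lookup that advances one state per call."""
    states: Iterator[str] = iter(
        ["JOB_STATE_PENDING", "JOB_STATE_RUNNING", "JOB_STATE_DONE"]
    )
    return lambda job_id: next(states)


# What the starting task is assumed to publish for the downstream sensor.
xcom_value: Dict[str, str] = {"dataflow_job_id": "hypothetical-job-id"}
done = wait_for_status(
    make_fake_status_source(),
    xcom_value["dataflow_job_id"],
    expected={"JOB_STATE_DONE"},
    max_pokes=5,
)
```

Here the lookup by "dataflow_job_id" plays the role of the xcom_pull expression in the sensor's job_id template.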
You can run this operator in deferrable mode to execute it asynchronously. When the operator knows it has to wait, it frees up the worker and hands off the job of resuming it to a Trigger. As a result, while it is suspended (deferred), it does not take up a worker slot, and your cluster wastes far fewer resources on idle Operators or Sensors:
start_python_pipeline_dataflow_runner = BeamRunPythonPipelineOperator(
    task_id="start_python_pipeline_dataflow_runner",
    runner="DataflowRunner",
    py_file=GCS_PYTHON,
    pipeline_options={
        "tempLocation": GCS_TMP,
        "stagingLocation": GCS_STAGING,
        "output": GCS_OUTPUT,
    },
    py_options=[],
    py_requirements=["apache-beam[gcp]==2.59.0"],
    py_interpreter="python3",
    py_system_site_packages=False,
    dataflow_config=DataflowConfiguration(
        job_name="{{task.task_id}}", project_id=GCP_PROJECT_ID, location="us-central1"
    ),
    deferrable=True,
)
Run Java Pipelines in Apache Beam¶
For a Java pipeline, the jar argument must be specified for BeamRunJavaPipelineOperator as it contains the pipeline to be executed by Apache Beam. The JAR can be stored in a GCS location that Airflow is able to download from, or on the local filesystem (provide the absolute path to it).
Java Pipelines with DirectRunner¶
jar_to_local_direct_runner = GCSToLocalFilesystemOperator(
    task_id="jar_to_local_direct_runner",
    bucket=GCS_JAR_DIRECT_RUNNER_BUCKET_NAME,
    object_name=GCS_JAR_DIRECT_RUNNER_OBJECT_NAME,
    filename="/tmp/beam_wordcount_direct_runner_{{ ds_nodash }}.jar",
)

start_java_pipeline_direct_runner = BeamRunJavaPipelineOperator(
    task_id="start_java_pipeline_direct_runner",
    jar="/tmp/beam_wordcount_direct_runner_{{ ds_nodash }}.jar",
    pipeline_options={
        "output": "/tmp/start_java_pipeline_direct_runner",
        "inputFile": GCS_INPUT,
    },
    job_class="org.apache.beam.examples.WordCount",
)

jar_to_local_direct_runner >> start_java_pipeline_direct_runner
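Both tasks above use the same templated path, so the download task and the pipeline task agree on the JAR location within a DAG run. The sketch below shows what the {{ ds_nodash }} placeholder is expected to render to, assuming it is the run's logical date formatted as YYYYMMDD. The helper name jar_path_for is hypothetical.

```python
from datetime import date


def jar_path_for(logical_date: date) -> str:
    """Render the templated JAR path for a given logical date (illustrative)."""
    # ds_nodash: the logical date without dashes, e.g. 20240131.
    ds_nodash = logical_date.strftime("%Y%m%d")
    return f"/tmp/beam_wordcount_direct_runner_{ds_nodash}.jar"
```

Because the date is part of the file name, runs for different dates do not overwrite each other's downloaded JAR.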
Java Pipelines with DataflowRunner¶
jar_to_local_dataflow_runner = GCSToLocalFilesystemOperator(
    task_id="jar_to_local_dataflow_runner",
    bucket=GCS_JAR_DATAFLOW_RUNNER_BUCKET_NAME,
    object_name=GCS_JAR_DATAFLOW_RUNNER_OBJECT_NAME,
    filename="/tmp/beam_wordcount_dataflow_runner_{{ ds_nodash }}.jar",
)

start_java_pipeline_dataflow = BeamRunJavaPipelineOperator(
    task_id="start_java_pipeline_dataflow",
    runner="DataflowRunner",
    jar="/tmp/beam_wordcount_dataflow_runner_{{ ds_nodash }}.jar",
    pipeline_options={
        "tempLocation": GCS_TMP,
        "stagingLocation": GCS_STAGING,
        "output": GCS_OUTPUT,
    },
    job_class="org.apache.beam.examples.WordCount",
    dataflow_config={"job_name": "{{task.task_id}}", "location": "us-central1"},
)

jar_to_local_dataflow_runner >> start_java_pipeline_dataflow
Run Go Pipelines in Apache Beam¶
The go_file argument must be specified for BeamRunGoPipelineOperator as it contains the pipeline to be executed by Beam. The Go file can be stored in a GCS location that Airflow is able to download from, or on the local filesystem (provide the absolute path to it). When running from the local filesystem, the equivalent command is go run <go_file>. If the file is pulled from a GCS bucket, the operator first initializes the module and installs dependencies with go mod init example.com/main and go mod tidy.
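The two cases can be summarized as a list of shell commands. The sketch below is a hypothetical helper, not the operator's implementation. It assumes a GCS location is written with a "gs://" prefix, that the downloaded file keeps its base name, and that module initialization uses the standard go mod init command.

```python
from typing import List


def go_commands(go_file: str) -> List[List[str]]:
    """Return the Go commands roughly equivalent to each case (illustrative)."""
    if go_file.startswith("gs://"):
        # Assumed: the file is downloaded first and keeps its base name.
        local_name = go_file.rsplit("/", 1)[-1]
        return [
            ["go", "mod", "init", "example.com/main"],  # create a module
            ["go", "mod", "tidy"],                      # resolve dependencies
            ["go", "run", local_name],
        ]
    # Local file: it is run directly with its existing module setup.
    return [["go", "run", go_file]]
```

The extra steps for the GCS case exist because a single downloaded file has no go.mod of its own, so a module has to be created before dependencies can be resolved.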
Go Pipelines with DirectRunner¶
start_go_pipeline_local_direct_runner = BeamRunGoPipelineOperator(
    task_id="start_go_pipeline_local_direct_runner",
    go_file="files/apache_beam/examples/wordcount.go",
)

start_go_pipeline_direct_runner = BeamRunGoPipelineOperator(
    task_id="start_go_pipeline_direct_runner",
    go_file=GCS_GO,
    pipeline_options={"output": GCS_OUTPUT},
)
Go Pipelines with DataflowRunner¶
start_go_pipeline_dataflow_runner = BeamRunGoPipelineOperator(
    task_id="start_go_pipeline_dataflow_runner",
    runner="DataflowRunner",
    go_file=GCS_GO,
    pipeline_options={
        "tempLocation": GCS_TMP,
        "stagingLocation": GCS_STAGING,
        "output": GCS_OUTPUT,
        "WorkerHarnessContainerImage": "apache/beam_go_sdk:latest",
    },
    dataflow_config=DataflowConfiguration(
        job_name="{{task.task_id}}", project_id=GCP_PROJECT_ID, location="us-central1"
    ),
)

start_go_job_dataflow_runner_async = BeamRunGoPipelineOperator(
    task_id="start_go_job_dataflow_runner_async",
    runner="DataflowRunner",
    go_file=GCS_GO_DATAFLOW_ASYNC,
    pipeline_options={
        "tempLocation": GCS_TMP,
        "stagingLocation": GCS_STAGING,
        "output": GCS_OUTPUT,
        "WorkerHarnessContainerImage": "apache/beam_go_sdk:latest",
    },
    dataflow_config=DataflowConfiguration(
        job_name="{{task.task_id}}",
        project_id=GCP_PROJECT_ID,
        location="us-central1",
        wait_until_finished=False,
    ),
)

wait_for_go_job_dataflow_runner_async_done = DataflowJobStatusSensor(
    task_id="wait-for-go-job-async-done",
    job_id="{{task_instance.xcom_pull('start_go_job_dataflow_runner_async')['dataflow_job_id']}}",
    expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
    project_id=GCP_PROJECT_ID,
    location="us-central1",
)

start_go_job_dataflow_runner_async >> wait_for_go_job_dataflow_runner_async_done
Reference¶
For further information, look at: