Google Cloud Data Pipelines Operators

Data Pipelines is a Dataflow feature that allows customers to create and schedule recurring jobs, view aggregated job metrics, and define and manage job SLOs. A pipeline consists of a collection of jobs, along with the means to manage them. A pipeline may be associated with a Dataflow template (classic or flex) and include all jobs launched with the associated template.

Prerequisite Tasks

To use these operators, you must do a few things:

  • Select or create a Cloud Platform project using the Cloud Console.

  • Enable billing for your project, as described in the Google Cloud documentation.

  • Enable the API, as described in the Cloud Console documentation.

  • Install API libraries via pip install 'apache-airflow[google]'.

Creating a Data Pipeline

To create a new Data Pipelines instance using a request body and parent name, use CreateDataPipelineOperator. The operator accesses Google Cloud’s Data Pipelines API and calls the create method to create the given pipeline.

CreateDataPipelineOperator accepts four parameters:

  • body: the request body describing the Pipeline instance

  • project_id: the ID of the GCP project that owns the job

  • location: the location of the Data Pipelines instance

  • gcp_conn_id: the connection ID to connect to Google Cloud

The request body and project ID need to be passed each time, while the GCP connection ID and location have default values. The project ID and location are used to build the parent name under which the pipeline is created.
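
For reference, the parent name follows the projects/{project_id}/locations/{location} resource format used by the Data Pipelines API. The short sketch below shows how the two values combine; the variable values are illustrative and are not operator arguments.

# Illustrative only: how project_id and location form the parent name
# under which a pipeline is created and addressed.
project_id = "my-project"
location = "us-central1"
parent = f"projects/{project_id}/locations/{location}"
pipeline_name = f"{parent}/pipelines/my-pipeline"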

Here is an example of how you can create a Data Pipelines instance by passing the above parameters to CreateDataPipelineOperator:

tests/system/providers/google/cloud/datapipelines/example_datapipeline.py[source]

create_data_pipeline = CreateDataPipelineOperator(
    task_id="create_data_pipeline",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    body={
        "name": f"projects/{GCP_PROJECT_ID}/locations/{GCP_LOCATION}/pipelines/{PIPELINE_NAME}",
        "type": PIPELINE_TYPE,
        "workload": {
            "dataflowFlexTemplateRequest": {
                "launchParameter": {
                    "containerSpecGcsPath": GCS_PATH,
                    "jobName": DATAPIPELINES_JOB_NAME,
                    "environment": {"tempLocation": TEMP_LOCATION},
                    "parameters": {
                        "inputFile": INPUT_FILE,
                        "output": OUTPUT,
                    },
                },
                "projectId": GCP_PROJECT_ID,
                "location": GCP_LOCATION,
            }
        },
    },
)

Running a Data Pipeline

To run a Data Pipelines instance, use RunDataPipelineOperator. The operator accesses Google Cloud’s Data Pipelines API and calls the run method to run the given pipeline.

RunDataPipelineOperator can take in four parameters:

  • data_pipeline_name: the name of the Data Pipelines instance

  • project_id: the ID of the GCP project that owns the job

  • location: the location of the Data Pipelines instance

  • gcp_conn_id: the connection ID to connect to the Google Cloud Platform

Only the Data Pipeline name and Project ID are required parameters, as the Location and GCP Connection ID have default values. The Project ID and Location are used to build the parent name, which identifies where the given Data Pipeline is located.

You can run a Data Pipelines instance by passing the above parameters to RunDataPipelineOperator:

tests/system/providers/google/cloud/datapipelines/example_datapipeline.py[source]

run_data_pipeline = RunDataPipelineOperator(
    task_id="run_data_pipeline",
    data_pipeline_name=PIPELINE_NAME,
    project_id=GCP_PROJECT_ID,
)
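
In practice you will usually create the pipeline before running it. Below is a minimal wiring sketch, not taken from the example file: the DAG id, start date, and the placeholder project, location, and body values are illustrative assumptions, and the real request body would be the one shown earlier.

import datetime

from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.datapipeline import (
    CreateDataPipelineOperator,
    RunDataPipelineOperator,
)

with DAG(
    dag_id="example_datapipeline_usage",  # illustrative DAG id
    start_date=datetime.datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    create_data_pipeline = CreateDataPipelineOperator(
        task_id="create_data_pipeline",
        project_id="my-project",  # placeholder project
        location="us-central1",  # placeholder location
        body={
            # Same structure as the request body shown earlier (placeholder values)
            "name": "projects/my-project/locations/us-central1/pipelines/my-pipeline",
            "type": "PIPELINE_TYPE_BATCH",
            "workload": {},  # dataflowFlexTemplateRequest omitted for brevity
        },
    )

    run_data_pipeline = RunDataPipelineOperator(
        task_id="run_data_pipeline",
        data_pipeline_name="my-pipeline",  # placeholder pipeline name
        project_id="my-project",
    )

    # Ensure the pipeline exists before it is run
    create_data_pipeline >> run_data_pipeline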

Once called, the RunDataPipelineOperator will return the Google Cloud Dataflow Job created by running the given pipeline.
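
Because the value returned from an operator's execute method is pushed to XCom under the default return_value key, a downstream task can consume the returned Dataflow Job. The sketch below assumes the run_data_pipeline task from the snippet above; the report_dataflow_job task and its use of the Job's id field are illustrative, not part of the operator's API.

from airflow.decorators import task


@task
def report_dataflow_job(job: dict) -> None:
    # Log the Dataflow Job returned by RunDataPipelineOperator; the Job
    # resource carries fields such as "id" and "name".
    print(f"Launched Dataflow job: {job.get('id')}")


# run_data_pipeline.output is the XComArg for the task's return_value
report_dataflow_job(run_data_pipeline.output)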

For further information regarding the API usage, see Data Pipelines API REST Resource in the Google Cloud documentation.
