Google DataFusion Operators

Cloud Data Fusion is a fully managed, cloud-native data integration service that helps users efficiently build and manage ETL/ELT data pipelines. With a graphical interface and a broad open source library of preconfigured connectors and transformations, Cloud Data Fusion shifts an organization’s focus away from code and integration to insights and action.

Prerequisite Tasks

To use these operators, you must do a few things:

Restart DataFusion Instance

To restart a Data Fusion instance, use CloudDataFusionRestartInstanceOperator.

tests/system/providers/google/cloud/datafusion/example_datafusion.py

restart_instance = CloudDataFusionRestartInstanceOperator(
    location=LOCATION, instance_name=INSTANCE_NAME, task_id="restart_instance"
)

You can use Jinja templating with the instance_name and impersonation_chain parameters, which lets you determine their values dynamically. The result is saved to XCom, so other operators can use it.

Delete DataFusion Instance

To delete a Data Fusion instance, use CloudDataFusionDeleteInstanceOperator.

delete_instance = CloudDataFusionDeleteInstanceOperator(
    location=LOCATION,
    instance_name=INSTANCE_NAME,
    task_id="delete_instance",
    trigger_rule=TriggerRule.ALL_DONE,
)

You can use Jinja templating with the instance_name and impersonation_chain parameters, which lets you determine their values dynamically. The result is saved to XCom, so other operators can use it.

Create DataFusion Instance

To create a Data Fusion instance, use CloudDataFusionCreateInstanceOperator.

create_instance = CloudDataFusionCreateInstanceOperator(
    location=LOCATION,
    instance_name=INSTANCE_NAME,
    instance=INSTANCE,
    task_id="create_instance",
)

You can use Jinja templating with the instance_name, instance, and impersonation_chain parameters, which lets you determine their values dynamically. The result is saved to XCom, so other operators can use it.
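
The INSTANCE constant passed as instance above is not shown in the excerpt. As a rough sketch, it can be a plain dictionary describing the instance to create. The field names below (type, displayName, labels) are assumed to follow the Data Fusion REST Instance resource, and every value is a hypothetical placeholder rather than something taken from the example file.

```python
# Hypothetical placeholder; replace it with your own instance name.
INSTANCE_NAME = "example-datafusion-instance"

# Assumed shape of the `instance` body, modelled on the Data Fusion
# REST "Instance" resource. Only a few fields are shown here.
INSTANCE = {
    "type": "BASIC",               # edition of the instance
    "displayName": INSTANCE_NAME,  # human-readable name
    "labels": {"env": "dev"},      # optional user-defined labels
}
```

Keeping the body in a module-level constant makes it easy to reuse the same dictionary for the create and update operators.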

Update DataFusion Instance

To update a Data Fusion instance, use CloudDataFusionUpdateInstanceOperator.

update_instance = CloudDataFusionUpdateInstanceOperator(
    location=LOCATION,
    instance_name=INSTANCE_NAME,
    instance=INSTANCE,
    update_mask="",
    task_id="update_instance",
)

You can use Jinja templating with the instance_name, instance, and impersonation_chain parameters, which lets you determine their values dynamically. The result is saved to XCom, so other operators can use it.
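
The update_mask argument names the fields of the instance that the update should overwrite. The sketch below assumes the usual Google API field-mask convention of a comma-separated list of field names. build_update_mask and INSTANCE_PATCH are hypothetical helpers introduced for illustration and are not part of the provider.

```python
def build_update_mask(fields):
    """Join field names into a comma-separated field mask string.

    Duplicates are dropped while the original order is preserved.
    """
    return ",".join(dict.fromkeys(fields))


# Hypothetical partial instance body: only the labels are being changed.
INSTANCE_PATCH = {"labels": {"env": "staging"}}

# Iterating over a dict yields its keys, so the mask lists the changed fields.
UPDATE_MASK = build_update_mask(INSTANCE_PATCH)
```

Deriving the mask from the keys of the patch keeps the two in sync, so a field is never sent without also being listed in the mask.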

Get DataFusion Instance

To retrieve a Data Fusion instance, use CloudDataFusionGetInstanceOperator.

get_instance = CloudDataFusionGetInstanceOperator(
    location=LOCATION, instance_name=INSTANCE_NAME, task_id="get_instance"
)

You can use Jinja templating with the instance_name and impersonation_chain parameters, which lets you determine their values dynamically. The result is saved to XCom, so other operators can use it.

Create a DataFusion pipeline

To create a Data Fusion pipeline, use CloudDataFusionCreatePipelineOperator.

create_pipeline = CloudDataFusionCreatePipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    pipeline=PIPELINE,
    instance_name=INSTANCE_NAME,
    task_id="create_pipeline",
)

You can use Jinja templating with the instance_name, pipeline_name, and impersonation_chain parameters, which lets you determine their values dynamically. The result is saved to XCom, so other operators can use it.
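
The PIPELINE constant passed as pipeline above is a pipeline definition in the JSON form that Data Fusion (CDAP) exports. The skeleton below is only a sketch of its overall shape: it is too incomplete to deploy, since real stages also need plugin properties and artifact details. The artifact version is a placeholder that would have to match your instance, and undefined_stages is a hypothetical helper added here as a quick sanity check.

```python
PIPELINE_NAME = "example-pipeline"  # hypothetical placeholder

# Assumed overall shape of a batch pipeline definition (incomplete skeleton).
PIPELINE = {
    "name": PIPELINE_NAME,
    "artifact": {
        "name": "cdap-data-pipeline",
        "version": "6.8.3",  # placeholder; must match the instance version
        "scope": "SYSTEM",
    },
    "config": {
        "stages": [
            {"name": "source", "plugin": {"name": "GCSFile", "type": "batchsource"}},
            {"name": "sink", "plugin": {"name": "GCS", "type": "batchsink"}},
        ],
        "connections": [{"from": "source", "to": "sink"}],
    },
}


def undefined_stages(pipeline):
    """Return stage names used in connections but never defined as stages."""
    config = pipeline["config"]
    defined = {stage["name"] for stage in config["stages"]}
    used = {conn[key] for conn in config["connections"] for key in ("from", "to")}
    return sorted(used - defined)
```

Running a check like undefined_stages(PIPELINE) before the DAG is deployed can catch a misspelled stage name earlier than a failed create request would.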

Start a DataFusion pipeline

To start a Data Fusion pipeline in synchronous mode, use CloudDataFusionStartPipelineOperator.

start_pipeline = CloudDataFusionStartPipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    instance_name=INSTANCE_NAME,
    task_id="start_pipeline",
)

To start a Data Fusion pipeline in asynchronous mode, use CloudDataFusionStartPipelineOperator with asynchronous=True.

start_pipeline_async = CloudDataFusionStartPipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    instance_name=INSTANCE_NAME,
    asynchronous=True,
    task_id="start_pipeline_async",
)

You can also start a Data Fusion pipeline in deferrable mode. With asynchronous=True, the operator returns as soon as the pipeline run is submitted. In the default synchronous mode, the operator waits until the pipeline reaches a terminal state by polling with a blocking sleep() call, whereas deferrable mode checks the state using asynchronous calls. The asynchronous and deferrable parameters cannot be used together. The following example uses deferrable mode with CloudDataFusionStartPipelineOperator.

start_pipeline_def = CloudDataFusionStartPipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    instance_name=INSTANCE_NAME,
    task_id="start_pipeline_def",
    deferrable=True,
)

You can use Jinja templating with the instance_name, pipeline_name, runtime_args, and impersonation_chain parameters, which lets you determine their values dynamically. The result is saved to XCom, so other operators can use it.
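
The difference between waiting with a blocking sleep() and waiting with asynchronous calls can be illustrated without Airflow. The sketch below is not the provider's implementation: TERMINAL_STATES is an assumed set of states, and make_fake_status_source is a hypothetical stand-in for the call that would query the pipeline state.

```python
import asyncio
import time

TERMINAL_STATES = {"COMPLETED", "FAILED", "KILLED"}  # assumed for this sketch


def make_fake_status_source(states):
    """Return a callable that reports the given states one call at a time.

    Once the sequence is exhausted, the last state is repeated.
    """
    iterator = iter(states)
    last = None

    def get_status():
        nonlocal last
        last = next(iterator, last)
        return last

    return get_status


def wait_blocking(get_status, interval=0.01):
    """Poll with time.sleep(); the calling thread is occupied while waiting."""
    while True:
        status = get_status()
        if status in TERMINAL_STATES:
            return status
        time.sleep(interval)


async def wait_async(get_status, interval=0.01):
    """Poll with asyncio.sleep(); the event loop can run other work meanwhile."""
    while True:
        status = get_status()
        if status in TERMINAL_STATES:
            return status
        await asyncio.sleep(interval)
```

For example, wait_blocking(make_fake_status_source(["RUNNING", "COMPLETED"])) returns "COMPLETED", and asyncio.run(wait_async(...)) gives the same result without blocking the event loop between checks. In Airflow, deferrable operators hand this kind of asynchronous wait to a trigger, which frees the worker slot while the pipeline runs.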

Stop a DataFusion pipeline

To stop a Data Fusion pipeline, use CloudDataFusionStopPipelineOperator.

stop_pipeline = CloudDataFusionStopPipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    instance_name=INSTANCE_NAME,
    task_id="stop_pipeline",
)

You can use Jinja templating with the instance_name, pipeline_name, and impersonation_chain parameters, which lets you determine their values dynamically. The result is saved to XCom, so other operators can use it.

Delete a DataFusion pipeline

To delete a Data Fusion pipeline, use CloudDataFusionDeletePipelineOperator.

delete_pipeline = CloudDataFusionDeletePipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    instance_name=INSTANCE_NAME,
    task_id="delete_pipeline",
    trigger_rule=TriggerRule.ALL_DONE,
)

You can use Jinja templating with the instance_name, version_id, pipeline_name, and impersonation_chain parameters, which lets you determine their values dynamically. The result is saved to XCom, so other operators can use it.

List DataFusion pipelines

To list Data Fusion pipelines, use CloudDataFusionListPipelinesOperator.

list_pipelines = CloudDataFusionListPipelinesOperator(
    location=LOCATION, instance_name=INSTANCE_NAME, task_id="list_pipelines"
)

You can use Jinja templating with the instance_name, artifact_name, artifact_version, and impersonation_chain parameters, which lets you determine their values dynamically. The result is saved to XCom, so other operators can use it.

Sensors

When a pipeline is started asynchronously, a sensor can be used to check that the pipeline has reached the expected state. To do this, use CloudDataFusionPipelineStateSensor.

start_pipeline_sensor = CloudDataFusionPipelineStateSensor(
    task_id="pipeline_state_sensor",
    pipeline_name=PIPELINE_NAME,
    pipeline_id=start_pipeline_async.output,
    expected_statuses=["COMPLETED"],
    failure_statuses=["FAILED"],
    instance_name=INSTANCE_NAME,
    location=LOCATION,
)
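
The way expected_statuses and failure_statuses interact can be sketched as a small decision function. This is an illustration of the idea only, not the sensor's source code: poke here is a standalone function and PipelineFailed is a hypothetical exception name.

```python
class PipelineFailed(Exception):
    """Raised when the pipeline reports one of the failure statuses."""


def poke(status, expected_statuses, failure_statuses=()):
    """Decide what a single sensor check should do with a reported status.

    Returns True when the wait is over, False to keep waiting, and raises
    PipelineFailed when the status is listed as a failure.
    """
    if status in failure_statuses:
        raise PipelineFailed(f"Pipeline reached failure status {status!r}")
    return status in expected_statuses
```

With expected_statuses=["COMPLETED"] and failure_statuses=["FAILED"], a status of "RUNNING" keeps the sensor waiting, "COMPLETED" ends the wait successfully, and "FAILED" fails the task straight away instead of waiting for a timeout.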
