Google Dataform Operators
Dataform is a service for data analysts to develop, test, version control, and schedule complex SQL workflows for data transformation in BigQuery.
Dataform lets you manage data transformation in the Extraction, Loading, and Transformation (ELT) process for data integration. After raw data is extracted from source systems and loaded into BigQuery, Dataform helps you to transform it into a well-defined, tested, and documented suite of data tables.
For more information about the service, visit the Dataform product documentation.
Configuration
Before you can use the Dataform operators, you need to initialize a repository and a workspace. For more information, visit the Dataform product documentation.
Create Compilation Result
A simple configuration to create a Compilation Result can look as follows:
DataformCreateCompilationResultOperator
create_compilation_result = DataformCreateCompilationResultOperator(
    task_id="create_compilation_result",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    compilation_result={
        "git_commitish": "main",
        "workspace": (
            f"projects/{PROJECT_ID}/locations/{REGION}/repositories/{REPOSITORY_ID}/"
            f"workspaces/{WORKSPACE_ID}"
        ),
    },
)
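The `workspace` value in the example above is a fully qualified resource name assembled from four identifiers. A minimal sketch of a helper that builds it in plain Python follows; the function name `workspace_path` and the sample identifiers are hypothetical and are not part of the Airflow provider.

```python
def workspace_path(
    project_id: str, region: str, repository_id: str, workspace_id: str
) -> str:
    """Build a workspace resource name in the format used by the example."""
    return (
        f"projects/{project_id}/locations/{region}/"
        f"repositories/{repository_id}/workspaces/{workspace_id}"
    )


# Hypothetical identifiers, for illustration only.
print(workspace_path("my-project", "us-central1", "my-repo", "my-workspace"))
```

Keeping the path construction in one place can reduce the risk of a mistyped segment when several tasks refer to the same workspace.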
Get Compilation Result
To get a Compilation Result you can use:
DataformGetCompilationResultOperator
get_compilation_result = DataformGetCompilationResultOperator(
    task_id="get_compilation_result",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    compilation_result_id=(
        "{{ task_instance.xcom_pull('create_compilation_result')['name'].split('/')[-1] }}"
    ),
)
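The templated `compilation_result_id` reads the `name` field from the value that `create_compilation_result` pushed to XCom and keeps only the last path segment. The sketch below reproduces that string handling in plain Python. The helper name `resource_id` and the sample resource name are made up for illustration; the only assumption taken from the template is that the XCom value behaves like a dictionary with a `name` key.

```python
def resource_id(resource_name: str) -> str:
    """Return the last path segment of a slash-separated resource name."""
    return resource_name.split("/")[-1]


# Hypothetical XCom payload; only the "name" key is used by the template.
xcom_value = {
    "name": (
        "projects/my-project/locations/us-central1/"
        "repositories/my-repo/compilationResults/abc123"
    )
}
print(resource_id(xcom_value["name"]))  # prints "abc123"
```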
Create Workflow Invocation
To create a Workflow Invocation you can use:
DataformCreateWorkflowInvocationOperator
This operation can run synchronously or asynchronously. For asynchronous runs, a sensor is also available:
DataformWorkflowInvocationStateSensor
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
    task_id='create_workflow_invocation',
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation={
        "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
    },
)

create_workflow_invocation_async = DataformCreateWorkflowInvocationOperator(
    task_id='create_workflow_invocation_async',
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    asynchronous=True,
    workflow_invocation={
        "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
    },
)

is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
    task_id="is_workflow_invocation_done",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=(
        "{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"
    ),
    expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)
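Conceptually, the asynchronous mode returns as soon as the invocation is created, and a sensor then checks its state repeatedly until an expected status is reached. The following is a standalone sketch of that polling idea in plain Python. It is not the implementation of `DataformWorkflowInvocationStateSensor`; the `State` enum, the `wait_for_state` function, and its parameters are all hypothetical names chosen for illustration.

```python
import time
from enum import Enum
from typing import Callable, Iterable


class State(Enum):
    """Hypothetical stand-in for workflow invocation states."""

    RUNNING = "RUNNING"
    SUCCEEDED = "SUCCEEDED"
    FAILED = "FAILED"


def wait_for_state(
    get_state: Callable[[], State],
    expected: Iterable[State],
    failure: Iterable[State] = (),
    poke_interval: float = 0.0,
    max_pokes: int = 10,
) -> State:
    """Poll get_state until it returns one of the expected states.

    Raises RuntimeError if a failure state is seen, and TimeoutError
    if no expected state is seen within max_pokes attempts.
    """
    expected_set, failure_set = set(expected), set(failure)
    for _ in range(max_pokes):
        state = get_state()
        if state in failure_set:
            raise RuntimeError(f"invocation reached failure state {state.name}")
        if state in expected_set:
            return state
        time.sleep(poke_interval)
    raise TimeoutError("invocation did not reach an expected state in time")


# Simulated invocation that reports RUNNING twice and then SUCCEEDED.
states = iter([State.RUNNING, State.RUNNING, State.SUCCEEDED])
result = wait_for_state(
    lambda: next(states),
    expected={State.SUCCEEDED},
    failure={State.FAILED},
)
print(result.name)  # prints "SUCCEEDED"
```

Separating creation from waiting in this way lets the creating task finish quickly, while the waiting step can be retried or timed out on its own.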
Get Workflow Invocation
To get a Workflow Invocation you can use:
DataformGetWorkflowInvocationOperator
get_workflow_invocation = DataformGetWorkflowInvocationOperator(
    task_id='get_workflow_invocation',
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=(
        "{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"
    ),
)
Cancel Workflow Invocation
To cancel a Workflow Invocation you can use:
DataformCancelWorkflowInvocationOperator
cancel_workflow_invocation = DataformCancelWorkflowInvocationOperator(
    task_id='cancel_workflow_invocation',
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    # Pulls the invocation name from an upstream task with
    # task_id 'create_second_workflow_invocation', which is not shown on this page.
    workflow_invocation_id=(
        "{{ task_instance.xcom_pull('create_second_workflow_invocation')['name'].split('/')[-1] }}"
    ),
)