Google Cloud Transfer Service Operators¶
Prerequisite Tasks¶
CloudDataTransferServiceCreateJobOperator¶
Create a transfer job.
The function accepts dates in two formats:
- consistent with the Google API: { "year": 2019, "month": 2, "day": 11 }
- as a datetime object
The function accepts times in two formats:
- consistent with the Google API: { "hours": 12, "minutes": 30, "seconds": 0 }
- as a time object
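Both formats carry the same information. As a minimal sketch of the correspondence, a date or time object maps onto the Google API form as shown below. The helper names date_to_api_dict and time_to_api_dict are hypothetical and are not part of the operator's API; the operator performs its own conversion.

```python
from datetime import date, time


def date_to_api_dict(d: date) -> dict:
    """Hypothetical helper: map a date onto the Google API date form."""
    return {"year": d.year, "month": d.month, "day": d.day}


def time_to_api_dict(t: time) -> dict:
    """Hypothetical helper: map a time onto the Google API time-of-day form."""
    return {"hours": t.hour, "minutes": t.minute, "seconds": t.second}


# The same values used in the format examples above.
api_date = date_to_api_dict(date(2019, 2, 11))
api_time = time_to_api_dict(time(12, 30, 0))
```

Because datetime is a subclass of date, date_to_api_dict also accepts a datetime and simply ignores its time part.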
To create a transfer job that copies data from AWS S3, you must have an AWS connection configured. For configuration details, see Amazon Web Services Connection.
Use the aws_conn_id parameter to select which AWS connection the operator uses.
For parameter definition, take a look at CloudDataTransferServiceCreateJobOperator.
Using the operator¶
gcs_to_gcs_transfer_body = {
    DESCRIPTION: "description",
    STATUS: GcpTransferJobsStatus.ENABLED,
    PROJECT_ID: PROJECT_ID_TRANSFER,
    SCHEDULE: {
        SCHEDULE_START_DATE: datetime(2015, 1, 1).date(),
        SCHEDULE_END_DATE: datetime(2030, 1, 1).date(),
        START_TIME_OF_DAY: (datetime.utcnow() + timedelta(seconds=120)).time(),
    },
    TRANSFER_SPEC: {
        GCS_DATA_SOURCE: {BUCKET_NAME: BUCKET_NAME_SRC},
        GCS_DATA_SINK: {BUCKET_NAME: BUCKET_NAME_DST},
        TRANSFER_OPTIONS: {ALREADY_EXISTING_IN_SINK: True},
    },
}
aws_to_gcs_transfer_body = {
    DESCRIPTION: GCP_DESCRIPTION,
    STATUS: GcpTransferJobsStatus.ENABLED,
    PROJECT_ID: GCP_PROJECT_ID,
    JOB_NAME: GCP_TRANSFER_JOB_NAME,
    SCHEDULE: {
        SCHEDULE_START_DATE: datetime(2015, 1, 1).date(),
        SCHEDULE_END_DATE: datetime(2030, 1, 1).date(),
        START_TIME_OF_DAY: (datetime.utcnow() + timedelta(minutes=2)).time(),
    },
    TRANSFER_SPEC: {
        AWS_S3_DATA_SOURCE: {BUCKET_NAME: GCP_TRANSFER_SOURCE_AWS_BUCKET},
        GCS_DATA_SINK: {BUCKET_NAME: GCP_TRANSFER_FIRST_TARGET_BUCKET},
        TRANSFER_OPTIONS: {ALREADY_EXISTING_IN_SINK: True},
    },
}
create_transfer_job_from_aws = CloudDataTransferServiceCreateJobOperator(
    task_id="create_transfer_job_from_aws",
    body=aws_to_gcs_transfer_body,
)
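The constants used as keys in the bodies above stand for field names of the Transfer Service REST API. As a rough sketch, assuming they resolve to the camelCase field names of the transferJobs resource, an equivalent GCS-to-GCS body with literal keys and the Google API date and time formats might look like the following. The key mapping, the project ID, and the bucket names are assumptions for illustration and are not taken from this page.

```python
import json

# Assumed REST field names; the project ID and bucket names are placeholders.
gcs_to_gcs_transfer_body = {
    "description": "description",
    "status": "ENABLED",
    "projectId": "example-project",
    "schedule": {
        "scheduleStartDate": {"year": 2015, "month": 1, "day": 1},
        "scheduleEndDate": {"year": 2030, "month": 1, "day": 1},
        "startTimeOfDay": {"hours": 12, "minutes": 30, "seconds": 0},
    },
    "transferSpec": {
        "gcsDataSource": {"bucketName": "example-source-bucket"},
        "gcsDataSink": {"bucketName": "example-destination-bucket"},
        "transferOptions": {"overwriteObjectsAlreadyExistingInSink": True},
    },
}

# A body written this way contains only JSON-serializable values.
serialized = json.dumps(gcs_to_gcs_transfer_body)
```

Writing the body with the imported constants, as in the examples above, avoids typos in these key names.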
Templating¶
template_fields: Sequence[str] = (
    'body',
    'gcp_conn_id',
    'aws_conn_id',
    'google_impersonation_chain',
)
More information¶
See Google Cloud Transfer Service - Method: transferJobs.create.
CloudDataTransferServiceDeleteJobOperator¶
Deletes a transfer job.
For parameter definition, take a look at CloudDataTransferServiceDeleteJobOperator.
Using the operator¶
delete_transfer_from_aws_job = CloudDataTransferServiceDeleteJobOperator(
    task_id="delete_transfer_from_aws_job",
    job_name="{{task_instance.xcom_pull('create_transfer_job_from_aws')['name']}}",
    project_id=GCP_PROJECT_ID,
)
Templating¶
template_fields: Sequence[str] = (
    'job_name',
    'project_id',
    'gcp_conn_id',
    'api_version',
    'google_impersonation_chain',
)
More information¶
See Google Cloud Transfer Service - REST Resource: transferJobs - Status
CloudDataTransferServiceUpdateJobOperator¶
Updates a transfer job.
For parameter definition, take a look at CloudDataTransferServiceUpdateJobOperator.
Using the operator¶
update_body = {
    PROJECT_ID: PROJECT_ID_TRANSFER,
    TRANSFER_JOB: {DESCRIPTION: "description_updated"},
    TRANSFER_JOB_FIELD_MASK: "description",
}
update_transfer = CloudDataTransferServiceUpdateJobOperator(
    task_id="update_transfer",
    job_name="{{task_instance.xcom_pull('create_transfer')['name']}}",
    body=update_body,
)
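In the update body above, the field mask names the fields of the supplied transfer job that should be applied. The following sketch illustrates that idea on plain dictionaries. The function apply_field_mask is a hypothetical helper written for this illustration, the lowercase keys are assumed, and it does not reproduce how the service or the operator performs the update.

```python
def apply_field_mask(current: dict, patch: dict, mask: str) -> dict:
    """Hypothetical illustration: copy only the masked top-level fields."""
    updated = dict(current)
    for field in (name.strip() for name in mask.split(",")):
        if field and field in patch:
            updated[field] = patch[field]
    return updated


current_job = {"description": "description", "status": "ENABLED"}
patch = {"description": "description_updated", "status": "DISABLED"}

# Only "description" is listed in the mask, so "status" keeps its old value.
result = apply_field_mask(current_job, patch, "description")
```

The sketch returns a new dictionary and leaves current_job unchanged, which keeps the before and after states easy to compare.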
Templating¶
template_fields: Sequence[str] = (
    'job_name',
    'body',
    'gcp_conn_id',
    'aws_conn_id',
    'google_impersonation_chain',
)
More information¶
See Google Cloud Transfer Service - Method: transferJobs.patch
CloudDataTransferServiceCancelOperationOperator¶
Cancels a transfer operation.
For parameter definition, take a look at CloudDataTransferServiceCancelOperationOperator.
Using the operator¶
cancel_operation = CloudDataTransferServiceCancelOperationOperator(
    task_id="cancel_operation",
    operation_name="{{task_instance.xcom_pull("
    "'wait_for_second_operation_to_start', key='sensed_operations')[0]['name']}}",
)
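In the example above, operation_name is written as two adjacent string literals. Python joins adjacent literals into one string, so the operator receives a single Jinja template. A small check of this behavior, independent of Airflow:

```python
# Two adjacent literals, exactly as in the operator example above.
operation_name = (
    "{{task_instance.xcom_pull("
    "'wait_for_second_operation_to_start', key='sensed_operations')[0]['name']}}"
)

# The same template written as one literal.
expected = "{{task_instance.xcom_pull('wait_for_second_operation_to_start', key='sensed_operations')[0]['name']}}"

# True when the split form and the single literal are the same string.
is_same_template = operation_name == expected
```

Splitting the literal only affects line length in the source file; the rendered template is identical.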
Templating¶
template_fields: Sequence[str] = (
    'operation_name',
    'gcp_conn_id',
    'api_version',
    'google_impersonation_chain',
)
More information¶
See Google Cloud Transfer Service - Method: transferOperations.cancel
CloudDataTransferServiceGetOperationOperator¶
Gets a transfer operation. The result is returned to XCom.
For parameter definition, take a look at CloudDataTransferServiceGetOperationOperator.
Using the operator¶
get_operation = CloudDataTransferServiceGetOperationOperator(
    task_id="get_operation",
    operation_name="{{task_instance.xcom_pull('list_operations')[0]['name']}}",
)
Templating¶
template_fields: Sequence[str] = (
    'operation_name',
    'gcp_conn_id',
    'google_impersonation_chain',
)
More information¶
See Google Cloud Transfer Service - Method: transferOperations.get
CloudDataTransferServiceListOperationsOperator¶
Lists transfer operations. The result is returned to XCom.
For parameter definition, take a look at CloudDataTransferServiceListOperationsOperator.
Using the operator¶
list_operations = CloudDataTransferServiceListOperationsOperator(
    task_id="list_operations",
    request_filter={
        FILTER_PROJECT_ID: GCP_PROJECT_ID,
        FILTER_JOB_NAMES: ["{{task_instance.xcom_pull('create_transfer_job_from_aws')['name']}}"],
    },
)
Templating¶
template_fields: Sequence[str] = (
    'filter',
    'gcp_conn_id',
    'google_impersonation_chain',
)
More information¶
See Google Cloud Transfer Service - Method: transferOperations.list
CloudDataTransferServicePauseOperationOperator¶
Pauses a transfer operation.
For parameter definition, take a look at CloudDataTransferServicePauseOperationOperator.
Using the operator¶
pause_operation = CloudDataTransferServicePauseOperationOperator(
    task_id="pause_operation",
    operation_name="{{task_instance.xcom_pull('wait_for_operation_to_start', "
    "key='sensed_operations')[0]['name']}}",
)
Templating¶
template_fields: Sequence[str] = (
    'operation_name',
    'gcp_conn_id',
    'api_version',
    'google_impersonation_chain',
)
More information¶
See Google Cloud Transfer Service - Method: transferOperations.pause
CloudDataTransferServiceResumeOperationOperator¶
Resumes a transfer operation.
For parameter definition, take a look at CloudDataTransferServiceResumeOperationOperator.
Using the operator¶
resume_operation = CloudDataTransferServiceResumeOperationOperator(
    task_id="resume_operation",
    operation_name="{{task_instance.xcom_pull('get_operation')['name']}}",
)
Templating¶
template_fields: Sequence[str] = (
    'operation_name',
    'gcp_conn_id',
    'api_version',
    'google_impersonation_chain',
)
More information¶
See Google Cloud Transfer Service - Method: transferOperations.resume
CloudDataTransferServiceJobStatusSensor¶
Waits for at least one operation belonging to the job to have the expected status.
For parameter definition, take a look at CloudDataTransferServiceJobStatusSensor.
Using the operator¶
wait_for_operation_to_end = CloudDataTransferServiceJobStatusSensor(
    task_id="wait_for_operation_to_end",
    job_name="{{task_instance.xcom_pull('create_transfer_job_from_aws')['name']}}",
    project_id=GCP_PROJECT_ID,
    expected_statuses={GcpTransferOperationStatus.SUCCESS},
    poke_interval=WAIT_FOR_OPERATION_POKE_INTERVAL,
)
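The sensor's success condition, at least one operation in an expected status, can be sketched in plain Python. The function name any_operation_has_status, the operation names, and the shape of the operation dictionaries (a status stored under metadata) are assumptions made for this illustration and do not reproduce the sensor's actual implementation.

```python
from __future__ import annotations


def any_operation_has_status(operations: list[dict], expected_statuses: set[str]) -> bool:
    """Hypothetical check: True if any operation is in an expected status."""
    return any(
        op.get("metadata", {}).get("status") in expected_statuses
        for op in operations
    )


# Placeholder operations for one job: one still running, one finished.
operations = [
    {"name": "transferOperations/example-1", "metadata": {"status": "IN_PROGRESS"}},
    {"name": "transferOperations/example-2", "metadata": {"status": "SUCCESS"}},
]

finished = any_operation_has_status(operations, {"SUCCESS"})
failed = any_operation_has_status(operations, {"FAILED"})
```

Under these assumptions, a sensor built on this check would stop waiting as soon as a single operation reaches one of the expected statuses, even if other operations for the same job are still running.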
Templating¶
template_fields: Sequence[str] = (
    'job_name',
    'impersonation_chain',
)