Google Cloud Tasks
Cloud Tasks is a fully managed service that allows you to manage the execution, dispatch, and delivery of a large number of distributed tasks. Using Cloud Tasks, you can perform work asynchronously outside of a user or service-to-service request.
For more information about the service, visit the Cloud Tasks product documentation.
Prerequisite Tasks
To use these operators, you must do a few things:
Select or create a Cloud Platform project using the Cloud Console.
Enable billing for your project, as described in the Google Cloud documentation.
Enable the API, as described in the Cloud Console documentation.
Install API libraries via pip.
pip install 'apache-airflow[google]'
See Installation for detailed information.
Queue operations
Create queue
To create a new queue, use CloudTasksQueueCreateOperator.
tests/system/google/cloud/tasks/example_queue.py
create_queue = CloudTasksQueueCreateOperator(
    location=LOCATION,
    task_queue=Queue(stackdriver_logging_config=dict(sampling_ratio=0.5)),
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    retry=Retry(maximum=10.0),
    timeout=5,
    task_id="create_queue",
)
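The `queue_name` above is `QUEUE_ID` concatenated with a value pulled from an upstream `random_string` task, so each run can use its own queue. As a rough sketch of what such an upstream step might compute, the following builds a suffixed queue ID and checks it against the Cloud Tasks naming rule (letters, digits, and hyphens, at most 100 characters). The helper names, the 8-character suffix length, and the `queue-` prefix are assumptions for illustration and are not taken from the example file.

```python
import random
import re
import string

# Cloud Tasks queue IDs: letters, digits, hyphens; at most 100 characters.
QUEUE_ID_PATTERN = re.compile(r"[A-Za-z0-9-]{1,100}")


def random_suffix(length: int = 8) -> str:
    """Return a random lowercase alphanumeric suffix (hypothetical helper)."""
    alphabet = string.ascii_lowercase + string.digits
    return "".join(random.choices(alphabet, k=length))


def build_queue_id(prefix: str, suffix: str) -> str:
    """Concatenate prefix and suffix, mirroring QUEUE_ID + the rendered XCom value."""
    queue_id = prefix + suffix
    if not QUEUE_ID_PATTERN.fullmatch(queue_id):
        raise ValueError(f"invalid queue ID: {queue_id!r}")
    return queue_id


if __name__ == "__main__":
    # Prints something like "queue-a1b2c3d4"; the suffix differs on every run.
    print(build_queue_id("queue-", random_suffix()))
```

Validating the ID before the operator runs turns a naming mistake into an immediate local error instead of a failed API call.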
Delete queue
To delete a queue, use CloudTasksQueueDeleteOperator.
tests/system/google/cloud/tasks/example_queue.py
delete_queue = CloudTasksQueueDeleteOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_id="delete_queue",
)
Resume queue
To resume a queue, use CloudTasksQueueResumeOperator.
tests/system/google/cloud/tasks/example_queue.py
resume_queue = CloudTasksQueueResumeOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_id="resume_queue",
)
Pause queue
To pause a queue, use CloudTasksQueuePauseOperator.
tests/system/google/cloud/tasks/example_queue.py
pause_queue = CloudTasksQueuePauseOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_id="pause_queue",
)
Purge queue
To purge a queue, use CloudTasksQueuePurgeOperator.
tests/system/google/cloud/tasks/example_queue.py
purge_queue = CloudTasksQueuePurgeOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_id="purge_queue",
)
Get queue
To get a queue, use CloudTasksQueueGetOperator.
tests/system/google/cloud/tasks/example_queue.py
get_queue = CloudTasksQueueGetOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_id="get_queue",
)
get_queue_result = BashOperator(
    task_id="get_queue_result",
    bash_command=f"echo {get_queue.output}",
)
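The operators take a short `queue_name` and a `location`, while the Cloud Tasks API identifies a queue by a full resource name of the form `projects/PROJECT_ID/locations/LOCATION_ID/queues/QUEUE_ID`, which is the kind of value found in the `name` field of a returned queue. A minimal sketch of building and splitting such a name follows. The helper names and sample values are hypothetical and are not part of the provider.

```python
import re

# Full resource name format used by the Cloud Tasks API for queues.
_QUEUE_NAME = re.compile(
    r"projects/(?P<project>[^/]+)/locations/(?P<location>[^/]+)/queues/(?P<queue>[^/]+)"
)


def queue_path(project: str, location: str, queue: str) -> str:
    """Build the full resource name from its three components."""
    return f"projects/{project}/locations/{location}/queues/{queue}"


def parse_queue_path(name: str) -> dict:
    """Split a full resource name back into project, location, and queue."""
    match = _QUEUE_NAME.fullmatch(name)
    if match is None:
        raise ValueError(f"not a queue resource name: {name!r}")
    return match.groupdict()


if __name__ == "__main__":
    # Sample values only; substitute your own project, location, and queue.
    name = queue_path("my-project", "europe-west1", "queue-a1b2c3d4")
    print(name)
    print(parse_queue_path(name)["queue"])
```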
Update queue
To update a queue, use CloudTasksQueueUpdateOperator.
tests/system/google/cloud/tasks/example_queue.py
update_queue = CloudTasksQueueUpdateOperator(
    task_queue=Queue(stackdriver_logging_config=dict(sampling_ratio=1)),
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    update_mask=FieldMask(paths=["stackdriver_logging_config.sampling_ratio"]),
    task_id="update_queue",
)
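The `update_mask` limits the update to the listed field paths, so fields outside the mask keep their existing values on the queue. The sketch below imitates that behavior on plain nested dictionaries to show the idea. It is an illustration only, not how the service or the `FieldMask` class is implemented, and `apply_field_mask` is a hypothetical helper.

```python
import copy


def apply_field_mask(current: dict, patch: dict, paths: list[str]) -> dict:
    """Return a copy of `current` where only the dotted `paths` are taken from `patch`."""
    result = copy.deepcopy(current)
    for path in paths:
        keys = path.split(".")
        src, dst = patch, result
        # Walk down to the parent of the masked field in both dictionaries.
        for key in keys[:-1]:
            src = src[key]
            dst = dst.setdefault(key, {})
        dst[keys[-1]] = copy.deepcopy(src[keys[-1]])
    return result


if __name__ == "__main__":
    current = {
        "stackdriver_logging_config": {"sampling_ratio": 0.5},
        "rate_limits": {"max_dispatches_per_second": 500.0},
    }
    patch = {
        "stackdriver_logging_config": {"sampling_ratio": 1.0},
        "rate_limits": {"max_dispatches_per_second": 1.0},
    }
    # Only the sampling ratio is masked in, so the rate limit stays at 500.0.
    print(apply_field_mask(current, patch, ["stackdriver_logging_config.sampling_ratio"]))
```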
List queues
To list all queues, use CloudTasksQueuesListOperator.
tests/system/google/cloud/tasks/example_queue.py
list_queue = CloudTasksQueuesListOperator(location=LOCATION, task_id="list_queue")
Tasks operations
Create task
To create a new task in a particular queue, use CloudTasksTaskCreateOperator.
tests/system/google/cloud/tasks/example_tasks.py
create_task = CloudTasksTaskCreateOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task=TASK,
    task_name=TASK_NAME + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    retry=Retry(maximum=10.0),
    timeout=5,
    task_id="create_task_to_run",
)
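The snippet passes `task=TASK` without showing how `TASK` is defined. As a hedged sketch, a task with an HTTP target could be described by a dictionary whose keys follow the field names of the Cloud Tasks `Task` message (`http_request` with `url`, `http_method`, `headers`, and `body`). The helper name, URL, and payload below are hypothetical, and passing the method as the string `"POST"` is an assumption; with the client library, the `HttpMethod` enum from `google.cloud.tasks_v2` is the safer choice.

```python
import json


def build_http_task(url: str, payload: dict) -> dict:
    """Build a dict shaped like a Cloud Tasks HTTP-target task (hypothetical helper)."""
    return {
        "http_request": {
            # Assumption: enum given by name; prefer HttpMethod.POST with the real client.
            "http_method": "POST",
            "url": url,
            "headers": {"Content-Type": "application/json"},
            # The request body is a bytes field, so the JSON text is encoded.
            "body": json.dumps(payload).encode("utf-8"),
        }
    }


if __name__ == "__main__":
    # Placeholder URL and payload for illustration only.
    task = build_http_task("https://example.com/handler", {"order_id": 1})
    print(task["http_request"]["url"])
```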
Get task
To get a task in a particular queue, use CloudTasksTaskGetOperator.
tests/system/google/cloud/tasks/example_tasks.py
tasks_get = CloudTasksTaskGetOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_name=TASK_NAME + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_id="tasks_get",
)
Run task
To run a task in a particular queue, use CloudTasksTaskRunOperator.
tests/system/google/cloud/tasks/example_tasks.py
run_task = CloudTasksTaskRunOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_name=TASK_NAME + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    retry=Retry(maximum=10.0),
    task_id="run_task",
)
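`Retry(maximum=10.0)` configures the client-side retry applied to the API call, with `maximum` capping the delay between attempts at 10 seconds. As a rough illustration of exponential backoff with a cap, the sketch below computes a deterministic delay sequence. The `initial=1.0` and `multiplier=2.0` values are assumptions chosen for the example, `backoff_delays` is a hypothetical helper, and the real `google.api_core.retry.Retry` also randomizes its delays, which this sketch omits.

```python
def backoff_delays(initial: float, multiplier: float, maximum: float, attempts: int) -> list[float]:
    """Return capped exponential delays in seconds, without jitter."""
    delays = []
    delay = initial
    for _ in range(attempts):
        # Each delay grows by `multiplier` but never exceeds `maximum`.
        delays.append(min(delay, maximum))
        delay *= multiplier
    return delays


if __name__ == "__main__":
    print(backoff_delays(1.0, 2.0, 10.0, 6))  # [1.0, 2.0, 4.0, 8.0, 10.0, 10.0]
```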
List tasks
To list all tasks in a particular queue, use CloudTasksTasksListOperator.
tests/system/google/cloud/tasks/example_tasks.py
list_tasks = CloudTasksTasksListOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_id="list_tasks",
)
Delete task
To delete a task from a particular queue, use CloudTasksTaskDeleteOperator.
tests/system/google/cloud/tasks/example_tasks.py
delete_task = CloudTasksTaskDeleteOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_name=TASK_NAME + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_id="delete_task",
)
References
For further information, take a look at: