Google Kubernetes Engine Operators

Google Kubernetes Engine (GKE) provides a managed environment for deploying, managing, and scaling your containerized applications using Google infrastructure. The GKE environment consists of multiple machines (specifically, Compute Engine instances) grouped together to form a cluster.

Prerequisite Tasks

To use these operators, you must do a few things:

Manage GKE cluster

A cluster is the foundation of GKE - all workloads run on top of the cluster. It is made up on a cluster master and worker nodes. The lifecycle of the master is managed by GKE when creating or deleting a cluster. The worker nodes are represented as Compute Engine VM instances that GKE creates on your behalf when creating a cluster.

Create GKE cluster

Here is an example of a cluster definition:

tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine.py[source]

CLUSTER = {"name": CLUSTER_NAME, "initial_node_count": 1}

A dict object like this, or a Cluster definition, is required when creating a cluster with GKECreateClusterOperator.

tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine.py[source]

create_cluster = GKECreateClusterOperator(
    task_id="create_cluster",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    body=CLUSTER,
)

You can use deferrable mode for this action in order to run the operator asynchronously. It will give you a possibility to free up the worker when it knows it has to wait, and hand off the job of resuming Operator to a Trigger. As a result, while it is suspended (deferred), it is not taking up a worker slot and your cluster will have a lot less resources wasted on idle Operators or Sensors:

tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_async.py[source]

create_cluster = GKECreateClusterOperator(
    task_id="create_cluster",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    body=CLUSTER,
    deferrable=True,
)

Install Kueue of specific version inside Cluster

Kueue is a Cloud Native Job scheduler that works with the default Kubernetes scheduler, the Job controller, and the cluster autoscaler to provide an end-to-end batch system. Kueue implements Job queueing, deciding when Jobs should wait and when they should start, based on quotas and a hierarchy for sharing resources fairly among teams. Kueue supports Autopilot clusters, Standard GKE with Node Auto-provisioning and regular autoscaled node pools. To install and use Kueue on your cluster with the help of GKEStartKueueInsideClusterOperator as shown in this example:

tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_kueue.py[source]

    add_kueue_cluster = GKEStartKueueInsideClusterOperator(
        task_id="add_kueue_cluster",
        project_id=GCP_PROJECT_ID,
        location=GCP_LOCATION,
        cluster_name=CLUSTER_NAME,
        kueue_version="v0.5.1",
    )

Delete GKE cluster

To delete a cluster, use GKEDeleteClusterOperator. This would also delete all the nodes allocated to the cluster.

tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine.py[source]

delete_cluster = GKEDeleteClusterOperator(
    task_id="delete_cluster",
    name=CLUSTER_NAME,
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
)

You can use deferrable mode for this action in order to run the operator asynchronously. It will give you a possibility to free up the worker when it knows it has to wait, and hand off the job of resuming Operator to a Trigger. As a result, while it is suspended (deferred), it is not taking up a worker slot and your cluster will have a lot less resources wasted on idle Operators or Sensors:

tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_async.py[source]

delete_cluster = GKEDeleteClusterOperator(
    task_id="delete_cluster",
    name=CLUSTER_NAME,
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    deferrable=True,
)

Manage workloads on a GKE cluster

GKE works with containerized applications, such as those created on Docker, and deploys them to run on the cluster. These are called workloads, and when deployed on the cluster they leverage the CPU and memory resources of the cluster to run effectively.

Run a Pod on a GKE cluster

There are two operators available in order to run a pod on a GKE cluster:

GKEStartPodOperator extends KubernetesPodOperator to provide authorization using Google Cloud credentials. There is no need to manage the kube_config file, as it will be generated automatically. All Kubernetes parameters (except config_file) are also valid for the GKEStartPodOperator. For more information on KubernetesPodOperator, please look at: KubernetesPodOperator guide.

Using with Private cluster

All clusters have a canonical endpoint. The endpoint is the IP address of the Kubernetes API server that Airflow use to communicate with your cluster master. The endpoint is displayed in Cloud Console under the Endpoints field of the cluster’s Details tab, and in the output of gcloud container clusters describe in the endpoint field.

Private clusters have two unique endpoint values: privateEndpoint, which is an internal IP address, and publicEndpoint, which is an external one. Running GKEStartPodOperator against a private cluster sets the external IP address as the endpoint by default. If you prefer to use the internal IP as the endpoint, you need to set use_internal_ip parameter to True.

Using with Autopilot (serverless) cluster

When running on serverless cluster like GKE Autopilot, the pod startup can sometimes take longer due to cold start. During the pod startup, the status is checked in regular short intervals and warning messages are emitted if the pod has not yet started. You can increase this interval length via the startup_check_interval_seconds parameter, with recommendation of 60 seconds.

Use of XCom

We can enable the usage of XCom on the operator. This works by launching a sidecar container with the pod specified. The sidecar is automatically mounted when the XCom usage is specified and its mount point is the path /airflow/xcom. To provide values to the XCom, ensure your Pod writes it into a file called return.json in the sidecar. The contents of this can then be used downstream in your DAG. Here is an example of it being used:

tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine.py[source]

pod_task_xcom = GKEStartPodOperator(
    task_id="pod_task_xcom",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    do_xcom_push=True,
    namespace="default",
    image="alpine",
    cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
    name="test-pod-xcom",
    in_cluster=False,
    on_finish_action="delete_pod",
)

And then use it in other operators:

tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine.py[source]

pod_task_xcom_result = BashOperator(
    bash_command="echo \"{{ task_instance.xcom_pull('pod_task_xcom')[0] }}\"",
    task_id="pod_task_xcom_result",
)

You can use deferrable mode for this action in order to run the operator asynchronously. It will give you a possibility to free up the worker when it knows it has to wait, and hand off the job of resuming Operator to a Trigger. As a result, while it is suspended (deferred), it is not taking up a worker slot and your cluster will have a lot less resources wasted on idle Operators or Sensors:

tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_async.py[source]

pod_task_xcom_async = GKEStartPodOperator(
    task_id="pod_task_xcom_async",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    namespace="default",
    image="alpine",
    cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
    name="test-pod-xcom-async",
    in_cluster=False,
    on_finish_action="delete_pod",
    do_xcom_push=True,
    deferrable=True,
    get_logs=True,
)

Run a Job on a GKE cluster

There are two operators available in order to run a job on a GKE cluster:

GKEStartJobOperator extends KubernetesJobOperator to provide authorization using Google Cloud credentials. There is no need to manage the kube_config file, as it will be generated automatically. All Kubernetes parameters (except config_file) are also valid for the GKEStartJobOperator.

tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py[source]

job_task = GKEStartJobOperator(
    task_id="job_task",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    namespace=JOB_NAMESPACE,
    image="perl:5.34.0",
    cmds=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
    name=JOB_NAME,
)

GKEStartJobOperator also supports deferrable mode. Note that it makes sense only if the wait_until_job_complete parameter is set True.

tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py[source]

job_task_def = GKEStartJobOperator(
    task_id="job_task_def",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    namespace=JOB_NAMESPACE,
    image="perl:5.34.0",
    cmds=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
    name=JOB_NAME_DEF,
    wait_until_job_complete=True,
    deferrable=True,
)

For run Job on a GKE cluster with Kueue enabled use GKEStartKueueJobOperator.

tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_kueue.py[source]

kueue_job_task = GKEStartKueueJobOperator(
    task_id="kueue_job_task",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    queue_name=QUEUE_NAME,
    namespace="default",
    parallelism=3,
    image="perl:5.34.0",
    cmds=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
    name="test-pi",
    suspend=True,
    container_resources=k8s.V1ResourceRequirements(
        requests={
            "cpu": 1,
            "memory": "200Mi",
        },
    ),
)

Delete a Job on a GKE cluster

There are two operators available in order to delete a job on a GKE cluster:

GKEDeleteJobOperator extends KubernetesDeleteJobOperator to provide authorization using Google Cloud credentials. There is no need to manage the kube_config file, as it will be generated automatically. All Kubernetes parameters (except config_file) are also valid for the GKEDeleteJobOperator.

tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py[source]

delete_job = GKEDeleteJobOperator(
    task_id="delete_job",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    name=JOB_NAME,
    namespace=JOB_NAMESPACE,
)

Retrieve information about Job by given name

You can use GKEDescribeJobOperator to retrieve detailed description of existing Job by providing its name and namespace.

tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py[source]

describe_job_task = GKEDescribeJobOperator(
    task_id="describe_job_task",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    job_name=job_task.output["job_name"],
    namespace="default",
    cluster_name=CLUSTER_NAME,
)

Retrieve list of Jobs

You can use GKEListJobsOperator to retrieve list of existing Jobs. If namespace parameter is provided, output will include Jobs across given namespace. If namespace parameter is not specified, the information across all the namespaces will be outputted.

tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py[source]

list_job_task = GKEListJobsOperator(
    task_id="list_job_task", project_id=GCP_PROJECT_ID, location=GCP_LOCATION, cluster_name=CLUSTER_NAME
)

Create a resource in a GKE cluster

You can use GKECreateCustomResourceOperator to create resource in the specified Google Kubernetes Engine cluster.

tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_resource.py[source]

create_resource_task = GKECreateCustomResourceOperator(
    task_id="create_resource_task",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    yaml_conf=PVC_CONF,
)

Delete a resource in a GKE cluster

You can use GKEDeleteCustomResourceOperator to delete resource in the specified Google Kubernetes Engine cluster.

tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_resource.py[source]

delete_resource_task = GKEDeleteCustomResourceOperator(
    task_id="delete_resource_task",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    yaml_conf=PVC_CONF,
)

Was this entry helpful?