Amazon Elastic Kubernetes Service (EKS)¶
Amazon Elastic Kubernetes Service (Amazon EKS) is a managed service that makes it easy for you to run Kubernetes on AWS without needing to stand up or maintain your own Kubernetes control plane. Kubernetes is an open-source system for automating the deployment, scaling, and management of containerized applications.
Airflow provides operators to create and interact with Amazon EKS clusters and their compute infrastructure.
Prerequisite Tasks¶
To use these operators, you must do a few things:
Create necessary resources using AWS Console or AWS CLI.
Install API libraries via pip.
pip install 'apache-airflow[amazon]'
Detailed information is available in Installation of Airflow®.
Generic Parameters¶
- aws_conn_id
Reference to Amazon Web Services Connection ID. If this parameter is set to None then the default boto3 behaviour is used without a connection lookup. Otherwise use the credentials stored in the Connection. Default: aws_default
- region_name
AWS Region Name. If this parameter is set to None or omitted then region_name from AWS Connection Extra Parameter will be used. Otherwise use the specified value instead of the connection value. Default: None
- verify
Whether or not to verify SSL certificates.
False - Do not validate SSL certificates.
path/to/cert/bundle.pem - A filename of the CA cert bundle to use. You can specify this argument if you want to use a different CA cert bundle than the one used by botocore.
If this parameter is set to None or is omitted then verify from AWS Connection Extra Parameter will be used. Otherwise use the specified value instead of the connection value. Default: None
- botocore_config
The provided dictionary is used to construct a botocore.config.Config. This configuration can be used to avoid throttling exceptions, set timeouts, etc.
Example (for more detail about the parameters, see botocore.config.Config):
{
    "signature_version": "unsigned",
    "s3": {
        "us_east_1_regional_endpoint": True,
    },
    "retries": {
        "mode": "standard",
        "max_attempts": 10,
    },
    "connect_timeout": 300,
    "read_timeout": 300,
    "tcp_keepalive": True,
}
If this parameter is set to None or omitted then config_kwargs from AWS Connection Extra Parameter will be used. Otherwise use the specified value instead of the connection value. Default: None
Note
Specifying an empty dictionary, {}, will overwrite the connection configuration for botocore.config.Config.
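The fallback rule shared by region_name, verify, and botocore_config can be sketched in plain Python. This is a minimal model of the documented precedence, not Airflow's implementation; `resolve_setting` and the sample connection values are hypothetical names used only for illustration.

```python
from typing import Any, Optional


def resolve_setting(explicit: Optional[Any], connection_value: Optional[Any]) -> Optional[Any]:
    """Return the explicit value unless it is None, else the connection value.

    Hypothetical helper: it models the documented precedence only.
    """
    # Only None means "not set"; falsy values such as {} or False still win.
    if explicit is None:
        return connection_value
    return explicit


# Assumed values stored in the AWS Connection Extra Parameter.
connection_extra = {
    "region_name": "eu-west-1",
    "verify": None,
    "config_kwargs": {"retries": {"mode": "standard", "max_attempts": 10}},
}

# region_name omitted on the operator: the connection value is used.
region = resolve_setting(None, connection_extra["region_name"])

# botocore_config={} on the operator: the connection configuration is overwritten.
config = resolve_setting({}, connection_extra["config_kwargs"])

# verify=False on the operator: the explicit value is used.
verify = resolve_setting(False, connection_extra["verify"])

print(region, config, verify)
```

The point of the sketch is the `is None` check: an empty dictionary is a value in its own right, which is why it replaces the connection's configuration instead of falling back to it.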
Operators¶
Create an Amazon EKS cluster¶
To create an Amazon EKS Cluster you can use EksCreateClusterOperator.
- Note: An AWS IAM role with the following permissions is required:
  - eks.amazonaws.com must be added to the Trusted Relationships
  - AmazonEKSClusterPolicy IAM Policy must be attached
tests/system/amazon/aws/example_eks_with_nodegroups.py
# Create an Amazon EKS Cluster control plane without attaching compute service.
create_cluster = EksCreateClusterOperator(
task_id="create_cluster",
cluster_name=cluster_name,
cluster_role_arn=test_context[ROLE_ARN_KEY],
resources_vpc_config={"subnetIds": test_context[SUBNETS_KEY]},
compute=None,
)
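The trusted relationship required for the cluster role is expressed as an IAM trust policy document. The following is a minimal sketch that builds one with the standard library; `build_trust_policy` is a hypothetical helper, and how the role itself is created (console, CLI, or infrastructure code) is left open.

```python
import json
from typing import Dict, List


def build_trust_policy(service_principals: List[str]) -> Dict:
    """Build an IAM trust policy that lets the given AWS services assume a role.

    Hypothetical helper for illustration; it only assembles the JSON document.
    """
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": sorted(service_principals)},
                "Action": "sts:AssumeRole",
            }
        ],
    }


# Trust policy for a role used only by the EKS control plane.
cluster_trust_policy = build_trust_policy(["eks.amazonaws.com"])

# AWS managed policy that must be attached to the same role.
CLUSTER_POLICY_ARN = "arn:aws:iam::aws:policy/AmazonEKSClusterPolicy"

print(json.dumps(cluster_trust_policy, indent=2))
```

The printed JSON could, for example, be saved to a file and supplied to `aws iam create-role --assume-role-policy-document`. Passing additional principals to the helper covers roles that are shared between the cluster and its compute.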
Create an Amazon EKS cluster and node group in one step¶
To create an Amazon EKS cluster and an EKS managed node group in one command, you can use EksCreateClusterOperator.
- Note: An AWS IAM role with the following permissions is required:
  - ec2.amazonaws.com must be in the Trusted Relationships
  - eks.amazonaws.com must be added to the Trusted Relationships
  - AmazonEC2ContainerRegistryReadOnly IAM Policy must be attached
  - AmazonEKSClusterPolicy IAM Policy must be attached
  - AmazonEKSWorkerNodePolicy IAM Policy must be attached
tests/system/amazon/aws/example_eks_with_nodegroup_in_one_step.py
# Create an Amazon EKS cluster control plane and an EKS nodegroup compute platform in one step.
create_cluster_and_nodegroup = EksCreateClusterOperator(
task_id="create_cluster_and_nodegroup",
cluster_name=cluster_name,
nodegroup_name=nodegroup_name,
cluster_role_arn=test_context[ROLE_ARN_KEY],
# Opting to use the same ARN for the cluster and the nodegroup here,
# but a different ARN could be configured and passed if desired.
nodegroup_role_arn=test_context[ROLE_ARN_KEY],
resources_vpc_config={"subnetIds": test_context[SUBNETS_KEY]},
# ``compute='nodegroup'`` is the default, explicitly set here for demo purposes.
compute="nodegroup",
# The launch template enforces IMDSv2 and is required for internal
# compliance when running these system tests on AWS infrastructure.
create_nodegroup_kwargs={"launchTemplate": {"name": launch_template_name}},
)
Create an Amazon EKS cluster and AWS Fargate profile in one step¶
To create an Amazon EKS cluster and an AWS Fargate profile in one command, you can use EksCreateClusterOperator.
You can also run this operator in deferrable mode by setting the deferrable parameter to True.
- Note: An AWS IAM role with the following permissions is required:
  - ec2.amazonaws.com must be in the Trusted Relationships
  - eks.amazonaws.com must be added to the Trusted Relationships
  - AmazonEC2ContainerRegistryReadOnly IAM Policy must be attached
  - AmazonEKSClusterPolicy IAM Policy must be attached
  - AmazonEKSWorkerNodePolicy IAM Policy must be attached
tests/system/amazon/aws/example_eks_with_fargate_in_one_step.py
# Create an Amazon EKS cluster control plane and an AWS Fargate compute platform in one step.
create_cluster_and_fargate_profile = EksCreateClusterOperator(
task_id="create_eks_cluster_and_fargate_profile",
cluster_name=cluster_name,
cluster_role_arn=cluster_role_arn,
resources_vpc_config={
"subnetIds": subnets,
"endpointPublicAccess": True,
"endpointPrivateAccess": False,
},
compute="fargate",
fargate_profile_name=fargate_profile_name,
# Opting to use the same ARN for the cluster and the pod here,
# but a different ARN could be configured and passed if desired.
fargate_pod_execution_role_arn=fargate_pod_role_arn,
)
Delete an Amazon EKS Cluster¶
To delete an existing Amazon EKS Cluster you can use EksDeleteClusterOperator.
You can also run this operator in deferrable mode by setting the deferrable parameter to True.
tests/system/amazon/aws/example_eks_with_nodegroups.py
delete_cluster = EksDeleteClusterOperator(
task_id="delete_cluster",
cluster_name=cluster_name,
)
- Note: If the cluster has any attached resources, such as an Amazon EKS Nodegroup or AWS Fargate profile, the cluster cannot be deleted. Setting the force_delete_compute parameter to True will attempt to delete any attached resources first.
tests/system/amazon/aws/example_eks_with_nodegroup_in_one_step.py
# An Amazon EKS cluster cannot be deleted with attached resources such as nodegroups or Fargate profiles.
# Setting `force_delete_compute` to `True` will delete any attached resources before deleting the cluster.
delete_nodegroup_and_cluster = EksDeleteClusterOperator(
task_id="delete_nodegroup_and_cluster",
cluster_name=cluster_name,
force_delete_compute=True,
)
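The ordering implied by force_delete_compute can be modeled as follows. This is a simplified sketch with hypothetical names (`plan_cluster_deletion` and the resource names), not the operator's actual code; it only illustrates that attached compute has to be removed before the cluster.

```python
from typing import List, Sequence


def plan_cluster_deletion(
    cluster_name: str,
    nodegroups: Sequence[str],
    fargate_profiles: Sequence[str],
    force_delete_compute: bool = False,
) -> List[str]:
    """Return the deletion steps in order, or fail if compute is still attached.

    Hypothetical model of the documented behaviour; it performs no AWS calls.
    """
    attached = list(nodegroups) + list(fargate_profiles)
    if attached and not force_delete_compute:
        raise RuntimeError(
            f"Cluster {cluster_name!r} still has attached resources: {attached}"
        )

    # Attached compute is removed first, the cluster itself last.
    steps = [f"delete nodegroup {name}" for name in nodegroups]
    steps += [f"delete fargate profile {name}" for name in fargate_profiles]
    steps.append(f"delete cluster {cluster_name}")
    return steps


plan = plan_cluster_deletion(
    "demo-cluster",
    nodegroups=["demo-nodegroup"],
    fargate_profiles=[],
    force_delete_compute=True,
)
print(plan)
```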
Create an Amazon EKS managed node group¶
To create an Amazon EKS managed node group you can use EksCreateNodegroupOperator.
You can also run this operator in deferrable mode by setting the deferrable parameter to True.
- Note: An AWS IAM role with the following permissions is required:
  - ec2.amazonaws.com must be in the Trusted Relationships
  - AmazonEC2ContainerRegistryReadOnly IAM Policy must be attached
  - AmazonEKSWorkerNodePolicy IAM Policy must be attached
tests/system/amazon/aws/example_eks_with_nodegroups.py
create_nodegroup = EksCreateNodegroupOperator(
task_id="create_nodegroup",
cluster_name=cluster_name,
nodegroup_name=nodegroup_name,
nodegroup_subnets=test_context[SUBNETS_KEY],
nodegroup_role_arn=test_context[ROLE_ARN_KEY],
)
Delete an Amazon EKS managed node group¶
To delete an existing Amazon EKS managed node group you can use EksDeleteNodegroupOperator.
You can also run this operator in deferrable mode by setting the deferrable parameter to True.
tests/system/amazon/aws/example_eks_with_nodegroups.py
delete_nodegroup = EksDeleteNodegroupOperator(
task_id="delete_nodegroup",
cluster_name=cluster_name,
nodegroup_name=nodegroup_name,
)
Create an AWS Fargate Profile¶
To create an AWS Fargate Profile you can use EksCreateFargateProfileOperator.
- Note: An AWS IAM role with the following permissions is required:
  - ec2.amazonaws.com must be in the Trusted Relationships
  - AmazonEC2ContainerRegistryReadOnly IAM Policy must be attached
  - AmazonEKSWorkerNodePolicy IAM Policy must be attached
tests/system/amazon/aws/example_eks_with_fargate_profile.py
create_fargate_profile = EksCreateFargateProfileOperator(
task_id="create_eks_fargate_profile",
cluster_name=cluster_name,
pod_execution_role_arn=fargate_pod_role_arn,
fargate_profile_name=fargate_profile_name,
selectors=SELECTORS,
)
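The excerpt above references SELECTORS without showing its value. A minimal sketch of what such a value could look like is given below, assuming the selector shape used by the Amazon EKS CreateFargateProfile API (a `namespace` plus optional `labels`). The concrete namespaces, labels, and the `check_selectors` helper are illustrative assumptions, not taken from the system test.

```python
from typing import Dict, List

# Illustrative selectors: pods in these namespaces (and with these labels,
# where given) would be scheduled onto Fargate.
SELECTORS: List[Dict] = [
    {"namespace": "default"},
    {"namespace": "batch", "labels": {"team": "data"}},
]


def check_selectors(selectors: List[Dict]) -> List[Dict]:
    """Check that each selector has a namespace and string-to-string labels.

    Hypothetical helper; it performs a local sanity check only.
    """
    for selector in selectors:
        if not selector.get("namespace"):
            raise ValueError(f"Selector without a namespace: {selector}")
        labels = selector.get("labels", {})
        for key, value in labels.items():
            if not isinstance(key, str) or not isinstance(value, str):
                raise ValueError(f"Labels must map strings to strings: {labels}")
    return selectors


check_selectors(SELECTORS)
print(SELECTORS)
```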
Delete an AWS Fargate Profile¶
To delete an existing AWS Fargate Profile you can use EksDeleteFargateProfileOperator.
tests/system/amazon/aws/example_eks_with_fargate_profile.py
delete_fargate_profile = EksDeleteFargateProfileOperator(
task_id="delete_eks_fargate_profile",
cluster_name=cluster_name,
fargate_profile_name=fargate_profile_name,
)
Perform a Task on an Amazon EKS Cluster¶
To run a pod on an existing Amazon EKS Cluster, you can use EksPodOperator.
Note: An Amazon EKS Cluster with underlying compute infrastructure is required.
tests/system/amazon/aws/example_eks_with_nodegroups.py
start_pod = EksPodOperator(
task_id="start_pod",
pod_name="test_pod",
cluster_name=cluster_name,
image="amazon/aws-cli:latest",
cmds=["sh", "-c", "echo Test Airflow; date"],
labels={"demo": "hello_world"},
get_logs=True,
on_finish_action="keep_pod",
)
Sensors¶
Wait on an Amazon EKS cluster state¶
To check the state of an Amazon EKS Cluster until it reaches the target state or another terminal state you can use EksClusterStateSensor.
tests/system/amazon/aws/example_eks_with_nodegroups.py
await_create_cluster = EksClusterStateSensor(
task_id="await_create_cluster",
cluster_name=cluster_name,
target_state=ClusterStates.ACTIVE,
)
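Waiting for "the target state or another terminal state" amounts to a polling loop. Below is a simplified, self-contained sketch of that idea; `wait_for_state`, the state names, and the fake state source are assumptions for illustration and do not reproduce the sensor's implementation.

```python
import time
from typing import Callable, Set


def wait_for_state(
    get_state: Callable[[], str],
    target_state: str,
    terminal_states: Set[str],
    poke_interval: float = 0.0,
    max_pokes: int = 10,
) -> bool:
    """Poll get_state until the target state or another terminal state is seen.

    Hypothetical helper: returns True on the target state, raises otherwise.
    """
    for _ in range(max_pokes):
        state = get_state()
        if state == target_state:
            return True
        if state in terminal_states:
            # A terminal state other than the target means waiting is pointless.
            raise RuntimeError(
                f"Reached terminal state {state!r} instead of {target_state!r}"
            )
        time.sleep(poke_interval)
    raise TimeoutError(f"State {target_state!r} not reached after {max_pokes} checks")


# Fake state source standing in for repeated "describe cluster" calls.
observed = iter(["CREATING", "CREATING", "ACTIVE"])
reached = wait_for_state(lambda: next(observed), "ACTIVE", {"FAILED"})
print(reached)
```

Stopping on any terminal state, instead of only on the target, keeps a failed cluster creation from blocking the task until it times out.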
Wait on an Amazon EKS managed node group state¶
To check the state of an Amazon EKS managed node group until it reaches the target state or another terminal state you can use EksNodegroupStateSensor.
tests/system/amazon/aws/example_eks_with_nodegroups.py
await_create_nodegroup = EksNodegroupStateSensor(
task_id="await_create_nodegroup",
cluster_name=cluster_name,
nodegroup_name=nodegroup_name,
target_state=NodegroupStates.ACTIVE,
)
Wait on an AWS Fargate profile state¶
To check the state of an AWS Fargate profile until it reaches the target state or another terminal state you can use EksFargateProfileStateSensor.
tests/system/amazon/aws/example_eks_with_fargate_profile.py
await_create_fargate_profile = EksFargateProfileStateSensor(
task_id="wait_for_create_fargate_profile",
cluster_name=cluster_name,
fargate_profile_name=fargate_profile_name,
target_state=FargateProfileStates.ACTIVE,
)