Amazon Elastic Container Service (ECS)

Amazon Elastic Container Service (Amazon ECS) is a fully managed container orchestration service that makes it easy for you to deploy, manage, and scale containerized applications.

Airflow provides operators to run Task Definitions on an ECS cluster.

Prerequisite Tasks

To use these operators, you must do a few things:

Generic Parameters

aws_conn_id

Reference to Amazon Web Services Connection ID. If this parameter is set to None then the default boto3 behaviour is used without a connection lookup. Otherwise use the credentials stored in the Connection. Default: aws_default

region_name

AWS Region Name. If this parameter is set to None or omitted then region_name from AWS Connection Extra Parameter will be used. Otherwise use the specified value instead of the connection value. Default: None

verify

Whether or not to verify SSL certificates.

  • False - Do not validate SSL certificates.

  • path/to/cert/bundle.pem - A filename of the CA cert bundle to use. You can specify this argument if you want to use a different CA cert bundle than the one used by botocore.

If this parameter is set to None or is omitted then verify from AWS Connection Extra Parameter will be used. Otherwise use the specified value instead of the connection value. Default: None

botocore_config

The provided dictionary is used to construct a botocore.config.Config. This configuration can be used to avoid throttling exceptions, set timeouts, etc.

Example (for more detail about the parameters, see botocore.config.Config):
{
    "signature_version": "unsigned",
    "s3": {
        "us_east_1_regional_endpoint": True,
    },
    "retries": {
      "mode": "standard",
      "max_attempts": 10,
    },
    "connect_timeout": 300,
    "read_timeout": 300,
    "tcp_keepalive": True,
}

If this parameter is set to None or omitted then config_kwargs from AWS Connection Extra Parameter will be used. Otherwise use the specified value instead of the connection value. Default: None

Note

Specifying an empty dictionary, {}, will overwrite the connection configuration for botocore.config.Config
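The precedence rule shared by region_name, verify, and botocore_config can be sketched in plain Python. The helper name resolve_setting is hypothetical and this is not the provider's actual implementation; it only illustrates that None defers to the connection while any other value, including an empty dictionary, takes priority.

```python
from typing import Any, Optional


def resolve_setting(explicit: Optional[Any], connection_value: Optional[Any]) -> Optional[Any]:
    """Hypothetical helper: use the operator argument unless it is None."""
    # Compare against None explicitly: a falsy value such as {} or False
    # is still a deliberate choice and must override the connection value.
    if explicit is None:
        return connection_value
    return explicit


connection_config = {"retries": {"mode": "standard", "max_attempts": 10}}

# Omitted (None): the value stored in the connection is used.
print(resolve_setting(None, connection_config))
# Empty dict: the connection's configuration is replaced by an empty one.
print(resolve_setting({}, connection_config))
# verify=False is also an explicit value, so it overrides the connection.
print(resolve_setting(False, True))
```

The explicit `is None` check is the design point here: a truthiness test would silently ignore {} and False.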

Operators

Create an AWS ECS Cluster

To create an Amazon ECS cluster you can use EcsCreateClusterOperator.

Pass any optional parameters for the Create Cluster API in the create_cluster_kwargs dict.

tests/system/amazon/aws/example_ecs.py

create_cluster = EcsCreateClusterOperator(
    task_id="create_cluster",
    cluster_name=new_cluster_name,
)
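As an illustration of create_cluster_kwargs, the sketch below assembles a dictionary of optional Create Cluster API arguments. The helper build_create_cluster_kwargs and the tag values are hypothetical; tags and settings are assumed here to follow the shape used by the boto3 create_cluster call, and the operator usage is shown only as a comment.

```python
from __future__ import annotations


def build_create_cluster_kwargs(tags: dict[str, str], container_insights: bool = False) -> dict:
    """Hypothetical helper that assembles optional Create Cluster API arguments."""
    kwargs: dict = {
        # The ECS API expects tags as a list of {"key": ..., "value": ...} items.
        "tags": [{"key": key, "value": value} for key, value in tags.items()],
    }
    if container_insights:
        # Cluster settings are passed as a list of name/value pairs.
        kwargs["settings"] = [{"name": "containerInsights", "value": "enabled"}]
    return kwargs


create_cluster_kwargs = build_create_cluster_kwargs({"team": "data"}, container_insights=True)
print(create_cluster_kwargs)

# Assumed usage inside a DAG file:
# create_cluster = EcsCreateClusterOperator(
#     task_id="create_cluster",
#     cluster_name=new_cluster_name,
#     create_cluster_kwargs=create_cluster_kwargs,
# )
```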

Delete an AWS ECS Cluster

To delete an Amazon ECS cluster you can use EcsDeleteClusterOperator.

tests/system/amazon/aws/example_ecs.py

delete_cluster = EcsDeleteClusterOperator(
    task_id="delete_cluster",
    cluster_name=new_cluster_name,
)

Register a Task Definition

To register a task definition you can use EcsRegisterTaskDefinitionOperator.

Pass any optional parameters for the Register Task Definition API in the register_task_kwargs dict.

tests/system/amazon/aws/example_ecs.py

register_task = EcsRegisterTaskDefinitionOperator(
    task_id="register_task",
    family=family_name,
    container_definitions=[
        {
            "name": container_name,
            "image": "ubuntu",
            "workingDirectory": "/usr/bin",
            "entryPoint": ["sh", "-c"],
            "command": ["ls"],
            "logConfiguration": {
                "logDriver": "awslogs",
                "options": {
                    "awslogs-group": log_group_name,
                    "awslogs-region": aws_region,
                    "awslogs-create-group": "true",
                    "awslogs-stream-prefix": "ecs",
                },
            },
        },
    ],
    register_task_kwargs={
        "cpu": "256",
        "memory": "512",
        "networkMode": "awsvpc",
    },
)

Deregister a Task Definition

To deregister a task definition you can use EcsDeregisterTaskDefinitionOperator.

tests/system/amazon/aws/example_ecs.py

deregister_task = EcsDeregisterTaskDefinitionOperator(
    task_id="deregister_task",
    task_definition=register_task.output,
)

Run a Task Definition

To run a Task Definition defined in an Amazon ECS cluster you can use EcsRunTaskOperator.

Before you can use this Operator, you must have created both an ECS Cluster and a Task Definition. The Task Definition contains details of the containerized application you want to run.

This Operator supports running your containers in ECS Clusters that are Serverless (FARGATE), backed by EC2, or backed by external resources (EXTERNAL). The parameters you need to configure for this Operator depend on which launch_type you want to use.

launch_type="EC2|FARGATE|EXTERNAL"
  • If you are using AWS Fargate as your compute resource in your ECS Cluster, set the parameter launch_type to FARGATE. When using a launch type of FARGATE you will need to provide network_configuration parameters.

  • If you are using EC2 as the compute resources in your ECS Cluster, set the parameter to EC2.

  • If you have integrated external resources in your ECS Cluster, for example using ECS Anywhere, and want to run your containers on those external resources, set the parameter to EXTERNAL.

tests/system/amazon/aws/example_ecs.py

run_task = EcsRunTaskOperator(
    task_id="run_task",
    cluster=existing_cluster_name,
    task_definition=register_task.output,
    overrides={
        "containerOverrides": [
            {
                "name": container_name,
                "command": ["echo hello world"],
            },
        ],
    },
    network_configuration={"awsvpcConfiguration": {"subnets": existing_cluster_subnets}},
    awslogs_group=log_group_name,
    awslogs_region=aws_region,
    awslogs_stream_prefix=f"ecs/{container_name}",
)

tests/system/amazon/aws/example_ecs_fargate.py

hello_world = EcsRunTaskOperator(
    task_id="hello_world",
    cluster=cluster_name,
    task_definition=task_definition_name,
    launch_type="FARGATE",
    overrides={
        "containerOverrides": [
            {
                "name": container_name,
                "command": ["echo", "hello", "world"],
            },
        ],
    },
    network_configuration={
        "awsvpcConfiguration": {
            "subnets": test_context[SUBNETS_KEY],
            "securityGroups": test_context[SECURITY_GROUPS_KEY],
            "assignPublicIp": "ENABLED",
        },
    },
)
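The launch_type rules described in this section can be checked before a task is submitted. The following is a minimal sketch: validate_run_task_args is a hypothetical helper, not part of the provider, and it only encodes the two rules stated above (the allowed values, and the network_configuration requirement for FARGATE). The subnet ID is a placeholder.

```python
from typing import Optional

VALID_LAUNCH_TYPES = {"EC2", "FARGATE", "EXTERNAL"}


def validate_run_task_args(launch_type: str, network_configuration: Optional[dict] = None) -> None:
    """Hypothetical pre-flight check for EcsRunTaskOperator arguments."""
    if launch_type not in VALID_LAUNCH_TYPES:
        raise ValueError(
            f"launch_type must be one of {sorted(VALID_LAUNCH_TYPES)}, got {launch_type!r}"
        )
    # Fargate tasks need network_configuration (subnets, security groups, ...).
    if launch_type == "FARGATE" and not network_configuration:
        raise ValueError("launch_type='FARGATE' requires network_configuration")


# Both calls pass silently because they satisfy the rules.
validate_run_task_args("EC2")
validate_run_task_args(
    "FARGATE",
    {"awsvpcConfiguration": {"subnets": ["subnet-00000000"]}},
)
```

Running such a check at DAG-parse time surfaces a missing network_configuration earlier than a failed RunTask call would.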

Stream logs to AWS CloudWatch

To stream logs to AWS CloudWatch, add the parameters below to the example above. Make sure you have the appropriate permissions (see the next section).

tests/system/amazon/aws/example_ecs.py

awslogs_group=log_group_name,
awslogs_region=aws_region,
awslogs_stream_prefix=f"ecs/{container_name}",
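The prefix here is f"ecs/{container_name}" rather than plain "ecs" because the awslogs log driver names each stream prefix-name/container-name/ecs-task-id. The sketch below shows how the pieces line up; expected_log_stream is a hypothetical helper, and the container name and task ARN are made-up values for illustration.

```python
def expected_log_stream(task_definition_prefix: str, container_name: str, task_arn: str) -> str:
    """Hypothetical helper: build the stream name the awslogs driver would write to."""
    # The task ID is the last path segment of the task ARN.
    task_id = task_arn.rsplit("/", 1)[-1]
    return f"{task_definition_prefix}/{container_name}/{task_id}"


stream = expected_log_stream(
    "ecs",  # the "awslogs-stream-prefix" option in the task definition
    "my-container",
    "arn:aws:ecs:us-east-1:123456789012:task/my-cluster/0123456789abcdef0123456789abcdef",
)
print(stream)
```

Under this convention, the operator's awslogs_stream_prefix corresponds to everything before the task ID in the stream name.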

IAM Permissions

You need to ensure you have the following IAM permissions to run tasks via the EcsRunTaskOperator:

{
    "Effect": "Allow",
    "Action": [
        "ecs:RunTask",
        "ecs:DescribeTasks"
    ],
    "Resource": [ "arn:aws:ecs:{aws region}:{aws account number}:task-definition/{task definition family}" ]
},
{
    "Effect": "Allow",
    "Action": [
        "iam:PassRole"
    ],
    "Resource": [ "arn:aws:iam::{aws account number}:role/{task execution role name}" ]
},
{
    "Effect": "Allow",
    "Action": [
        "ecs:DescribeTasks"
    ],
    "Resource": [ "arn:aws:ecs:{aws region}:{aws account number}:task/{ecs cluster name}/*" ]
}
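Hand-written policy JSON is easy to get subtly wrong (a missing or trailing comma is enough), so one option is to generate it. The sketch below builds the three statements above and serializes them; build_run_task_policy, its argument values, and the wrapping "Version"/"Statement" envelope are assumptions for illustration, not something this guide prescribes.

```python
import json


def build_run_task_policy(region: str, account: str, family: str, role: str, cluster: str) -> dict:
    """Hypothetical helper: assemble the statements needed to run ECS tasks."""
    statements = [
        {
            "Effect": "Allow",
            "Action": ["ecs:RunTask", "ecs:DescribeTasks"],
            "Resource": [f"arn:aws:ecs:{region}:{account}:task-definition/{family}"],
        },
        {
            "Effect": "Allow",
            "Action": ["iam:PassRole"],
            "Resource": [f"arn:aws:iam::{account}:role/{role}"],
        },
        {
            "Effect": "Allow",
            "Action": ["ecs:DescribeTasks"],
            "Resource": [f"arn:aws:ecs:{region}:{account}:task/{cluster}/*"],
        },
    ]
    # Standard IAM policy envelope around the statements.
    return {"Version": "2012-10-17", "Statement": statements}


policy = build_run_task_policy(
    region="us-east-1",
    account="123456789012",
    family="my-task-family",
    role="my-task-execution-role",
    cluster="my-cluster",
)
# json.dumps always emits syntactically valid JSON.
print(json.dumps(policy, indent=4))
```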

If you set reattach=True (the default is False), add the following Actions to the IAM policy as well.

"ecs:DescribeTaskDefinition",
"ecs:ListTasks"

CloudWatch Permissions

If you plan on streaming Apache Airflow logs into AWS CloudWatch, you need to ensure that you have configured the appropriate permissions set.

iam.PolicyStatement(
    actions=[
        "logs:CreateLogStream",
        "logs:CreateLogGroup",
        "logs:PutLogEvents",
        "logs:GetLogEvents",
        "logs:GetLogRecord",
        "logs:GetLogGroupFields",
        "logs:GetQueryResults"
    ],
    effect=iam.Effect.ALLOW,
    resources=[
        "arn:aws:logs:{aws region}:{aws account number}:log-group:{aws-log-group-name}:log-stream:{aws-log-stream-name}/*"
    ]
)

Sensors

AWS ECS Cluster State Sensor

To poll the cluster state until it reaches a terminal state you can use EcsClusterStateSensor.

Defaults to EcsClusterStates.ACTIVE as the success state and no failure state; both can be overridden with provided values. Raises an AirflowException with the failure reason if a failure state is provided and that state is reached before the target state.

tests/system/amazon/aws/example_ecs.py

await_cluster = EcsClusterStateSensor(
    task_id="await_cluster",
    cluster_name=new_cluster_name,
)

AWS ECS Task Definition State Sensor

To poll the task definition state until it reaches a terminal state you can use EcsTaskDefinitionStateSensor.

Valid states are either EcsTaskDefinitionStates.ACTIVE or EcsTaskDefinitionStates.INACTIVE. Defaults to EcsTaskDefinitionStates.ACTIVE as the success state, but accepts a parameter to change that. Raises an AirflowException with the failure reason if the failed state is reached before the target state.

tests/system/amazon/aws/example_ecs.py

await_task_definition = EcsTaskDefinitionStateSensor(
    task_id="await_task_definition",
    task_definition=register_task.output,
)

AWS ECS Task State Sensor

To poll the task state until it reaches a terminal state you can use EcsTaskStateSensor.

Defaults to EcsTaskStates.RUNNING as the success state and no failure state; both can be overridden with provided values. Raises an AirflowException with the failure reason if a failure state is provided and that state is reached before the target state.

tests/system/amazon/aws/example_ecs_fargate.py

# By default, EcsTaskStateSensor waits until the task has started, but the
# demo task runs so fast that the sensor misses it.  This sensor instead
# demonstrates how to wait until the ECS Task has completed by providing
# the target_state and failure_states parameters.
await_task_finish = EcsTaskStateSensor(
    task_id="await_task_finish",
    cluster=cluster_name,
    task=hello_world.output["ecs_task_arn"],
    target_state=EcsTaskStates.STOPPED,
    failure_states={EcsTaskStates.NONE},
)
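The target-state and failure-state behaviour shared by the three sensors can be sketched without Airflow. This is an illustration only: poke_state is a hypothetical function, RuntimeError stands in for AirflowException, and the list of observed states simulates successive polls rather than calling ECS.

```python
from typing import Optional, Set


def poke_state(current: str, target: str, failure_states: Optional[Set[str]] = None) -> bool:
    """Hypothetical sketch of a single sensor poll."""
    # A failure state reached before the target state aborts the wait.
    if failure_states and current in failure_states:
        raise RuntimeError(f"Reached failure state {current!r} before target state {target!r}")
    # Otherwise the sensor succeeds only once the target state is observed.
    return current == target


# Simulated sequence of task states returned by successive polls.
observed = ["PROVISIONING", "PENDING", "RUNNING", "STOPPED"]
polls = 0
for state in observed:
    polls += 1
    if poke_state(state, target="STOPPED", failure_states={"NONE"}):
        break
print(polls)
```

With the default target of RUNNING, the same loop would stop at the third poll, which is why a short-lived task calls for target_state=EcsTaskStates.STOPPED as in the example above.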
