airflow.providers.amazon.aws.executors.ecs.ecs_executor

AWS ECS Executor.

Each Airflow task gets delegated out to an Amazon ECS Task.

Module Contents

Classes

AwsEcsExecutor

Executes the provided Airflow command on an ECS instance.

Attributes

INVALID_CREDENTIALS_EXCEPTIONS

airflow.providers.amazon.aws.executors.ecs.ecs_executor.INVALID_CREDENTIALS_EXCEPTIONS = ['ExpiredTokenException', 'InvalidClientTokenId', 'UnrecognizedClientException'][source]
class airflow.providers.amazon.aws.executors.ecs.ecs_executor.AwsEcsExecutor(*args, **kwargs)[source]

Bases: airflow.executors.base_executor.BaseExecutor

Executes the provided Airflow command on an ECS instance.

The Airflow Scheduler creates a shell command, and passes it to the executor. This ECS Executor runs said Airflow command on a remote Amazon ECS Cluster with a task-definition configured to launch the same containers as the Scheduler. It then periodically checks in with the launched tasks (via task ARNs) to determine the status.

This allows individual tasks to specify CPU, memory, GPU, env variables, etc. When initializing a task, there’s an option for “executor config” which should be a dictionary with keys that match the ContainerOverride definition per AWS documentation (see link below).

Prerequisite: proper configuration of Boto3 library .. seealso:: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html for authentication and access-key management. You can store an environmental variable, setup aws config from console, or use IAM roles.

See also

https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_ContainerOverride.html for an Airflow TaskInstance’s executor_config.

MAX_RUN_TASK_ATTEMPTS[source]
DESCRIBE_TASKS_BATCH_SIZE = 99[source]
start()[source]

Call this when the Executor is run for the first time by the scheduler.

check_health()[source]

Make a test API call to check the health of the ECS Executor.

Deliberately use an invalid task ID, some potential outcomes in order:
  1. AccessDeniedException is raised if there are insufficient permissions.

  2. ClusterNotFoundException is raised if permissions exist but the cluster does not.

  3. The API responds with a failure message if the cluster is found and there are permissions, but the cluster itself has issues.

  4. InvalidParameterException is raised if the permissions and cluster exist but the task does not.

The last one is considered a success state for the purposes of this check.

load_ecs_connection(check_connection=True)[source]
sync()[source]

Sync will get called periodically by the heartbeat method.

Executors should override this to perform gather statuses.

sync_running_tasks()[source]

Check and update state on all running tasks.

attempt_task_runs()[source]

Take tasks from the pending_tasks queue, and attempts to find an instance to run it on.

If the launch type is EC2, this will attempt to place tasks on empty EC2 instances. If

there are no EC2 instances available, no task is placed and this function will be called again in the next heart-beat.

If the launch type is FARGATE, this will run the tasks on new AWS Fargate instances.

execute_async(key, command, queue=None, executor_config=None)[source]

Save the task to be executed in the next sync by inserting the commands into a queue.

end(heartbeat_interval=10)[source]

Wait for all currently running tasks to end, and don’t launch any tasks.

terminate()[source]

Kill all ECS processes by calling Boto3’s StopTask API.

get_container(container_list)[source]

Search task list for core Airflow container.

try_adopt_task_instances(tis)[source]

Adopt task instances which have an external_executor_id (the ECS task ARN).

Anything that is not adopted will be cleared by the scheduler and becomes eligible for re-scheduling.

Was this entry helpful?