airflow.providers.amazon.aws.executors.batch.batch_executor

AWS Batch Executor. Each Airflow task is delegated to an AWS Batch job.

Module Contents

Classes

AwsBatchExecutor

The Airflow Scheduler creates a shell command and passes it to the executor.

Attributes

CommandType

ExecutorConfigType

INVALID_CREDENTIALS_EXCEPTIONS

airflow.providers.amazon.aws.executors.batch.batch_executor.CommandType
airflow.providers.amazon.aws.executors.batch.batch_executor.ExecutorConfigType
airflow.providers.amazon.aws.executors.batch.batch_executor.INVALID_CREDENTIALS_EXCEPTIONS = ['ExpiredTokenException', 'InvalidClientTokenId', 'UnrecognizedClientException']
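
These error codes mark Batch API responses that indicate expired or otherwise invalid AWS credentials. As a rough illustration (not the provider's actual code), a caller could match them against the error code carried by a botocore ClientError; the helper name below is hypothetical:

    from botocore.exceptions import ClientError

    INVALID_CREDENTIALS_EXCEPTIONS = [
        "ExpiredTokenException",
        "InvalidClientTokenId",
        "UnrecognizedClientException",
    ]

    def is_credentials_error(error: ClientError) -> bool:
        # botocore exposes the AWS service error code under Error.Code.
        code = error.response.get("Error", {}).get("Code", "")
        return code in INVALID_CREDENTIALS_EXCEPTIONS
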
class airflow.providers.amazon.aws.executors.batch.batch_executor.AwsBatchExecutor(*args, **kwargs)

Bases: airflow.executors.base_executor.BaseExecutor

The Airflow Scheduler creates a shell command and passes it to the executor.

This Batch Executor runs the given Airflow command in a resource provisioned and managed by AWS Batch. It then periodically checks in with the launched jobs (via their job IDs) to determine their status.

The submit_job_kwargs is a dictionary that should match the kwargs for the SubmitJob definition per AWS’ documentation (see the second link below).

For maximum flexibility, individual tasks can specify executor_config as a dictionary whose keys match the request syntax of the SubmitJob definition. The executor_config updates the submit_job_kwargs dictionary when the task is launched, which allows individual jobs to specify CPU, memory, GPU, environment variables, and so on.

Prerequisite: proper configuration of the Boto3 library.

See also: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html for authentication and access-key management. You can store credentials in an environment variable, set up an AWS config file, or use IAM roles.

See also: https://docs.aws.amazon.com/batch/latest/APIReference/API_SubmitJob.html for the request syntax accepted in an Airflow TaskInstance’s executor_config.
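
For example, a task could raise its vCPU and memory allocation through executor_config. The keys below follow the SubmitJob request syntax, while the task itself and its values are illustrative only:

    from airflow.operators.bash import BashOperator

    # Hypothetical task; executor_config keys mirror SubmitJob's containerOverrides.
    heavy_task = BashOperator(
        task_id="heavy_lifting",
        bash_command="echo hello",
        executor_config={
            "containerOverrides": {
                "resourceRequirements": [
                    {"type": "VCPU", "value": "4"},
                    {"type": "MEMORY", "value": "8192"},
                ],
                "environment": [{"name": "STAGE", "value": "prod"}],
            }
        },
    )

These overrides update submit_job_kwargs when the job is submitted, so anything not overridden keeps its executor-level default.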

MAX_SUBMIT_JOB_ATTEMPTS
DESCRIBE_JOBS_BATCH_SIZE = 99
check_health()

Make a test API call to check the health of the Batch Executor.
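
A minimal sketch of such a check, assuming a cheap, side-effect-free Boto3 call is an adequate probe (the provider's real check may differ):

    import boto3
    from botocore.exceptions import ClientError

    def check_batch_health() -> bool:
        client = boto3.client("batch")
        try:
            # Hypothetical probe: describe_jobs with no IDs exercises
            # authentication without touching any real job.
            client.describe_jobs(jobs=[])
            return True
        except ClientError:
            return False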

start()

Call this when the Executor is run for the first time by the scheduler.

load_batch_connection(check_connection=True)
sync()

Sync is called periodically by the heartbeat method in the scheduler.
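
In outline, each heartbeat-driven pass reconciles launched jobs first and then drains the pending queue, as in this sketch (the method names mirror those documented here; error handling is omitted):

    def sync(self):
        # Reconcile the state of already-launched Batch jobs,
        # then try to submit anything still waiting in the queue.
        self.sync_running_jobs()
        self.attempt_submit_jobs()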

sync_running_jobs()
attempt_submit_jobs()

Attempt to submit all jobs that have been queued with the executor.

For each iteration of the sync() method, every pending job is submitted to Batch. If a job fails validation, it will be put at the back of the queue to be reattempted in the next iteration of the sync() method, unless it has exceeded the maximum number of attempts. If a job exceeds the maximum number of attempts, it is removed from the queue.
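
A minimal sketch of that requeue-with-a-cap policy, assuming each pending job carries an attempt counter (all names here are hypothetical):

    from collections import deque
    from dataclasses import dataclass

    MAX_ATTEMPTS = 3  # illustrative cap

    @dataclass
    class PendingJob:
        kwargs: dict        # SubmitJob arguments for this task
        attempts: int = 0   # how many submissions have been tried

    def attempt_submit_jobs(pending: "deque[PendingJob]", submit) -> None:
        # Drain only what is queued at the start of this pass; jobs pushed
        # back during the pass are retried on the next sync, not this one.
        for _ in range(len(pending)):
            job = pending.popleft()
            try:
                submit(**job.kwargs)
            except Exception:
                job.attempts += 1
                if job.attempts < MAX_ATTEMPTS:
                    pending.append(job)  # back of the queue for the next sync
                # otherwise the job is dropped after exhausting its attempts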

execute_async(key, command, queue=None, executor_config=None)

Save the task to be executed in the next sync, which submits it using Boto3’s SubmitJob API.
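
Conceptually, execute_async only defers the work: nothing is sent to AWS here, and the record is picked up by the next sync() pass. A sketch, with the pending-job queue assumed:

    def execute_async(self, key, command, queue=None, executor_config=None):
        # Park the task; sync() will submit it to Batch later.
        self.pending_jobs.append(
            {"key": key, "command": command, "executor_config": executor_config or {}}
        )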

end(heartbeat_interval=10)

Wait for all currently running tasks to end and prevent any new jobs from running.
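
A sketch of that drain loop, assuming the executor tracks its active job IDs and that sync() removes finished ones:

    import time

    def end(self, heartbeat_interval: int = 10) -> None:
        # Poll until every launched job has left the tracked set.
        while self.active_workers:  # hypothetical collection of live job IDs
            self.sync()
            time.sleep(heartbeat_interval)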

terminate()

Kill all Batch Jobs by calling Boto3’s TerminateJob API.
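
A minimal sketch, assuming the tracked job IDs are available (the reason string is illustrative):

    import boto3

    def terminate_all(job_ids: list[str]) -> None:
        client = boto3.client("batch")
        for job_id in job_ids:
            # TerminateJob cancels queued jobs and stops running ones.
            client.terminate_job(jobId=job_id, reason="Airflow executor shutdown")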
