Source code for airflow.providers.amazon.aws.hooks.ecs
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
from typing import TYPE_CHECKING
from airflow.providers.amazon.aws.exceptions import EcsOperatorError, EcsTaskFailToStart
from airflow.providers.amazon.aws.hooks.base_aws import AwsGenericHook
from airflow.providers.amazon.aws.utils import _StringCompareEnum
from airflow.typing_compat import Protocol, runtime_checkable
if TYPE_CHECKING:
from botocore.waiter import Waiter
[docs]def should_retry(exception: Exception):
"""Check if exception is related to ECS resource quota (CPU, MEM)."""
if isinstance(exception, EcsOperatorError):
return any(
quota_reason in failure["reason"]
for quota_reason in ["RESOURCE:MEMORY", "RESOURCE:CPU"]
for failure in exception.failures
)
return False
[docs]def should_retry_eni(exception: Exception):
"""Check if exception is related to ENI (Elastic Network Interfaces)."""
if isinstance(exception, EcsTaskFailToStart):
return any(
eni_reason in exception.message
for eni_reason in ["network interface provisioning", "ResourceInitializationError"]
)
return False
[docs]class EcsClusterStates(_StringCompareEnum):
"""Contains the possible State values of an ECS Cluster."""
[docs] PROVISIONING = "PROVISIONING"
[docs] DEPROVISIONING = "DEPROVISIONING"
[docs]class EcsTaskDefinitionStates(_StringCompareEnum):
"""Contains the possible State values of an ECS Task Definition."""
[docs] DELETE_IN_PROGRESS = "DELETE_IN_PROGRESS"
[docs]class EcsTaskStates(_StringCompareEnum):
"""Contains the possible State values of an ECS Task."""
[docs] PROVISIONING = "PROVISIONING"
[docs] ACTIVATING = "ACTIVATING"
[docs] DEACTIVATING = "DEACTIVATING"
[docs] DEPROVISIONING = "DEPROVISIONING"
[docs]class EcsHook(AwsGenericHook):
"""
Interact with Amazon Elastic Container Service (ECS).
Provide thin wrapper around :external+boto3:py:class:`boto3.client("ecs") <ECS.Client>`.
Additional arguments (such as ``aws_conn_id``) may be specified and
are passed down to the underlying AwsBaseHook.
.. seealso::
- :class:`airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
- `Amazon Elastic Container Service \
<https://docs.aws.amazon.com/AmazonECS/latest/APIReference/Welcome.html>`__
"""
def __init__(self, *args, **kwargs) -> None:
kwargs["client_type"] = "ecs"
super().__init__(*args, **kwargs)
[docs] def get_cluster_state(self, cluster_name: str) -> str:
"""
Get ECS Cluster state.
.. seealso::
- :external+boto3:py:meth:`ECS.Client.describe_clusters`
:param cluster_name: ECS Cluster name or full cluster Amazon Resource Name (ARN) entry.
"""
return self.conn.describe_clusters(clusters=[cluster_name])["clusters"][0]["status"]
[docs] def get_task_definition_state(self, task_definition: str) -> str:
"""
Get ECS Task Definition state.
.. seealso::
- :external+boto3:py:meth:`ECS.Client.describe_task_definition`
:param task_definition: The family for the latest ACTIVE revision,
family and revision ( family:revision ) for a specific revision in the family,
or full Amazon Resource Name (ARN) of the task definition to describe.
"""
return self.conn.describe_task_definition(taskDefinition=task_definition)["taskDefinition"]["status"]
[docs] def get_task_state(self, cluster, task) -> str:
"""
Get ECS Task state.
.. seealso::
- :external+boto3:py:meth:`ECS.Client.describe_tasks`
:param cluster: The short name or full Amazon Resource Name (ARN)
of the cluster that hosts the task or tasks to describe.
:param task: Task ID or full ARN entry.
"""
return self.conn.describe_tasks(cluster=cluster, tasks=[task])["tasks"][0]["lastStatus"]
@runtime_checkable
[docs]class EcsProtocol(Protocol):
"""
A structured Protocol for ``boto3.client('ecs')``.
This is used for type hints on :py:meth:`.EcsOperator.client`.
.. seealso::
- https://mypy.readthedocs.io/en/latest/protocols.html
- https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html
"""
[docs] def run_task(self, **kwargs) -> dict:
"""
Run a task.
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.run_task
"""
...
[docs] def get_waiter(self, x: str) -> Waiter:
"""
Get a waiter.
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.get_waiter
"""
...
[docs] def describe_tasks(self, cluster: str, tasks) -> dict:
"""
Describe tasks.
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.describe_tasks
"""
...
[docs] def stop_task(self, cluster, task, reason: str) -> dict:
"""
Stop a task.
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.stop_task
"""
...
[docs] def describe_task_definition(self, taskDefinition: str) -> dict:
"""
Describe a task definition.
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.describe_task_definition
"""
...
[docs] def list_tasks(self, cluster: str, launchType: str, desiredStatus: str, family: str) -> dict:
"""
List tasks.
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.list_tasks
"""
...