# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import logging
import secrets
import string
from typing import TYPE_CHECKING
import pendulum
from deprecated import deprecated
from kubernetes.client.rest import ApiException
from slugify import slugify
from airflow.compat.functools import cache
from airflow.configuration import conf
from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.models.taskinstancekey import TaskInstanceKey
# Module-level logger for the helpers in this file.
log = logging.getLogger(__name__)

# Alphabet used for random name suffixes: lowercase ASCII letters and digits.
alphanum_lower = string.ascii_lowercase + string.digits

POD_NAME_MAX_LENGTH = 63  # Matches Linux kernel's HOST_NAME_MAX default value minus 1.
def rand_str(num: int) -> str:
    """Generate random lowercase alphanumeric string of length num.

    Characters are drawn independently from ``alphanum_lower`` using
    ``secrets.choice``.

    :param num: number of characters to generate
    :return: the random string

    :meta private:
    """
    return "".join(secrets.choice(alphanum_lower) for _ in range(num))
def add_unique_suffix(*, name: str, rand_len: int = 8, max_len: int = POD_NAME_MAX_LENGTH) -> str:
    """Add random string to pod or job name while staying under max length.

    The name is truncated so that name plus suffix fits in ``max_len``, and any
    ``-`` or ``.`` left at either end of the truncated name is stripped before
    the suffix is appended.

    :param name: name of the pod or job
    :param rand_len: length of the random string to append
    :param max_len: maximum length of the pod name
    :return: ``name`` (possibly shortened) followed by ``-`` and the random string

    :meta private:
    """
    suffix = "-" + rand_str(rand_len)
    # Reserve room for the suffix first, then drop separators exposed by the cut.
    return name[: max_len - len(suffix)].strip("-.") + suffix
@deprecated(
    reason="This function is deprecated. Please use `add_unique_suffix`",
    category=AirflowProviderDeprecationWarning,
)
def add_pod_suffix(*, pod_name: str, rand_len: int = 8, max_len: int = POD_NAME_MAX_LENGTH) -> str:
    """Add random string to pod name while staying under max length.

    Deprecated alias that forwards to ``add_unique_suffix``.

    :param pod_name: name of the pod
    :param rand_len: length of the random string to append
    :param max_len: maximum length of the pod name
    :return: the value returned by ``add_unique_suffix`` for the same arguments

    :meta private:
    """
    return add_unique_suffix(name=pod_name, rand_len=rand_len, max_len=max_len)
def create_unique_id(
    dag_id: str | None = None,
    task_id: str | None = None,
    max_length: int = POD_NAME_MAX_LENGTH,
    unique: bool = True,
) -> str:
    """
    Generate unique pod or job ID given a dag_id and / or task_id.

    The supplied IDs are joined with ``-``, passed through ``slugify`` with
    ``lowercase=True``, truncated to ``max_length`` and stripped of ``.`` / ``-``
    at both ends.

    :param dag_id: DAG ID
    :param task_id: Task ID
    :param max_length: max number of characters
    :param unique: whether a random string suffix should be added
    :return: A valid identifier for a kubernetes pod name
    :raises ValueError: if neither ``dag_id`` nor ``task_id`` is supplied
    """
    if not (dag_id or task_id):
        raise ValueError("Must supply either dag_id or task_id.")

    # Join whichever of the two IDs were supplied (empty / None parts are skipped).
    name = "-".join(part for part in (dag_id, task_id) if part)
    base_name = slugify(name, lowercase=True)[:max_length].strip(".-")
    if unique:
        return add_unique_suffix(name=base_name, rand_len=8, max_len=max_length)
    return base_name
@deprecated(
    reason="This function is deprecated. Please use `create_unique_id`.",
    category=AirflowProviderDeprecationWarning,
)
def create_pod_id(
    dag_id: str | None = None,
    task_id: str | None = None,
    max_length: int = POD_NAME_MAX_LENGTH,
    unique: bool = True,
) -> str:
    """
    Generate unique pod ID given a dag_id and / or task_id.

    Deprecated alias that forwards to ``create_unique_id``.

    :param dag_id: DAG ID
    :param task_id: Task ID
    :param max_length: max number of characters
    :param unique: whether a random string suffix should be added
    :return: A valid identifier for a kubernetes pod name
    """
    return create_unique_id(dag_id=dag_id, task_id=task_id, max_length=max_length, unique=unique)
def annotations_to_key(annotations: dict[str, str]) -> TaskInstanceKey:
    """Build a TaskInstanceKey based on pod annotations.

    :param annotations: pod annotations; ``dag_id``, ``task_id`` and ``try_number``
        are read unconditionally, while ``run_id``, ``map_index`` and
        ``execution_date`` are optional
    :return: key assembled from the annotation values
    :raises KeyError: if ``dag_id``, ``task_id`` or ``try_number`` is missing
    """
    log.debug("Creating task key for annotations %s", annotations)
    dag_id = annotations["dag_id"]
    task_id = annotations["task_id"]
    try_number = int(annotations["try_number"])
    annotation_run_id = annotations.get("run_id")
    map_index = int(annotations.get("map_index", -1))

    # Compat: Look up the run_id from the TI table!
    from airflow.models.dagrun import DagRun
    from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
    from airflow.settings import Session

    if not annotation_run_id and "execution_date" in annotations:
        execution_date = pendulum.parse(annotations["execution_date"])
        # Do _not_ use create-session, we don't want to expunge
        session = Session()

        # NOTE(review): the query/join/scalar chain around these filter
        # conditions was restored from a damaged copy of this function --
        # confirm against the upstream implementation.
        task_instance_run_id = (
            session.query(TaskInstance.run_id)
            .join(TaskInstance.dag_run)
            .filter(
                TaskInstance.dag_id == dag_id,
                TaskInstance.task_id == task_id,
                DagRun.execution_date == execution_date,
            )
            .scalar()
        )
    else:
        task_instance_run_id = annotation_run_id

    return TaskInstanceKey(
        dag_id=dag_id,
        task_id=task_id,
        run_id=task_instance_run_id,
        try_number=try_number,
        map_index=map_index,
    )
def should_retry_creation(exception: BaseException) -> bool:
    """
    Check if an Exception indicates a transient error and warrants retrying.

    This function is needed to work around the 'No agent available' error, which
    appears from time to time when users try to create a Resource or Job. The
    issue is inside kubernetes and currently has no solution, so as a temporary
    measure the Job or Resource creation request is retried each time this
    error appears.

    More about this issue here: https://github.com/cert-manager/cert-manager/issues/6457

    :param exception: the exception raised by the creation request
    :return: True only for an ``ApiException`` whose status renders as ``"500"``
    """
    # Compare via str() so the check does not depend on the type of ``status``.
    return isinstance(exception, ApiException) and str(exception.status) == "500"