Source code for airflow.providers.cncf.kubernetes.kubernetes_helper_functions
# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportloggingimportsecretsimportstringfromtypingimportTYPE_CHECKINGimportpendulumfromdeprecatedimportdeprecatedfromkubernetes.client.restimportApiExceptionfromslugifyimportslugifyfromairflow.compat.functoolsimportcachefromairflow.configurationimportconffromairflow.exceptionsimportAirflowProviderDeprecationWarningifTYPE_CHECKING:fromairflow.models.taskinstancekeyimportTaskInstanceKey
[docs]POD_NAME_MAX_LENGTH=63# Matches Linux kernel's HOST_NAME_MAX default value minus 1.
defrand_str(num):""" Generate random lowercase alphanumeric string of length num. :meta private: """return"".join(secrets.choice(alphanum_lower)for_inrange(num))defadd_unique_suffix(*,name:str,rand_len:int=8,max_len:int=POD_NAME_MAX_LENGTH)->str:""" Add random string to pod or job name while staying under max length. :param name: name of the pod or job :param rand_len: length of the random string to append :param max_len: maximum length of the pod name :meta private: """suffix="-"+rand_str(rand_len)returnname[:max_len-len(suffix)].strip("-.")+suffix@deprecated(reason="This function is deprecated. Please use `add_unique_suffix`",category=AirflowProviderDeprecationWarning,)defadd_pod_suffix(*,pod_name:str,rand_len:int=8,max_len:int=POD_NAME_MAX_LENGTH)->str:""" Add random string to pod name while staying under max length. :param pod_name: name of the pod :param rand_len: length of the random string to append :param max_len: maximum length of the pod name :meta private: """returnadd_unique_suffix(name=pod_name,rand_len=rand_len,max_len=max_len)
[docs]defcreate_unique_id(dag_id:str|None=None,task_id:str|None=None,*,max_length:int=POD_NAME_MAX_LENGTH,unique:bool=True,)->str:""" Generate unique pod or job ID given a dag_id and / or task_id. :param dag_id: DAG ID :param task_id: Task ID :param max_length: max number of characters :param unique: whether a random string suffix should be added :return: A valid identifier for a kubernetes pod name """ifnot(dag_idortask_id):raiseValueError("Must supply either dag_id or task_id.")name=""ifdag_id:name+=dag_idiftask_id:ifname:name+="-"name+=task_idbase_name=slugify(name,lowercase=True)[:max_length].strip(".-")ifunique:returnadd_unique_suffix(name=base_name,rand_len=8,max_len=max_length)else:returnbase_name
@deprecated(reason="This function is deprecated. Please use `create_unique_id`.",category=AirflowProviderDeprecationWarning,)
[docs]defcreate_pod_id(dag_id:str|None=None,task_id:str|None=None,*,max_length:int=POD_NAME_MAX_LENGTH,unique:bool=True,)->str:""" Generate unique pod ID given a dag_id and / or task_id. :param dag_id: DAG ID :param task_id: Task ID :param max_length: max number of characters :param unique: whether a random string suffix should be added :return: A valid identifier for a kubernetes pod name """returncreate_unique_id(dag_id=dag_id,task_id=task_id,max_length=max_length,unique=unique)
[docs]defannotations_to_key(annotations:dict[str,str])->TaskInstanceKey:"""Build a TaskInstanceKey based on pod annotations."""log.debug("Creating task key for annotations %s",annotations)dag_id=annotations["dag_id"]task_id=annotations["task_id"]try_number=int(annotations["try_number"])annotation_run_id=annotations.get("run_id")map_index=int(annotations.get("map_index",-1))# Compat: Look up the run_id from the TI table!fromairflow.models.dagrunimportDagRunfromairflow.models.taskinstanceimportTaskInstance,TaskInstanceKeyfromairflow.settingsimportSessionifnotannotation_run_idand"execution_date"inannotations:execution_date=pendulum.parse(annotations["execution_date"])# Do _not_ use create-session, we don't want to expungesession=Session()task_instance_run_id=(session.query(TaskInstance.run_id).join(TaskInstance.dag_run).filter(TaskInstance.dag_id==dag_id,TaskInstance.task_id==task_id,DagRun.execution_date==execution_date,).scalar())else:task_instance_run_id=annotation_run_idreturnTaskInstanceKey(dag_id=dag_id,task_id=task_id,run_id=task_instance_run_id,try_number=try_number,map_index=map_index,)
[docs]defshould_retry_creation(exception:BaseException)->bool:""" Check if an Exception indicates a transient error and warrants retrying. This function is needed for preventing 'No agent available' error. The error appears time to time when users try to create a Resource or Job. This issue is inside kubernetes and in the current moment has no solution. Like a temporary solution we decided to retry Job or Resource creation request each time when this error appears. More about this issue here: https://github.com/cert-manager/cert-manager/issues/6457 """ifisinstance(exception,ApiException):returnstr(exception.status)=="500"returnFalse