Source code for airflow.providers.cncf.kubernetes.backcompat.backwards_compat_converters

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Converters from legacy Airflow Kubernetes objects and plain values to kubernetes client models"""

from typing import List

from kubernetes.client import ApiClient, models as k8s

from airflow.exceptions import AirflowException
from airflow.providers.cncf.kubernetes.backcompat.pod import Port, Resources
from airflow.providers.cncf.kubernetes.backcompat.pod_runtime_info_env import PodRuntimeInfoEnv
from airflow.providers.cncf.kubernetes.backcompat.volume import Volume
from airflow.providers.cncf.kubernetes.backcompat.volume_mount import VolumeMount


def _convert_kube_model_object(obj, old_class, new_class):
    """
    Normalizes an object to its kubernetes client representation

    :param obj: object to normalize
    :param old_class: legacy class, used only in the error message
    :param new_class: kubernetes client class that is accepted as-is
    :return: the result of ``obj.to_k8s_client_obj()`` when ``obj`` exposes such a
        callable, otherwise ``obj`` itself when it is an instance of ``new_class``
    :raises AirflowException: when ``obj`` matches neither case
    """
    legacy_converter = getattr(obj, "to_k8s_client_obj", None)
    # The converter check comes first: anything exposing a callable
    # ``to_k8s_client_obj`` is converted, regardless of its type.
    if callable(legacy_converter):
        return legacy_converter()
    if isinstance(obj, new_class):
        return obj
    raise AirflowException(f"Expected {old_class} or {new_class}, got {type(obj)}")
def _convert_from_dict(obj, new_class):
    """
    Builds a kubernetes client model from a dict, passing existing models through

    :param obj: a dict or an instance of ``new_class``
    :param new_class: kubernetes client class to produce
    :return: ``obj`` itself when it is already a ``new_class`` instance, otherwise
        the model deserialized from the dict
    :raises AirflowException: when ``obj`` is neither a dict nor a ``new_class`` instance
    """
    if isinstance(obj, new_class):
        return obj
    if not isinstance(obj, dict):
        raise AirflowException(f"Expected dict or {new_class}, got {type(obj)}")
    # The deserializer is a name-mangled private method of ApiClient, hence the
    # explicit ``_ApiClient__`` prefix and the pylint suppression.
    deserialize_model = ApiClient()._ApiClient__deserialize_model  # pylint: disable=W0212
    return deserialize_model(obj, new_class)
def convert_volume(volume) -> k8s.V1Volume:
    """
    Turns an airflow Volume object into a k8s.V1Volume

    :param volume: an airflow ``Volume`` or an existing ``k8s.V1Volume``
    :return: k8s.V1Volume
    """
    target_class = k8s.V1Volume
    return _convert_kube_model_object(volume, Volume, target_class)
def convert_volume_mount(volume_mount) -> k8s.V1VolumeMount:
    """
    Turns an airflow VolumeMount object into a k8s.V1VolumeMount

    :param volume_mount: an airflow ``VolumeMount`` or an existing ``k8s.V1VolumeMount``
    :return: k8s.V1VolumeMount
    """
    target_class = k8s.V1VolumeMount
    return _convert_kube_model_object(volume_mount, VolumeMount, target_class)
def convert_resources(resources) -> k8s.V1ResourceRequirements:
    """
    Turns an airflow Resources object, or a dict of its constructor keyword
    arguments, into a k8s.V1ResourceRequirements

    :param resources: a dict, an airflow ``Resources`` or an existing
        ``k8s.V1ResourceRequirements``
    :return: k8s.V1ResourceRequirements
    """
    # A dict is first expanded into a legacy Resources object so that it goes
    # through the same conversion path as any other legacy object.
    legacy_or_model = Resources(**resources) if isinstance(resources, dict) else resources
    return _convert_kube_model_object(legacy_or_model, Resources, k8s.V1ResourceRequirements)
def convert_port(port) -> k8s.V1ContainerPort:
    """
    Turns an airflow Port object into a k8s.V1ContainerPort

    :param port: an airflow ``Port`` or an existing ``k8s.V1ContainerPort``
    :return: k8s.V1ContainerPort
    """
    target_class = k8s.V1ContainerPort
    return _convert_kube_model_object(port, Port, target_class)
def convert_env_vars(env_vars) -> List[k8s.V1EnvVar]:
    """
    Turns a mapping of environment variables into a list of k8s.V1EnvVar

    :param env_vars: a dict of variable name to value, or a list that is
        returned unchanged
    :return: List[k8s.V1EnvVar]
    :raises AirflowException: when ``env_vars`` is neither a dict nor a list
    """
    if isinstance(env_vars, list):
        return env_vars
    if isinstance(env_vars, dict):
        return [k8s.V1EnvVar(name=key, value=val) for key, val in env_vars.items()]
    raise AirflowException(f"Expected dict or list, got {type(env_vars)}")
def convert_pod_runtime_info_env(pod_runtime_info_envs) -> k8s.V1EnvVar:
    """
    Turns a PodRuntimeInfoEnv object into a k8s.V1EnvVar

    :param pod_runtime_info_envs: a ``PodRuntimeInfoEnv`` or an existing ``k8s.V1EnvVar``
    :return: k8s.V1EnvVar
    """
    target_class = k8s.V1EnvVar
    return _convert_kube_model_object(pod_runtime_info_envs, PodRuntimeInfoEnv, target_class)
def convert_image_pull_secrets(image_pull_secrets) -> List[k8s.V1LocalObjectReference]:
    """
    Converts a comma-separated string of secret names into a list of
    k8s.V1LocalObjectReference

    Whitespace around each name is stripped and empty entries (for example from
    a trailing comma or an empty string) are skipped, so that ``"a, b,"`` yields
    references named ``a`` and ``b`` rather than ``a``, `` b`` and an empty name.

    :param image_pull_secrets: comma-separated secret names; any value that is
        not a string is returned unchanged
    :return: List[k8s.V1LocalObjectReference], or the input itself when it is
        not a string
    """
    if not isinstance(image_pull_secrets, str):
        return image_pull_secrets
    stripped_names = (name.strip() for name in image_pull_secrets.split(","))
    return [k8s.V1LocalObjectReference(name=name) for name in stripped_names if name]
def convert_configmap(configmaps) -> k8s.V1EnvFromSource:
    """
    Wraps a config map name in a k8s.V1EnvFromSource

    :param configmaps: value passed as ``name`` to ``k8s.V1ConfigMapEnvSource``
    :return: k8s.V1EnvFromSource whose ``config_map_ref`` is that source
    """
    config_map_source = k8s.V1ConfigMapEnvSource(name=configmaps)
    return k8s.V1EnvFromSource(config_map_ref=config_map_source)
def convert_affinity(affinity) -> k8s.V1Affinity:
    """
    Turns a dict into a k8s.V1Affinity

    :param affinity: a dict or an existing ``k8s.V1Affinity``
    :return: k8s.V1Affinity
    """
    target_class = k8s.V1Affinity
    return _convert_from_dict(affinity, target_class)
def convert_toleration(toleration) -> k8s.V1Toleration:
    """
    Turns a dict into a k8s.V1Toleration

    :param toleration: a dict or an existing ``k8s.V1Toleration``
    :return: k8s.V1Toleration
    """
    target_class = k8s.V1Toleration
    return _convert_from_dict(toleration, target_class)

Was this entry helpful?