Source code for airflow.providers.cncf.kubernetes.utils.delete_from

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

# from https://github.com/tomplus/kubernetes_asyncio/pull/239/files

from __future__ import annotations

import re

from kubernetes import client
from kubernetes.client import ApiClient

[docs]DEFAULT_DELETION_BODY = client.V1DeleteOptions( propagation_policy="Background", grace_period_seconds=5, )
[docs]def delete_from_dict(k8s_client, data, body, namespace, verbose=False, **kwargs): api_exceptions = [] if "List" in data["kind"]: kind = data["kind"].replace("List", "") for yml_doc in data["items"]: if kind != "": yml_doc["apiVersion"] = data["apiVersion"] yml_doc["kind"] = kind try: _delete_from_yaml_single_item( k8s_client=k8s_client, yml_document=yml_doc, verbose=verbose, namespace=namespace, body=body, **kwargs, ) except client.rest.ApiException as api_exception: api_exceptions.append(api_exception) else: try: _delete_from_yaml_single_item( k8s_client=k8s_client, yml_document=data, verbose=verbose, namespace=namespace, body=body, **kwargs, ) except client.rest.ApiException as api_exception: api_exceptions.append(api_exception) if api_exceptions: raise FailToDeleteError(api_exceptions)
[docs]def delete_from_yaml( *, k8s_client: ApiClient, yaml_objects=None, verbose: bool = False, namespace: str = "default", body: dict | None = None, **kwargs, ): for yml_document in yaml_objects: if yml_document is None: continue else: delete_from_dict( k8s_client=k8s_client, data=yml_document, body=body, namespace=namespace, verbose=verbose, **kwargs, )
def _delete_from_yaml_single_item( *, k8s_client: ApiClient, yml_document: dict, verbose: bool = False, namespace: str = "default", body: dict | None = None, **kwargs, ): if body is None: body = DEFAULT_DELETION_BODY # get group and version from apiVersion group, _, version = yml_document["apiVersion"].partition("/") if version == "": version = group group = "core" # Take care for the case e.g. api_type is "apiextensions.k8s.io" # Only replace the last instance group = "".join(group.rsplit(".k8s.io", 1)) # convert group name from DNS subdomain format to # python class name convention group = "".join(word.capitalize() for word in group.split(".")) fcn_to_call = f"{group}{version.capitalize()}Api" k8s_api = getattr(client, fcn_to_call)(k8s_client) # Replace CamelCased action_type into snake_case kind = yml_document["kind"] kind = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", kind) kind = re.sub("([a-z0-9])([A-Z])", r"\1_\2", kind).lower() # Decide which namespace we are going to use for deleting the object # IMPORTANT: the docs namespace takes precedence over the namespace in args # create_from_yaml_single_item have same behaviour if "namespace" in yml_document["metadata"]: namespace = yml_document["metadata"]["namespace"] name = yml_document["metadata"]["name"] # Expect the user to delete namespaced objects more often resp: client.V1Status if hasattr(k8s_api, f"delete_namespaced_{kind}"): resp = getattr(k8s_api, f"delete_namespaced_{kind}")( name=name, namespace=namespace, body=body, **kwargs ) else: resp = getattr(k8s_api, f"delete_{kind}")(name=name, body=body, **kwargs) if verbose: print(f"{kind} deleted. status='{str(resp.status)}'") return resp
[docs]class FailToDeleteError(Exception): """For handling error if an error occurred when handling a yaml file during deletion of the resource.""" def __init__(self, api_exceptions: list): self.api_exceptions = api_exceptions
[docs] def __str__(self): msg = "" for api_exception in self.api_exceptions: msg += f"Error from server ({api_exception.reason}):{api_exception.body}\n" return msg

Was this entry helpful?