Source code for airflow.providers.cncf.kubernetes.hooks.kubernetes
# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportcontextlibimporttempfileimportwarningsfromtypingimportTYPE_CHECKING,Any,Generatorfromasgiref.syncimportsync_to_asyncfromkubernetesimportclient,config,watchfromkubernetes.client.modelsimportV1Podfromkubernetes.configimportConfigExceptionfromkubernetes_asyncioimportclientasasync_client,configasasync_configfromurllib3.exceptionsimportHTTPErrorfromairflow.compat.functoolsimportcached_propertyfromairflow.exceptionsimportAirflowExceptionfromairflow.hooks.baseimportBaseHookfromairflow.kubernetes.kube_clientimport_disable_verify_ssl,_enable_tcp_keepalivefromairflow.utilsimportyaml
# Debug-log template used by AsyncKubernetesHook._load_config; the ``{}``
# placeholder is filled with a short description of where the configuration
# is being loaded from (e.g. "within a pod", "config dictionary").
LOADING_KUBE_CONFIG_FILE_RESOURCE = "Loading Kubernetes configuration file kube_config from {}..."
def _load_body_to_dict(body):
    """
    Parse a YAML resource definition into a Python object.

    :param body: YAML text describing the resource.
    :return: whatever ``yaml.safe_load`` produces for the document.
    :raises AirflowException: if the text is not valid YAML; the original
        ``yaml.YAMLError`` is attached as the explicit cause.
    """
    try:
        return yaml.safe_load(body)
    except yaml.YAMLError as e:
        # Chain the parser error so the traceback shows it as the direct cause.
        raise AirflowException(f"Exception when loading resource definition: {e}\n") from e
class KubernetesHook(BaseHook):
    """
    Creates Kubernetes API connection.

    - use in cluster configuration by using extra field ``in_cluster`` in connection
    - use custom config by providing path to the file using extra field ``kube_config_path`` in connection
    - use custom configuration by providing content of kubeconfig file via
      extra field ``kube_config`` in connection
    - use default config by providing no extras

    This hook checks for the configuration options in the above order. Once an option is
    present, that configuration is used.

    .. seealso::
        For more information about Kubernetes connection:
        :doc:`/connections/kubernetes`

    :param conn_id: The :ref:`kubernetes connection <howto/connection:kubernetes>`
        to Kubernetes cluster.
    :param client_configuration: Optional dictionary of client configuration params.
        Passed on to kubernetes client.
    :param cluster_context: Optionally specify a context to use (e.g. if you have multiple
        in your kubeconfig).
    :param config_file: Path to kubeconfig file.
    :param in_cluster: Set to ``True`` if running from within a kubernetes cluster.
    :param disable_verify_ssl: Set to ``True`` if SSL verification should be disabled.
    :param disable_tcp_keepalive: Set to ``True`` if you want to disable keepalive logic.
    """
def get_ui_field_behaviour() -> dict[str, Any]:
    """
    Returns custom field behaviour.

    :return: a dict with a ``hidden_fields`` list of field names and an
        empty ``relabeling`` mapping.
    """
    # NOTE(review): takes no ``self``; presumably wrapped by a ``@staticmethod``
    # decorator that is not visible in this extract -- confirm.
    return {
        "hidden_fields": ["host", "schema", "login", "password", "port", "extra"],
        "relabeling": {},
    }
def _get_field(self, field_name):
    """
    Fetch a connection extra by its unprefixed name.

    Prior to Airflow 2.3, UI customizations for extra fields required storing
    them with the prefix ``extra__kubernetes__``. For backward compatibility the
    unprefixed key is preferred and the prefixed key is used as a fallback.

    :param field_name: name of the extra, without the ``extra__kubernetes__`` prefix.
    :return: the stored value, or ``None`` when it is absent or falsy.
    :raises ValueError: if ``field_name`` itself starts with ``extra__``.
    """
    if field_name.startswith("extra__"):
        raise ValueError(
            f"Got prefixed name {field_name}; please remove the 'extra__kubernetes__' prefix "
            f"when using this method."
        )
    extras = self.conn_extras
    # The unprefixed key wins whenever it exists, even if its value is falsy.
    lookup_key = field_name if field_name in extras else f"extra__kubernetes__{field_name}"
    return extras.get(lookup_key) or None
def get_conn(self) -> client.ApiClient:
    """
    Returns kubernetes api session for use with requests.

    Explicit hook arguments are combined with the connection extras through
    ``self._coalesce_param`` (defined outside this extract; presumably it
    prefers the first value that is not ``None`` -- confirm). The configuration
    source is then picked in this order: in-cluster, kubeconfig file path,
    kubeconfig content stored in the connection, default client.

    :return: a ``client.ApiClient`` created after the chosen configuration was loaded.
    :raises AirflowException: if more than one of ``in_cluster``, ``kube_config``
        and ``kube_config_path`` is set.
    """
    in_cluster = self._coalesce_param(self.in_cluster, self._get_field("in_cluster"))
    cluster_context = self._coalesce_param(self.cluster_context, self._get_field("cluster_context"))
    kubeconfig_path = self._coalesce_param(self.config_file, self._get_field("kube_config_path"))
    kubeconfig = self._get_field("kube_config")

    selected_sources = sum(1 for source in (in_cluster, kubeconfig, kubeconfig_path) if source)
    if selected_sources > 1:
        raise AirflowException(
            "Invalid connection configuration. Options kube_config_path, "
            "kube_config, in_cluster are mutually exclusive. "
            "You can only use one option at a time."
        )

    disable_verify_ssl = self._coalesce_param(
        self.disable_verify_ssl, _get_bool(self._get_field("disable_verify_ssl"))
    )
    disable_tcp_keepalive = self._coalesce_param(
        self.disable_tcp_keepalive, _get_bool(self._get_field("disable_tcp_keepalive"))
    )

    # SSL verification is only switched off on an explicit ``True``; keepalive
    # stays enabled unless it was explicitly disabled with ``True``.
    if disable_verify_ssl is True:
        _disable_verify_ssl()
    if disable_tcp_keepalive is not True:
        _enable_tcp_keepalive()

    if in_cluster:
        self.log.debug("loading kube_config from: in_cluster configuration")
        self._is_in_cluster = True
        config.load_incluster_config()
        return client.ApiClient()

    # Keyword arguments shared by both kubeconfig-file based loaders below.
    load_kwargs = {
        "client_configuration": self.client_configuration,
        "context": cluster_context,
    }

    if kubeconfig_path is not None:
        self.log.debug("loading kube_config from: %s", kubeconfig_path)
        self._is_in_cluster = False
        config.load_kube_config(config_file=kubeconfig_path, **load_kwargs)
        return client.ApiClient()

    if kubeconfig is not None:
        # The loader takes a file name, so the kubeconfig content from the
        # connection is written to a temporary file first.
        with tempfile.NamedTemporaryFile() as temp_config:
            self.log.debug("loading kube_config from: connection kube_config")
            temp_config.write(kubeconfig.encode())
            temp_config.flush()
            self._is_in_cluster = False
            config.load_kube_config(config_file=temp_config.name, **load_kwargs)
        return client.ApiClient()

    return self._get_default_client(cluster_context=cluster_context)
def _get_default_client(self, *, cluster_context: str | None = None) -> client.ApiClient:
    """
    Build a client when no configuration source was supplied.

    In-cluster configuration is attempted first because it is the most likely
    situation; if that raises ``ConfigException`` the kubeconfig file in the
    default location is loaded instead.

    :param cluster_context: context passed to ``config.load_kube_config`` on the fallback path.
    :return: a new ``client.ApiClient``.
    """
    try:
        config.load_incluster_config(client_configuration=self.client_configuration)
    except ConfigException:
        self.log.debug("loading kube_config from: default file")
        self._is_in_cluster = False
        config.load_kube_config(
            client_configuration=self.client_configuration,
            context=cluster_context,
        )
    else:
        self._is_in_cluster = True
    return client.ApiClient()


@property
def is_in_cluster(self) -> bool:
    """Expose whether the hook is configured with ``load_incluster_config`` or not"""
    if self._is_in_cluster is None:
        # Touch the client purely for its side effect: building it is what
        # determines whether we are in_cluster or not.
        self.api_client
        if TYPE_CHECKING:
            assert self._is_in_cluster is not None
    return self._is_in_cluster
@cached_property
def api_client(self) -> client.ApiClient:
    """Cached Kubernetes API client, obtained from ``get_conn`` on first access."""
    connection = self.get_conn()
    return connection
def create_custom_object(
    self, group: str, version: str, plural: str, body: str | dict, namespace: str | None = None
):
    """
    Creates custom resource definition object in Kubernetes.

    If the definition carries ``metadata.name``, an existing object with that
    name is deleted first (best effort), then the new object is created.

    :param group: api group
    :param version: api version
    :param plural: api plural
    :param body: crd object definition, either YAML text or an already parsed dict
    :param namespace: kubernetes namespace; falls back to the connection
        namespace and then to ``DEFAULT_NAMESPACE``
    :return: the response of ``create_namespaced_custom_object``
    :raises AirflowException: if the create call fails; the underlying
        ``ApiException`` is attached as the explicit cause.
    """
    api: client.CustomObjectsApi = self.custom_object_client
    namespace = namespace or self._get_namespace() or self.DEFAULT_NAMESPACE

    body_dict = _load_body_to_dict(body) if isinstance(body, str) else body

    # Attribute "name" is not mandatory if "generateName" is used instead
    if "name" in body_dict["metadata"]:
        # NOTE(review): the log messages mention SparkApplication although any
        # custom object kind can be passed here; wording kept as-is.
        try:
            api.delete_namespaced_custom_object(
                group=group,
                version=version,
                namespace=namespace,
                plural=plural,
                name=body_dict["metadata"]["name"],
            )
            self.log.warning("Deleted SparkApplication with the same name")
        except client.rest.ApiException:
            # Deliberately best effort: a failed delete must not block the create.
            self.log.info("SparkApplication %s not found", body_dict["metadata"]["name"])

    try:
        response = api.create_namespaced_custom_object(
            group=group, version=version, namespace=namespace, plural=plural, body=body_dict
        )
        self.log.debug("Response: %s", response)
        return response
    except client.rest.ApiException as e:
        # Chain the API error so the traceback shows it as the direct cause.
        raise AirflowException(f"Exception when calling -> create_custom_object: {e}\n") from e
def get_custom_object(
    self, group: str, version: str, plural: str, name: str, namespace: str | None = None
):
    """
    Get custom resource definition object from Kubernetes.

    :param group: api group
    :param version: api version
    :param plural: api plural
    :param name: crd object name
    :param namespace: kubernetes namespace; falls back to the connection
        namespace and then to ``DEFAULT_NAMESPACE``
    :return: the response of ``get_namespaced_custom_object``
    :raises AirflowException: if the API call fails; the underlying
        ``ApiException`` is attached as the explicit cause.
    """
    api = client.CustomObjectsApi(self.api_client)
    namespace = namespace or self._get_namespace() or self.DEFAULT_NAMESPACE
    try:
        return api.get_namespaced_custom_object(
            group=group, version=version, namespace=namespace, plural=plural, name=name
        )
    except client.rest.ApiException as e:
        # Chain the API error so the traceback shows it as the direct cause.
        raise AirflowException(f"Exception when calling -> get_custom_object: {e}\n") from e
def get_namespace(self) -> str | None:
    """
    Returns the namespace defined in the connection or 'default'.

    When a connection id is set but the connection carries no namespace, a
    ``DeprecationWarning`` is emitted and ``"default"`` is returned. Without a
    connection id the result of ``_get_namespace`` is returned unchanged.

    TODO: in provider version 6.0, return None when namespace not defined in connection
    """
    namespace = self._get_namespace()
    if self.conn_id and not namespace:
        warnings.warn(
            "Airflow connection defined but namespace is not set; returning 'default'. In "
            "cncf.kubernetes provider version 6.0 we will return None when namespace is "
            "not defined in the connection so that it's clear whether user intends 'default' or "
            "whether namespace is unset (which is required in order to apply precedence logic in "
            "KubernetesPodOperator).",
            DeprecationWarning,
            # Attribute the warning to the code calling get_namespace(), which
            # is what needs changing, rather than to this hook module.
            stacklevel=2,
        )
        return "default"
    return namespace
def _get_namespace(self) -> str | None:
    """
    Returns the namespace that is defined in the connection, if any.

    Without a connection id there is nothing to look up and ``None`` is returned.

    TODO: in provider version 6.0, get rid of this method and make it the behavior of get_namespace.
    """
    return self._get_field("namespace") if self.conn_id else None
def get_xcom_sidecar_container_image(self):
    """Returns the xcom sidecar image that is defined in the connection"""
    sidecar_image = self._get_field("xcom_sidecar_container_image")
    return sidecar_image
def get_pod_log_stream(
    self,
    pod_name: str,
    container: str | None = "",
    namespace: str | None = None,
) -> tuple[watch.Watch, Generator[str, None, None]]:
    """
    Retrieves a log stream for a container in a kubernetes pod.

    :param pod_name: pod name
    :param container: container name
    :param namespace: kubernetes namespace; falls back to the connection
        namespace and then to ``DEFAULT_NAMESPACE``
    :return: the ``watch.Watch`` instance together with the stream it produced
    """
    watcher = watch.Watch()
    read_log = self.core_v1_client.read_namespaced_pod_log
    target_namespace = namespace or self._get_namespace() or self.DEFAULT_NAMESPACE
    log_stream = watcher.stream(
        read_log,
        name=pod_name,
        container=container,
        namespace=target_namespace,
    )
    return watcher, log_stream
def get_pod_logs(
    self,
    pod_name: str,
    container: str | None = "",
    namespace: str | None = None,
):
    """
    Retrieves a container's log from the specified pod.

    :param pod_name: pod name
    :param container: container name
    :param namespace: kubernetes namespace; falls back to the connection
        namespace and then to ``DEFAULT_NAMESPACE``
    :return: the result of ``read_namespaced_pod_log``, requested with
        ``_preload_content=False``
    """
    return self.core_v1_client.read_namespaced_pod_log(
        name=pod_name,
        container=container,
        _preload_content=False,
        namespace=namespace or self._get_namespace() or self.DEFAULT_NAMESPACE,
    )
def get_namespaced_pod_list(
    self,
    label_selector: str | None = "",
    namespace: str | None = None,
    watch: bool = False,
    **kwargs,
):
    """
    Retrieves the list of pods in a kubernetes namespace.

    :param label_selector: A selector to restrict the list of returned objects by their labels
    :param namespace: kubernetes namespace; falls back to the connection
        namespace and then to ``DEFAULT_NAMESPACE``
    :param watch: Watch for changes to the described resources and return them as a stream
    :param kwargs: forwarded unchanged to ``list_namespaced_pod``
    :return: the result of ``list_namespaced_pod``, requested with ``_preload_content=False``
    """
    list_pods = self.core_v1_client.list_namespaced_pod
    target_namespace = namespace or self._get_namespace() or self.DEFAULT_NAMESPACE
    return list_pods(
        namespace=target_namespace,
        watch=watch,
        label_selector=label_selector,
        _preload_content=False,
        **kwargs,
    )


def _get_bool(val) -> bool | None:
    """
    Converts val to bool if can be done with certainty.

    Booleans are returned as-is; the strings "true" / "false" (any case,
    surrounding whitespace ignored) map to ``True`` / ``False``.
    If we cannot infer intention we return None.
    """
    if isinstance(val, bool):
        return val
    if isinstance(val, str):
        return {"true": True, "false": False}.get(val.strip().lower())
    return None
class AsyncKubernetesHook(KubernetesHook):
    """Hook to use Kubernetes SDK asynchronously."""

    def __init__(self, config_dict: dict | None = None, *args, **kwargs):
        """
        :param config_dict: optional kubeconfig content as a dictionary; when
            given it is loaded with ``load_kube_config_from_dict``.
        :param args: forwarded to the parent hook.
        :param kwargs: forwarded to the parent hook.
        """
        super().__init__(*args, **kwargs)
        self.config_dict = config_dict
        self._extras: dict | None = None

    async def _load_config(self):
        """
        Load the Kubernetes configuration for the async client.

        The source is picked in this order: in-cluster, ``config_dict``,
        kubeconfig content stored in the connection, default kubeconfig file.

        :return: a new ``async_client.ApiClient`` for the first three sources;
            ``None`` after loading the default kubeconfig file (that branch has
            no return statement).
        :raises AirflowException: if more than one of ``in_cluster``,
            ``kube_config`` and ``config_dict`` is set.
        """
        in_cluster = self._coalesce_param(self.in_cluster, await self._get_field("in_cluster"))
        cluster_context = self._coalesce_param(
            self.cluster_context, await self._get_field("cluster_context")
        )
        kubeconfig = await self._get_field("kube_config")

        num_selected_configuration = sum(
            1 for option in (in_cluster, kubeconfig, self.config_dict) if option
        )
        if num_selected_configuration > 1:
            raise AirflowException(
                "Invalid connection configuration. Options kube_config_path, "
                "kube_config, in_cluster are mutually exclusive. "
                "You can only use one option at a time."
            )

        if in_cluster:
            self.log.debug(LOADING_KUBE_CONFIG_FILE_RESOURCE.format("within a pod"))
            self._is_in_cluster = True
            async_config.load_incluster_config()
            return async_client.ApiClient()

        if self.config_dict:
            self.log.debug(LOADING_KUBE_CONFIG_FILE_RESOURCE.format("config dictionary"))
            # A kubeconfig dictionary is not the in-cluster service account
            # configuration; record that like the other kubeconfig branch does.
            self._is_in_cluster = False
            await async_config.load_kube_config_from_dict(self.config_dict)
            return async_client.ApiClient()

        if kubeconfig is not None:
            # The loader takes a file name, so the kubeconfig content from the
            # connection is written to a temporary file first.
            with tempfile.NamedTemporaryFile() as temp_config:
                self.log.debug(
                    "Reading kubernetes configuration file from connection "
                    "object and writing temporary config file with its content",
                )
                temp_config.write(kubeconfig.encode())
                temp_config.flush()
                self._is_in_cluster = False
                await async_config.load_kube_config(
                    config_file=temp_config.name,
                    client_configuration=self.client_configuration,
                    context=cluster_context,
                )
            return async_client.ApiClient()

        self.log.debug(LOADING_KUBE_CONFIG_FILE_RESOURCE.format("default configuration file"))
        await async_config.load_kube_config(
            client_configuration=self.client_configuration,
            context=cluster_context,
        )
async def _get_field(self, field_name):
    """
    Fetch a connection extra by its unprefixed name (async variant).

    The unprefixed key is preferred; the ``extra__kubernetes__``-prefixed key
    is used as a fallback.

    :param field_name: name of the extra, without the ``extra__kubernetes__`` prefix.
    :return: the stored value, or ``None`` when neither key is present.
    :raises ValueError: if ``field_name`` itself starts with ``extra__``.
    """
    if field_name.startswith("extra__"):
        raise ValueError(
            f"Got prefixed name {field_name}; please remove the 'extra__kubernetes__' prefix "
            "when using this method."
        )
    extras = await self.get_conn_extras()
    # NOTE(review): unlike the synchronous KubernetesHook._get_field, falsy
    # values (e.g. an empty string) are returned as-is instead of being
    # normalized to None -- confirm this difference is intended.
    if field_name in extras:
        return extras.get(field_name)
    prefixed_name = f"extra__kubernetes__{field_name}"
    return extras.get(prefixed_name)


# NOTE(review): the definition this decorator applies to is not visible in
# this extract. The methods below use ``async with self.get_conn()``, so it
# presumably decorates an async ``get_conn`` -- confirm against the full file.
@contextlib.asynccontextmanager
async def get_pod(self, name: str, namespace: str) -> V1Pod:
    """
    Gets pod's object.

    :param name: Name of the pod.
    :param namespace: Name of the pod's namespace.
    :return: the result of ``read_namespaced_pod`` for that pod.
    """
    async with self.get_conn() as connection:
        core_api = async_client.CoreV1Api(connection)
        return await core_api.read_namespaced_pod(
            name=name,
            namespace=namespace,
        )
async def delete_pod(self, name: str, namespace: str):
    """
    Deletes pod's object.

    An ``ApiException`` with status 404 is ignored; any other status is re-raised.

    :param name: Name of the pod.
    :param namespace: Name of the pod's namespace.
    """
    async with self.get_conn() as connection:
        try:
            core_api = async_client.CoreV1Api(connection)
            delete_options = client.V1DeleteOptions()
            await core_api.delete_namespaced_pod(name=name, namespace=namespace, body=delete_options)
        except async_client.ApiException as e:
            # If the pod is already deleted there is nothing left to do.
            already_deleted = e.status == 404
            if not already_deleted:
                raise
async def read_logs(self, name: str, namespace: str):
    """
    Reads logs inside the pod while starting containers inside.

    All the logs are emitted with their timestamps so they can be tracked after
    the pod has finished executing. The method is used for async output of the
    logs only when the pod failed its execution or the task was cancelled by
    the user.

    :param name: Name of the pod.
    :param namespace: Name of the pod's namespace.
    :return: the log split into a list of lines.
    """
    async with self.get_conn() as connection:
        try:
            core_api = async_client.CoreV1Api(connection)
            raw_logs = await core_api.read_namespaced_pod_log(
                name=name,
                namespace=namespace,
                follow=False,
                timestamps=True,
            )
            log_lines = raw_logs.splitlines()
            for entry in log_lines:
                self.log.info("Container logs from %s", entry)
            return log_lines
        except HTTPError:
            self.log.exception("There was an error reading the kubernetes API.")
            raise