Source code for airflow.providers.google.cloud.hooks.compute
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""This module contains a Google Compute Engine Hook."""importtimefromtypingimportAny,Dict,Optional,Sequence,Unionfromgoogleapiclient.discoveryimportbuildfromairflow.exceptionsimportAirflowExceptionfromairflow.providers.google.common.hooks.base_googleimportPROVIDE_PROJECT_ID,GoogleBaseHook# Time to sleep between active checks of the operation results
class ComputeEngineHook(GoogleBaseHook):
    """
    Hook for Google Compute Engine APIs.

    All the methods in the hook where project_id is used must be called with
    keyword arguments rather than positional.
    """

    # Discovery client cached by ``get_conn`` after the first call.
    _conn: Optional[Any] = None

    def __init__(
        self,
        api_version: str = 'v1',
        gcp_conn_id: str = 'google_cloud_default',
        delegate_to: Optional[str] = None,
        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
    ) -> None:
        super().__init__(
            gcp_conn_id=gcp_conn_id,
            delegate_to=delegate_to,
            impersonation_chain=impersonation_chain,
        )
        self.api_version = api_version

    def get_conn(self):
        """
        Retrieves connection to Google Compute Engine.

        The client is built on the first call and reused afterwards.

        :return: Google Compute Engine services object (the resource returned by
            ``googleapiclient.discovery.build``)
        """
        if not self._conn:
            http_authorized = self._authorize()
            self._conn = build('compute', self.api_version, http=http_authorized, cache_discovery=False)
        return self._conn

    @staticmethod
    def _get_operation_name(response: dict) -> str:
        """
        Extracts the operation name from the response of a mutating API call.

        :param response: response returned by the Compute Engine API
        :return: value of the ``name`` field of the response
        :raises AirflowException: if the response has no ``name`` field
        """
        try:
            return response["name"]
        except KeyError:
            # Suppress the KeyError context: the message already embeds the response.
            raise AirflowException(
                f"Wrong response '{response}' returned - it should contain 'name' field"
            ) from None

    @GoogleBaseHook.fallback_to_default_project_id
    def start_instance(self, zone: str, resource_id: str, project_id: str) -> None:
        """
        Starts an existing instance defined by project_id, zone and resource_id.
        Must be called with keyword arguments rather than positional.

        :param zone: Google Cloud zone where the instance exists
        :param resource_id: Name of the Compute Engine instance resource
        :param project_id: Optional, Google Cloud project ID where the
            Compute Engine Instance exists. If set to None or missing,
            the default project_id from the Google Cloud connection is used.
        :return: None
        :raises AirflowException: if the response has no operation name or the
            operation finishes with an error
        """
        response = (
            self.get_conn()
            .instances()
            .start(project=project_id, zone=zone, instance=resource_id)
            .execute(num_retries=self.num_retries)
        )
        operation_name = self._get_operation_name(response)
        self._wait_for_operation_to_complete(project_id=project_id, operation_name=operation_name, zone=zone)

    @GoogleBaseHook.fallback_to_default_project_id
    def stop_instance(self, zone: str, resource_id: str, project_id: str) -> None:
        """
        Stops an instance defined by project_id, zone and resource_id.
        Must be called with keyword arguments rather than positional.

        :param zone: Google Cloud zone where the instance exists
        :param resource_id: Name of the Compute Engine instance resource
        :param project_id: Optional, Google Cloud project ID where the
            Compute Engine Instance exists. If set to None or missing,
            the default project_id from the Google Cloud connection is used.
        :return: None
        :raises AirflowException: if the response has no operation name or the
            operation finishes with an error
        """
        response = (
            self.get_conn()
            .instances()
            .stop(project=project_id, zone=zone, instance=resource_id)
            .execute(num_retries=self.num_retries)
        )
        operation_name = self._get_operation_name(response)
        self._wait_for_operation_to_complete(project_id=project_id, operation_name=operation_name, zone=zone)

    @GoogleBaseHook.fallback_to_default_project_id
    def set_machine_type(self, zone: str, resource_id: str, body: dict, project_id: str) -> None:
        """
        Sets machine type of an instance defined by project_id, zone and resource_id.
        Must be called with keyword arguments rather than positional.

        :param zone: Google Cloud zone where the instance exists.
        :param resource_id: Name of the Compute Engine instance resource
        :param body: Body required by the Compute Engine setMachineType API,
            as described in
            https://cloud.google.com/compute/docs/reference/rest/v1/instances/setMachineType
        :param project_id: Optional, Google Cloud project ID where the
            Compute Engine Instance exists. If set to None or missing,
            the default project_id from the Google Cloud connection is used.
        :return: None
        :raises AirflowException: if the response has no operation name or the
            operation finishes with an error
        """
        response = self._execute_set_machine_type(zone, resource_id, body, project_id)
        operation_name = self._get_operation_name(response)
        self._wait_for_operation_to_complete(project_id=project_id, operation_name=operation_name, zone=zone)

    def _execute_set_machine_type(self, zone: str, resource_id: str, body: dict, project_id: str) -> dict:
        """
        Issues the instances.setMachineType request and returns the raw response.

        :param zone: Google Cloud zone where the instance exists
        :param resource_id: Name of the Compute Engine instance resource
        :param body: Body required by the Compute Engine setMachineType API
        :param project_id: Google Cloud project ID where the instance exists
        :return: the API response (expected to describe an operation)
        """
        return (
            self.get_conn()
            .instances()
            .setMachineType(project=project_id, zone=zone, instance=resource_id, body=body)
            .execute(num_retries=self.num_retries)
        )

    @GoogleBaseHook.fallback_to_default_project_id
    def get_instance_template(self, resource_id: str, project_id: str) -> dict:
        """
        Retrieves instance template by project_id and resource_id.
        Must be called with keyword arguments rather than positional.

        :param resource_id: Name of the instance template
        :param project_id: Optional, Google Cloud project ID where the
            instance template exists. If set to None or missing,
            the default project_id from the Google Cloud connection is used.
        :return: Instance template representation as object according to
            https://cloud.google.com/compute/docs/reference/rest/v1/instanceTemplates
        :rtype: dict
        """
        response = (
            self.get_conn()
            .instanceTemplates()
            .get(project=project_id, instanceTemplate=resource_id)
            .execute(num_retries=self.num_retries)
        )
        return response

    @GoogleBaseHook.fallback_to_default_project_id
    def insert_instance_template(
        self,
        body: dict,
        project_id: str = PROVIDE_PROJECT_ID,
        request_id: Optional[str] = None,
    ) -> None:
        """
        Inserts instance template using body specified.
        Must be called with keyword arguments rather than positional.

        :param body: Instance template representation as object according to
            https://cloud.google.com/compute/docs/reference/rest/v1/instanceTemplates
        :param request_id: Optional, unique request_id that you might add to achieve
            full idempotence (for example when client call times out repeating the request
            with the same request id will not create a new instance template again)
            It should be in UUID format as defined in RFC 4122
        :param project_id: Optional, Google Cloud project ID where the
            instance template is created. If set to None or missing,
            the default project_id from the Google Cloud connection is used.
        :return: None
        :raises AirflowException: if the response has no operation name or the
            operation finishes with an error
        """
        response = (
            self.get_conn()
            .instanceTemplates()
            .insert(project=project_id, body=body, requestId=request_id)
            .execute(num_retries=self.num_retries)
        )
        operation_name = self._get_operation_name(response)
        # No zone is passed: the operation is polled as a global operation.
        self._wait_for_operation_to_complete(project_id=project_id, operation_name=operation_name)

    @GoogleBaseHook.fallback_to_default_project_id
    def get_instance_group_manager(
        self,
        zone: str,
        resource_id: str,
        project_id: str = PROVIDE_PROJECT_ID,
    ) -> dict:
        """
        Retrieves Instance Group Manager by project_id, zone and resource_id.
        Must be called with keyword arguments rather than positional.

        :param zone: Google Cloud zone where the Instance Group Manager exists
        :param resource_id: Name of the Instance Group Manager
        :param project_id: Optional, Google Cloud project ID where the
            Instance Group Manager exists. If set to None or missing,
            the default project_id from the Google Cloud connection is used.
        :return: Instance group manager representation as object according to
            https://cloud.google.com/compute/docs/reference/rest/beta/instanceGroupManagers
        :rtype: dict
        """
        response = (
            self.get_conn()
            .instanceGroupManagers()
            .get(project=project_id, zone=zone, instanceGroupManager=resource_id)
            .execute(num_retries=self.num_retries)
        )
        return response

    @GoogleBaseHook.fallback_to_default_project_id
    def patch_instance_group_manager(
        self,
        zone: str,
        resource_id: str,
        body: dict,
        project_id: str,
        request_id: Optional[str] = None,
    ) -> None:
        """
        Patches Instance Group Manager with the specified body.
        Must be called with keyword arguments rather than positional.

        :param zone: Google Cloud zone where the Instance Group Manager exists
        :param resource_id: Name of the Instance Group Manager
        :param body: Instance Group Manager representation as json-merge-patch object
            according to
            https://cloud.google.com/compute/docs/reference/rest/beta/instanceGroupManagers/patch
        :param request_id: Optional, unique request_id that you might add to achieve
            full idempotence (for example when client call times out repeating the request
            with the same request id will not apply the patch again).
            It should be in UUID format as defined in RFC 4122
        :param project_id: Optional, Google Cloud project ID where the
            Instance Group Manager exists. If set to None or missing,
            the default project_id from the Google Cloud connection is used.
        :return: None
        :raises AirflowException: if the response has no operation name or the
            operation finishes with an error
        """
        response = (
            self.get_conn()
            .instanceGroupManagers()
            .patch(
                project=project_id,
                zone=zone,
                instanceGroupManager=resource_id,
                body=body,
                requestId=request_id,
            )
            .execute(num_retries=self.num_retries)
        )
        operation_name = self._get_operation_name(response)
        self._wait_for_operation_to_complete(project_id=project_id, operation_name=operation_name, zone=zone)

    def _wait_for_operation_to_complete(
        self, project_id: str, operation_name: str, zone: Optional[str] = None
    ) -> None:
        """
        Waits for the named operation to complete - checks status of the async call.

        Polls the zone operation (when ``zone`` is given) or the global operation
        (when ``zone`` is None), sleeping ``TIME_TO_SLEEP_IN_SECONDS`` between
        checks, until the operation status is DONE. There is no overall timeout.

        :param project_id: Google Cloud project ID the operation belongs to
        :param operation_name: name of the operation
        :param zone: optional zone of the request (None for global operations)
        :return: None
        :raises AirflowException: if the finished operation reports an error
        """
        service = self.get_conn()
        while True:
            if zone is None:
                operation_response = self._check_global_operation_status(
                    service=service,
                    operation_name=operation_name,
                    project_id=project_id,
                    num_retries=self.num_retries,
                )
            else:
                operation_response = self._check_zone_operation_status(
                    service, operation_name, project_id, zone, self.num_retries
                )
            if operation_response.get("status") == GceOperationStatus.DONE:
                error = operation_response.get("error")
                if error:
                    code = operation_response.get("httpErrorStatusCode")
                    msg = operation_response.get("httpErrorMessage")
                    # Render the errors list without its square braces. Fall back to an
                    # empty list so a missing/None "errors" entry does not turn into
                    # str(None)[1:-1] == "on".
                    error_msg = str(error.get("errors") or [])[1:-1]
                    raise AirflowException(f"{code} {msg}: " + error_msg)
                break
            time.sleep(TIME_TO_SLEEP_IN_SECONDS)

    @staticmethod
    def _check_zone_operation_status(
        service: Any, operation_name: str, project_id: str, zone: str, num_retries: int
    ) -> dict:
        """Fetches the current state of a zonal operation."""
        return (
            service.zoneOperations()
            .get(project=project_id, zone=zone, operation=operation_name)
            .execute(num_retries=num_retries)
        )

    @staticmethod
    def _check_global_operation_status(
        service: Any, operation_name: str, project_id: str, num_retries: int
    ) -> dict:
        """Fetches the current state of a global operation."""
        return (
            service.globalOperations()
            .get(project=project_id, operation=operation_name)
            .execute(num_retries=num_retries)
        )

    @GoogleBaseHook.fallback_to_default_project_id
    def get_instance_info(self, zone: str, resource_id: str, project_id: str) -> Dict[str, Any]:
        """
        Gets instance information.

        :param zone: Google Cloud zone where the instance exists
        :param resource_id: Name of the Compute Engine instance resource
        :param project_id: Optional, Google Cloud project ID where the
            Compute Engine Instance exists. If set to None or missing,
            the default project_id from the Google Cloud connection is used.
        :return: the instance representation returned by the instances.get API
        """
        instance_info = (
            self.get_conn()
            .instances()
            .get(project=project_id, instance=resource_id, zone=zone)
            .execute(num_retries=self.num_retries)
        )
        return instance_info

    @GoogleBaseHook.fallback_to_default_project_id
    def get_instance_address(
        self, zone: str, resource_id: str, project_id: str = PROVIDE_PROJECT_ID, use_internal_ip: bool = False
    ) -> str:
        """
        Return network address associated to instance.

        Only the first network interface of the instance is inspected.

        :param zone: Google Cloud zone where the instance exists
        :param resource_id: Name of the Compute Engine instance resource
        :param project_id: Optional, Google Cloud project ID where the
            Compute Engine Instance exists. If set to None or missing,
            the default project_id from the Google Cloud connection is used.
        :param use_internal_ip: If true, return private IP address.
        :raises AirflowException: if the instance has no network interface, or an
            external address is requested but the first interface has no access config
        """
        instance_info = self.get_instance_info(project_id=project_id, resource_id=resource_id, zone=zone)
        network_interfaces = instance_info.get("networkInterfaces")
        if not network_interfaces:
            raise AirflowException("The target instance does not have any network interface")
        primary_interface = network_interfaces[0]
        if use_internal_ip:
            return primary_interface.get("networkIP")

        access_config = primary_interface.get("accessConfigs")
        if access_config:
            return access_config[0].get("natIP")
        raise AirflowException("The target instance does not have external IP")

    @GoogleBaseHook.fallback_to_default_project_id
    def set_instance_metadata(
        self, zone: str, resource_id: str, metadata: Dict[str, str], project_id: str
    ) -> None:
        """
        Set instance metadata.

        :param zone: Google Cloud zone where the instance exists
        :param resource_id: Name of the Compute Engine instance resource
        :param metadata: The new instance metadata.
        :param project_id: Optional, Google Cloud project ID where the
            Compute Engine Instance exists. If set to None or missing,
            the default project_id from the Google Cloud connection is used.
        :raises AirflowException: if the response has no operation name or the
            operation finishes with an error
        """
        response = (
            self.get_conn()
            .instances()
            .setMetadata(project=project_id, zone=zone, instance=resource_id, body=metadata)
            .execute(num_retries=self.num_retries)
        )
        operation_name = self._get_operation_name(response)
        self._wait_for_operation_to_complete(project_id=project_id, operation_name=operation_name, zone=zone)