Source code for airflow.providers.google.common.hooks.base_google
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""This module contains a Google Cloud API base hook."""from__future__importannotationsimportasyncioimportdatetimeimportfunctoolsimportjsonimportloggingimportosimporttempfilefromcontextlibimportExitStack,contextmanagerfromsubprocessimportcheck_outputfromtypingimportTYPE_CHECKING,Any,Callable,Generator,Sequence,TypeVar,castimportgoogle.authimportgoogle.oauth2.service_accountimportgoogle_auth_httplib2importrequestsimporttenacityfromasgiref.syncimportsync_to_asyncfromgcloud.aio.auth.tokenimportToken,TokenResponsefromgoogle.api_core.exceptionsimportForbidden,ResourceExhausted,TooManyRequestsfromgoogle.authimport_cloud_sdk,compute_engine# type: ignore[attr-defined]fromgoogle.auth.environment_varsimportCLOUD_SDK_CONFIG_DIR,CREDENTIALSfromgoogle.auth.exceptionsimportRefreshErrorfromgoogle.auth.transportimport_http_clientfromgoogleapiclientimportdiscoveryfromgoogleapiclient.errorsimportHttpErrorfromgoogleapiclient.httpimportMediaIoBaseDownload,build_http,set_user_agentfromrequestsimportSessionfromairflowimportversionfromairflow.exceptionsimportAirflowException,AirflowProviderDeprecationWarningfromairflow.hooks.baseimportBaseHookfromairflow.providers.google.cloud.utils.credentials_providerimport(_get_scopes,_get_target_principal_and_delegates,get_credentials_and_project_id,)fromairflow.providers.google.common.constsimportCLIENT_INFOfromairflow.providers.google.common.deprecatedimportdeprecatedfromairflow.utils.process_utilsimportpatch_environifTYPE_CHECKING:fromaiohttpimportClientSessionfromgoogle.api_core.gapic_v1.client_infoimportClientInfofromgoogle.auth.credentialsimportCredentials
# Constants used by the mechanism of repeating requests in reaction to exceeding the temporary quota.
[docs]INVALID_KEYS=["DefaultRequestsPerMinutePerProject","DefaultRequestsPerMinutePerUser","RequestsPerMinutePerProject","Resource has been exhausted (e.g. check quota).",]
[docs]defis_soft_quota_exception(exception:Exception):""" Check for quota violation errors. API for Google services does not have a standardized way to report quota violation errors. The function has been adapted by trial and error to the following services: * Google Translate * Google Vision * Google Text-to-Speech * Google Speech-to-Text * Google Natural Language * Google Video Intelligence """ifisinstance(exception,Forbidden):returnany(reasoninerror.details()forreasoninINVALID_REASONSforerrorinexception.errors)ifisinstance(exception,(ResourceExhausted,TooManyRequests)):returnany(keyinerror.details()forkeyinINVALID_KEYSforerrorinexception.errors)returnFalse
[docs]defis_operation_in_progress_exception(exception:Exception)->bool:""" Handle operation in-progress exceptions. Some calls return 429 (too many requests!) or 409 errors (Conflict) in case of operation in progress. * Google Cloud SQL """ifisinstance(exception,HttpError):returnexception.resp.status==429orexception.resp.status==409returnFalse
[docs]defis_refresh_credentials_exception(exception:Exception)->bool:""" Handle refresh credentials exceptions. Some calls return 502 (server error) in case a new token cannot be obtained. * Google BigQuery """ifisinstance(exception,RefreshError):return"Unable to acquire impersonated credentials"instr(exception)returnFalse
[docs]classretry_if_temporary_quota(tenacity.retry_if_exception):"""Retries if there was an exception for exceeding the temporary quote limit."""def__init__(self):super().__init__(is_soft_quota_exception)
[docs]classretry_if_operation_in_progress(tenacity.retry_if_exception):"""Retries if there was an exception in case of operation in progress."""def__init__(self):super().__init__(is_operation_in_progress_exception)
[docs]classretry_if_temporary_refresh_credentials(tenacity.retry_if_exception):"""Retries if there was an exception for refreshing credentials."""def__init__(self):super().__init__(is_refresh_credentials_exception)
# A fake project_id to use in functions decorated by fallback_to_default_project_id# This allows the 'project_id' argument to be of type str instead of str | None,# making it easier to type hint the function body without dealing with the None# case that can never happen at runtime.
[docs]defget_field(extras:dict,field_name:str):"""Get field from extra, first checking short name, then for backcompat we check for prefixed name."""iffield_name.startswith("extra__"):raiseValueError(f"Got prefixed name {field_name}; please remove the 'extra__google_cloud_platform__' prefix ""when using this method.")iffield_nameinextras:returnextras[field_name]orNoneprefixed_name=f"extra__google_cloud_platform__{field_name}"returnextras.get(prefixed_name)orNone
[docs]classGoogleBaseHook(BaseHook):""" A base hook for Google cloud-related hooks. Google cloud has a shared REST API client that is built in the same way no matter which service you use. This class helps construct and authorize the credentials needed to then call googleapiclient.discovery.build() to actually discover and build a client for a Google cloud service. The class also contains some miscellaneous helper functions. All hook derived from this base hook use the 'Google Cloud' connection type. Three ways of authentication are supported: Default credentials: Only the 'Project Id' is required. You'll need to have set up default credentials, such as by the ``GOOGLE_APPLICATION_DEFAULT`` environment variable or from the metadata server on Google Compute Engine. JSON key file: Specify 'Project Id', 'Keyfile Path' and 'Scope'. Legacy P12 key files are not supported. JSON data provided in the UI: Specify 'Keyfile JSON'. :param gcp_conn_id: The connection ID to use when fetching connection info. :param delegate_to: The account to impersonate using domain-wide delegation of authority, if any. For this to work, the service account making the request must have domain-wide delegation enabled. The usage of this parameter should be limited only to Google Workspace (gsuite) and marketing platform operators and hooks. It is deprecated for usage by Google Cloud and Firebase operators and hooks, as well as transfer operators in other providers that involve Google cloud. :param impersonation_chain: Optional service account to impersonate using short-term credentials, or chained list of accounts required to get the access_token of the last account in the list, which will be impersonated in the request. If set as a string, the account must grant the originating account the Service Account Token Creator IAM role. If set as a sequence, the identities from the list must grant Service Account Token Creator IAM role to the directly preceding identity, with first account from the list granting this role to the originating account. """
[docs]defget_connection_form_widgets(cls)->dict[str,Any]:"""Return connection widgets to add to connection form."""fromflask_appbuilder.fieldwidgetsimportBS3PasswordFieldWidget,BS3TextFieldWidgetfromflask_babelimportlazy_gettextfromwtformsimportBooleanField,IntegerField,PasswordField,StringFieldfromwtforms.validatorsimportNumberRangereturn{"project":StringField(lazy_gettext("Project Id"),widget=BS3TextFieldWidget()),"key_path":StringField(lazy_gettext("Keyfile Path"),widget=BS3TextFieldWidget()),"keyfile_dict":PasswordField(lazy_gettext("Keyfile JSON"),widget=BS3PasswordFieldWidget()),"credential_config_file":StringField(lazy_gettext("Credential Configuration File"),widget=BS3TextFieldWidget()),"scope":StringField(lazy_gettext("Scopes (comma separated)"),widget=BS3TextFieldWidget()),"key_secret_name":StringField(lazy_gettext("Keyfile Secret Name (in GCP Secret Manager)"),widget=BS3TextFieldWidget()),"key_secret_project_id":StringField(lazy_gettext("Keyfile Secret Project Id (in GCP Secret Manager)"),widget=BS3TextFieldWidget()),"num_retries":IntegerField(lazy_gettext("Number of Retries"),validators=[NumberRange(min=0)],widget=BS3TextFieldWidget(),default=5,),"impersonation_chain":StringField(lazy_gettext("Impersonation Chain"),widget=BS3TextFieldWidget()),"idp_issuer_url":StringField(lazy_gettext("IdP Token Issue URL (Client Credentials Grant Flow)"),widget=BS3TextFieldWidget(),),"client_id":StringField(lazy_gettext("Client ID (Client Credentials Grant Flow)"),widget=BS3TextFieldWidget()),"client_secret":StringField(lazy_gettext("Client Secret (Client Credentials Grant Flow)"),widget=BS3PasswordFieldWidget(),),"idp_extra_parameters":StringField(lazy_gettext("IdP Extra Request Parameters"),widget=BS3TextFieldWidget()),"is_anonymous":BooleanField(lazy_gettext("Anonymous credentials (ignores all other settings)"),default=False),}
@classmethod
[docs]defget_ui_field_behaviour(cls)->dict[str,Any]:"""Return custom field behaviour."""return{"hidden_fields":["host","schema","login","password","port","extra"],"relabeling":{},}
[docs]defget_credentials_and_project_id(self)->tuple[Credentials,str|None]:"""Return the Credentials object for Google API and the associated project_id."""ifself._cached_credentialsisnotNone:returnself._cached_credentials,self._cached_project_idkey_path:str|None=self._get_field("key_path",None)try:keyfile_dict:str|dict[str,str]|None=self._get_field("keyfile_dict",None)keyfile_dict_json:dict[str,str]|None=Noneifkeyfile_dict:ifisinstance(keyfile_dict,dict):keyfile_dict_json=keyfile_dictelse:keyfile_dict_json=json.loads(keyfile_dict)exceptjson.decoder.JSONDecodeError:raiseAirflowException("Invalid key JSON.")key_secret_name:str|None=self._get_field("key_secret_name",None)key_secret_project_id:str|None=self._get_field("key_secret_project_id",None)credential_config_file:str|None=self._get_field("credential_config_file",None)ifnotself.impersonation_chain:self.impersonation_chain=self._get_field("impersonation_chain",None)ifisinstance(self.impersonation_chain,str)and","inself.impersonation_chain:self.impersonation_chain=[s.strip()forsinself.impersonation_chain.split(",")]target_principal,delegates=_get_target_principal_and_delegates(self.impersonation_chain)is_anonymous=self._get_field("is_anonymous")idp_issuer_url:str|None=self._get_field("idp_issuer_url",None)client_id:str|None=self._get_field("client_id",None)client_secret:str|None=self._get_field("client_secret",None)idp_extra_params:str|None=self._get_field("idp_extra_params",None)idp_extra_params_dict:dict[str,str]|None=Noneifidp_extra_params:try:idp_extra_params_dict=json.loads(idp_extra_params)exceptjson.decoder.JSONDecodeError:raiseAirflowException("Invalid JSON.")credentials,project_id=get_credentials_and_project_id(key_path=key_path,keyfile_dict=keyfile_dict_json,credential_config_file=credential_config_file,key_secret_name=key_secret_name,key_secret_project_id=key_secret_project_id,scopes=self.scopes,delegate_to=self.delegate_to,target_principal=target_principal,delegates=delegates,is_anonymous=is_anonymous,idp_issuer_url=idp_issuer_url,client_id=client_id,client_secret=client_secret,idp_extra_params_dict=idp_extra_params_dict,)overridden_project_id=self._get_field("project")ifoverridden_project_id:project_id=overridden_project_idself._cached_credentials=credentialsself._cached_project_id=project_idreturncredentials,project_id
[docs]defget_credentials(self)->Credentials:"""Return the Credentials object for Google API."""credentials,_=self.get_credentials_and_project_id()returncredentials
def_get_access_token(self)->str:"""Return a valid access token from Google API Credentials."""credentials=self.get_credentials()auth_req=google.auth.transport.requests.Request()# credentials.token is None# Need to refresh credentials to populate the tokencredentials.refresh(auth_req)returncredentials.token@functools.cached_propertydef_get_credentials_email(self)->str:""" Return the email address associated with the currently logged in account. If a service account is used, it returns the service account. If user authentication (e.g. gcloud auth) is used, it returns the e-mail account of that user. """credentials=self.get_credentials()ifisinstance(credentials,compute_engine.Credentials):try:credentials.refresh(_http_client.Request())exceptRefreshErrorasmsg:""" If the Compute Engine metadata service can't be reached in this case the instance has not credentials. """self.log.debug(msg)service_account_email=getattr(credentials,"service_account_email",None)ifservice_account_email:returnservice_account_emailhttp_authorized=self._authorize()oauth2_client=discovery.build("oauth2","v1",http=http_authorized,cache_discovery=False)returnoauth2_client.tokeninfo().execute()["email"]def_authorize(self)->google_auth_httplib2.AuthorizedHttp:"""Return an authorized HTTP object to be used to build a Google cloud service hook connection."""credentials=self.get_credentials()http=build_http()http=set_user_agent(http,"airflow/"+version.version)authed_http=google_auth_httplib2.AuthorizedHttp(credentials,http=http)returnauthed_httpdef_get_field(self,f:str,default:Any=None)->Any:""" Fetch a field from extras, and returns it. This is some Airflow magic. The google_cloud_platform hook type adds custom UI elements to the hook page, which allow admins to specify service_account, key_path, etc. They get formatted as shown below. """returnhasattr(self,"extras")andget_field(self.extras,f)ordefault@property
[docs]defproject_id(self)->str:""" Returns project id. :return: id of the project """_,project_id=self.get_credentials_and_project_id()returnproject_idorPROVIDE_PROJECT_ID
@property
[docs]defnum_retries(self)->int:""" Returns num_retries from Connection. :return: the number of times each API request should be retried """field_value=self._get_field("num_retries",default=5)iffield_valueisNone:return5ifisinstance(field_value,str)andfield_value.strip()=="":return5try:returnint(field_value)exceptValueError:raiseAirflowException(f"The num_retries field should be a integer. "f'Current value: "{field_value}" (type: {type(field_value)}). 'f"Please check the connection configuration.")
[docs]defclient_info(self)->ClientInfo:""" Return client information used to generate a user-agent for API calls. It allows for better errors tracking. This object is only used by the google-cloud-* libraries that are built specifically for the Google Cloud. It is not supported by The Google APIs Python Client that use Discovery based APIs. """returnCLIENT_INFO
@property
[docs]defscopes(self)->Sequence[str]:""" Return OAuth 2.0 scopes. :return: Returns the scope defined in the connection configuration, or the default scope """scope_value:str|None=self._get_field("scope",None)return_get_scopes(scope_value)
@staticmethod
[docs]defquota_retry(*args,**kwargs)->Callable:"""Provide a mechanism to repeat requests in response to exceeding a temporary quota limit."""defdecorator(func:Callable):default_kwargs={"wait":tenacity.wait_exponential(multiplier=1,max=100),"retry":retry_if_temporary_quota(),"before":tenacity.before_log(log,logging.DEBUG),"after":tenacity.after_log(log,logging.DEBUG),}default_kwargs.update(**kwargs)returntenacity.retry(*args,**default_kwargs)(func)returndecorator
@staticmethod
[docs]defoperation_in_progress_retry(*args,**kwargs)->Callable[[T],T]:"""Provide a mechanism to repeat requests in response to operation in progress (HTTP 409) limit."""defdecorator(func:T):default_kwargs={"wait":tenacity.wait_exponential(multiplier=1,max=300),"retry":retry_if_operation_in_progress(),"before":tenacity.before_log(log,logging.DEBUG),"after":tenacity.after_log(log,logging.DEBUG),}default_kwargs.update(**kwargs)returncast(T,tenacity.retry(*args,**default_kwargs)(func))returndecorator
@staticmethod
[docs]defrefresh_credentials_retry(*args,**kwargs)->Callable[[T],T]:"""Provide a mechanism to repeat requests in response to a temporary refresh credential issue."""defdecorator(func:T):default_kwargs={"wait":tenacity.wait_exponential(multiplier=1,max=5),"stop":tenacity.stop_after_attempt(3),"retry":retry_if_temporary_refresh_credentials(),"reraise":True,"before":tenacity.before_log(log,logging.DEBUG),"after":tenacity.after_log(log,logging.DEBUG),}default_kwargs.update(**kwargs)returncast(T,tenacity.retry(*args,**default_kwargs)(func))returndecorator
@staticmethod
[docs]deffallback_to_default_project_id(func:Callable[...,RT])->Callable[...,RT]:""" Provide fallback for Google Cloud project id. To be used as a decorator. If the project is None it will be replaced with the project_id from the service account the Hook is authenticated with. Project id can be specified either via project_id kwarg or via first parameter in positional args. :param func: function to wrap :return: result of the function call """@functools.wraps(func)definner_wrapper(self:GoogleBaseHook,*args,**kwargs)->RT:ifargs:raiseAirflowException("You must use keyword arguments in this methods rather than positional")if"project_id"inkwargs:kwargs["project_id"]=kwargs["project_id"]orself.project_idelse:kwargs["project_id"]=self.project_idifnotkwargs["project_id"]:raiseAirflowException("The project id must be passed either as ""keyword project_id parameter or as project_id extra ""in Google Cloud connection definition. Both are not set!")returnfunc(self,*args,**kwargs)returninner_wrapper
@staticmethod
[docs]defprovide_gcp_credential_file(func:T)->T:""" Provide a Google Cloud credentials for Application Default Credentials (ADC) strategy support. It is recommended to use ``provide_gcp_credential_file_as_context`` context manager to limit the scope when authorization data is available. Using context manager also makes it easier to use multiple connection in one function. """@functools.wraps(func)defwrapper(self:GoogleBaseHook,*args,**kwargs):withself.provide_gcp_credential_file_as_context():returnfunc(self,*args,**kwargs)returncast(T,wrapper)
@contextmanager
[docs]defprovide_gcp_credential_file_as_context(self)->Generator[str|None,None,None]:""" Provide a Google Cloud credentials for Application Default Credentials (ADC) strategy support. See: `Application Default Credentials (ADC) strategy <https://cloud.google.com/docs/authentication/production>`__. It can be used to provide credentials for external programs (e.g. gcloud) that expect authorization file in ``GOOGLE_APPLICATION_CREDENTIALS`` environment variable. """key_path:str|None=self._get_field("key_path",None)keyfile_dict:str|dict[str,str]|None=self._get_field("keyfile_dict",None)ifkey_pathandkeyfile_dict:raiseAirflowException("The `keyfile_dict` and `key_path` fields are mutually exclusive. ""Please provide only one value.")elifkey_path:ifkey_path.endswith(".p12"):raiseAirflowException("Legacy P12 key file are not supported, use a JSON key file.")withpatch_environ({CREDENTIALS:key_path}):yieldkey_pathelifkeyfile_dict:withtempfile.NamedTemporaryFile(mode="w+t")asconf_file:ifisinstance(keyfile_dict,dict):keyfile_dict=json.dumps(keyfile_dict)conf_file.write(keyfile_dict)conf_file.flush()withpatch_environ({CREDENTIALS:conf_file.name}):yieldconf_file.nameelse:# We will use the default service account credentials.yieldNone
@contextmanager
[docs]defprovide_authorized_gcloud(self)->Generator[None,None,None]:""" Provide a separate gcloud configuration with current credentials. The gcloud tool allows you to login to Google Cloud only - ``gcloud auth login`` and for the needs of Application Default Credentials ``gcloud auth application-default login``. In our case, we want all commands to use only the credentials from ADCm so we need to configure the credentials in gcloud manually. """credentials_path=_cloud_sdk.get_application_default_credentials_path()project_id=self.project_idwithExitStack()asexit_stack:exit_stack.enter_context(self.provide_gcp_credential_file_as_context())gcloud_config_tmp=exit_stack.enter_context(tempfile.TemporaryDirectory())exit_stack.enter_context(patch_environ({CLOUD_SDK_CONFIG_DIR:gcloud_config_tmp}))ifCREDENTIALSinos.environ:# This solves most cases when we are logged in using the service key in Airflow.# Don't display stdout/stderr for security reasoncheck_output(["gcloud","auth","activate-service-account",f"--key-file={os.environ[CREDENTIALS]}",])elifos.path.exists(credentials_path):# If we are logged in by `gcloud auth application-default` then we need to log in manually.# This will make the `gcloud auth application-default` and `gcloud auth` credentials equals.withopen(credentials_path)ascreds_file:creds_content=json.loads(creds_file.read())# Don't display stdout/stderr for security reasoncheck_output(["gcloud","config","set","auth/client_id",creds_content["client_id"]])# Don't display stdout/stderr for security reasoncheck_output(["gcloud","config","set","auth/client_secret",creds_content["client_secret"]])# Don't display stdout/stderr for security reasoncheck_output(["gcloud","auth","activate-refresh-token",creds_content["client_id"],creds_content["refresh_token"],])ifproject_id:# Don't display stdout/stderr for security reasoncheck_output(["gcloud","config","set","core/project",project_id])yield
@staticmethod
[docs]defdownload_content_from_request(file_handle,request:dict,chunk_size:int)->None:""" Download media resources. Note that the Python file object is compatible with io.Base and can be used with this class also. :param file_handle: io.Base or file object. The stream in which to write the downloaded bytes. :param request: googleapiclient.http.HttpRequest, the media request to perform in chunks. :param chunk_size: int, File will be downloaded in chunks of this many bytes. """downloader=MediaIoBaseDownload(file_handle,request,chunksize=chunk_size)done=FalsewhiledoneisFalse:_,done=downloader.next_chunk()file_handle.flush()
[docs]deftest_connection(self):"""Test the Google cloud connectivity from UI."""status,message=False,""ifself._get_field("is_anonymous"):returnTrue,"Credentials are anonymous"try:token=self._get_access_token()url=f"https://www.googleapis.com/oauth2/v3/tokeninfo?access_token={token}"response=requests.post(url)ifresponse.status_code==200:status=Truemessage="Connection successfully tested"exceptExceptionase:status=Falsemessage=str(e)returnstatus,message
class_CredentialsToken(Token):""" A token implementation which makes Google credentials objects accessible to [gcloud-aio](https://talkiq.github.io/gcloud-aio/) clients. This class allows us to create token instances from credentials objects and thus supports a variety of use cases for Google credentials in Airflow (i.e. impersonation chain). By relying on a existing credentials object we leverage functionality provided by the GoogleBaseHook for generating credentials objects. """def__init__(self,credentials:Credentials,*,project:str|None=None,session:ClientSession|None=None,scopes:Sequence[str]|None=None,)->None:_scopes:list[str]|None=list(scopes)ifscopeselseNonesuper().__init__(session=cast(Session,session),scopes=_scopes)self.credentials=credentialsself.project=project@classmethodasyncdeffrom_hook(cls,hook:GoogleBaseHook,*,session:ClientSession|None=None,)->_CredentialsToken:credentials,project=hook.get_credentials_and_project_id()returncls(credentials=credentials,project=project,session=session,scopes=hook.scopes,)asyncdefget_project(self)->str|None:returnself.projectasyncdefrefresh(self,*,timeout:int)->TokenResponse:awaitsync_to_async(self.credentials.refresh)(google.auth.transport.requests.Request())self.access_token=cast(str,self.credentials.token)self.access_token_duration=3600self.access_token_acquired_at=self._now()returnTokenResponse(value=self.access_token,expires_in=self.access_token_duration)asyncdefacquire_access_token(self,timeout:int=10)->None:awaitself.refresh(timeout=timeout)self.acquiring=Noneasyncdefensure_token(self)->None:ifself.acquiringandnotself.acquiring.done():awaitself.acquiringreturnifself.access_token:delta=(self._now()-self.access_token_acquired_at).total_seconds()ifdelta<=self.access_token_duration/2:returnself.acquiring=asyncio.ensure_future(# pylint: disable=used-before-assignmentself.acquire_access_token())awaitself.acquiring@staticmethoddef_now():# access_token_acquired_at is specific to gcloud-aio's Token.# On subsequent calls of `get` it will be used with `datetime.datetime.utcnow()`.# Therefore we have to use an offset-naive datetime.# https://github.com/talkiq/gcloud-aio/blob/f1132b005ba35d8059229a9ca88b90f31f77456d/auth/gcloud/aio/auth/token.py#L204returndatetime.datetime.now(tz=datetime.timezone.utc).replace(tzinfo=None)
[docs]classGoogleBaseAsyncHook(BaseHook):"""GoogleBaseAsyncHook inherits from BaseHook class, run on the trigger worker."""
def__init__(self,**kwargs:Any)->None:# add default value to gcp_conn_idif"gcp_conn_id"notinkwargs:kwargs["gcp_conn_id"]="google_cloud_default"self._hook_kwargs=kwargsself._sync_hook=None
[docs]asyncdefget_sync_hook(self)->Any:"""Sync version of the Google Cloud Hook makes blocking calls in ``__init__``; don't inherit it."""ifnotself._sync_hook:self._sync_hook=awaitsync_to_async(self.sync_hook_class)(**self._hook_kwargs)returnself._sync_hook
[docs]asyncdefget_token(self,*,session:ClientSession|None=None)->_CredentialsToken:"""Return a Token instance for use in [gcloud-aio](https://talkiq.github.io/gcloud-aio/) clients."""sync_hook=awaitself.get_sync_hook()returnawait_CredentialsToken.from_hook(sync_hook,session=session)
[docs]asyncdefservice_file_as_context(self)->Any:""" Provide a Google Cloud credentials for Application Default Credentials (ADC) strategy support. This is the async equivalent of the non-async GoogleBaseHook's `provide_gcp_credential_file_as_context` method. """sync_hook=awaitself.get_sync_hook()returnawaitsync_to_async(sync_hook.provide_gcp_credential_file_as_context)()