Source code for airflow.providers.google.cloud.utils.credentials_provider
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""This module contains a mechanism for providing temporary Google Cloud authentication."""from__future__importannotationsimportjsonimportloggingimportosimporttempfilefromcontextlibimportExitStack,contextmanagerfromtypingimportCollection,Generator,Sequencefromurllib.parseimporturlencodeimportgoogle.authimportgoogle.auth.credentialsimportgoogle.oauth2.service_accountfromgoogle.authimportimpersonated_credentials# type: ignore[attr-defined]fromgoogle.auth.environment_varsimportCREDENTIALS,LEGACY_PROJECT,PROJECTfromairflow.exceptionsimportAirflowExceptionfromairflow.providers.google.cloud._internal_client.secret_manager_clientimport_SecretManagerClientfromairflow.utils.log.logging_mixinimportLoggingMixinfromairflow.utils.process_utilsimportpatch_environ
[docs]defbuild_gcp_conn(key_file_path:str|None=None,scopes:Sequence[str]|None=None,project_id:str|None=None,)->str:""" Build a uri that can be used as :envvar:`AIRFLOW_CONN_{CONN_ID}` with provided values. :param key_file_path: Path to service key. :param scopes: Required OAuth scopes. :param project_id: The Google Cloud project id to be used for the connection. :return: String representing Airflow connection. """conn="google-cloud-platform://?{}"query_params={}ifkey_file_path:query_params["key_path"]=key_file_pathifscopes:scopes_string=",".join(scopes)query_params["scope"]=scopes_stringifproject_id:query_params["projects"]=project_idquery=urlencode(query_params)returnconn.format(query)
@contextmanager
[docs]defprovide_gcp_credentials(key_file_path:str|None=None,key_file_dict:dict|None=None,)->Generator[None,None,None]:""" Context manager that provides Google Cloud credentials for Application Default Credentials (ADC). .. seealso:: `Application Default Credentials (ADC) strategy`__. It can be used to provide credentials for external programs (e.g. gcloud) that expect authorization file in ``GOOGLE_APPLICATION_CREDENTIALS`` environment variable. :param key_file_path: Path to file with Google Cloud Service Account .json file. :param key_file_dict: Dictionary with credentials. __ https://cloud.google.com/docs/authentication/production """ifnotkey_file_pathandnotkey_file_dict:raiseValueError("Please provide `key_file_path` or `key_file_dict`.")ifkey_file_pathandkey_file_path.endswith(".p12"):raiseAirflowException("Legacy P12 key file are not supported, use a JSON key file.")withtempfile.NamedTemporaryFile(mode="w+t")asconf_file:ifnotkey_file_pathandkey_file_dict:conf_file.write(json.dumps(key_file_dict))conf_file.flush()key_file_path=conf_file.nameifkey_file_path:withpatch_environ({CREDENTIALS:key_file_path}):yieldelse:# We will use the default service account credentials.yield
@contextmanager
def provide_gcp_connection(
    key_file_path: str | None = None,
    scopes: Sequence | None = None,
    project_id: str | None = None,
) -> Generator[None, None, None]:
    """
    Context manager that provides a temporary value of :envvar:`AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT` connection.

    A connection URI is built from the given service account key path, OAuth scopes and project id,
    and is exposed through the environment for the duration of the ``with`` block.

    :param key_file_path: Path to file with Google Cloud Service Account .json file.
    :param scopes: OAuth scopes for the connection
    :param project_id: The id of Google Cloud project for the connection.
    :raises AirflowException: If ``key_file_path`` points to a legacy ``.p12`` key file.
    """
    uses_p12_key = bool(key_file_path) and key_file_path.endswith(".p12")
    if uses_p12_key:
        raise AirflowException("Legacy P12 key file are not supported, use a JSON key file.")

    connection_uri = build_gcp_conn(
        scopes=scopes,
        key_file_path=key_file_path,
        project_id=project_id,
    )
    environment_override = {AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT: connection_uri}
    with patch_environ(environment_override):
        yield
@contextmanager
def provide_gcp_conn_and_credentials(
    key_file_path: str | None = None,
    scopes: Sequence | None = None,
    project_id: str | None = None,
) -> Generator[None, None, None]:
    """
    Context manager that provides GCP connection and credentials.

    It provides both:

    - Google Cloud credentials for application supporting `Application Default Credentials (ADC)
      strategy`__ (only when ``key_file_path`` is given).
    - temporary value of :envvar:`AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT` connection

    When ``project_id`` is given, the project environment variables are overridden as well.

    :param key_file_path: Path to file with Google Cloud Service Account .json file.
    :param scopes: OAuth scopes for the connection
    :param project_id: The id of Google Cloud project for the connection.

    __ https://cloud.google.com/docs/authentication/production
    """
    with ExitStack() as exit_stack:
        enter = exit_stack.enter_context

        if key_file_path:
            enter(provide_gcp_credentials(key_file_path))

        if project_id:
            project_environment = {PROJECT: project_id, LEGACY_PROJECT: project_id}
            enter(patch_environ(project_environment))

        # The connection is always provided, whatever optional pieces were set up above.
        enter(provide_gcp_connection(key_file_path, scopes, project_id))
        yield
class _CredentialProvider(LoggingMixin):
    """
    Prepare the Credentials object for Google API and the associated project_id.

    Only either `key_path` or `keyfile_dict` should be provided, or an exception will occur. If neither of
    them are provided, return default credentials for the current environment

    :param key_path: Path to Google Cloud Service Account key file (JSON).
    :param keyfile_dict: A dict representing Cloud Service Account as in the Credential JSON file
    :param credential_config_file: Credential configuration given as a path to an existing file,
        as the file content in a string, or as a dict. Only consulted when none of ``key_path``,
        ``key_secret_name`` and ``keyfile_dict`` is set.
    :param key_secret_name: Keyfile Secret Name in GCP Secret Manager.
    :param key_secret_project_id: Project ID to read the secrets from. If not passed, the project ID from
        default credentials will be used.
    :param scopes:  OAuth scopes for the connection
    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
        if any. For this to work, the service account making the request must have
        domain-wide delegation enabled.
    :param disable_logging: If true, disable all log messages, which allows you to use this
        class to configure Logger.
    :param target_principal: The service account to directly impersonate using short-term
        credentials, if any. For this to work, the target_principal account must grant
        the originating account the Service Account Token Creator IAM role.
    :param delegates: optional chained list of accounts required to get the access_token of
        target_principal. If set, the sequence of identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account and target_principal
        granting the role to the last account from the list.
    """

    def __init__(
        self,
        key_path: str | None = None,
        keyfile_dict: dict[str, str] | None = None,
        credential_config_file: dict[str, str] | str | None = None,
        key_secret_name: str | None = None,
        key_secret_project_id: str | None = None,
        scopes: Collection[str] | None = None,
        delegate_to: str | None = None,
        disable_logging: bool = False,
        target_principal: str | None = None,
        delegates: Sequence[str] | None = None,
    ) -> None:
        super().__init__()
        # At most one explicit key source may be supplied.
        key_options = (key_path, key_secret_name, keyfile_dict)
        if sum(bool(option) for option in key_options) > 1:
            raise AirflowException(
                "The `keyfile_dict`, `key_path`, and `key_secret_name` fields "
                "are all mutually exclusive. Please provide only one value."
            )
        self.key_path = key_path
        self.keyfile_dict = keyfile_dict
        self.credential_config_file = credential_config_file
        self.key_secret_name = key_secret_name
        self.key_secret_project_id = key_secret_project_id
        self.scopes = scopes
        self.delegate_to = delegate_to
        self.disable_logging = disable_logging
        self.target_principal = target_principal
        self.delegates = delegates

    def get_credentials_and_project(self) -> tuple[google.auth.credentials.Credentials, str]:
        """
        Get current credentials and project ID.

        The credential source is chosen in this order: key file path, Secret Manager secret,
        key file dict, credential configuration file, and finally Application Default Credentials.
        Domain-wide delegation (``delegate_to``) and impersonation (``target_principal``) are then
        applied on top of the selected credentials.

        :return: Tuple of Google Auth Credentials and the project ID associated with them.
        :raises AirflowException: If ``delegate_to`` is set but the selected credentials
            do not support ``with_subject``.
        """
        if self.key_path:
            credentials, project_id = self._get_credentials_using_key_path()
        elif self.key_secret_name:
            credentials, project_id = self._get_credentials_using_key_secret_name()
        elif self.keyfile_dict:
            credentials, project_id = self._get_credentials_using_keyfile_dict()
        elif self.credential_config_file:
            credentials, project_id = self._get_credentials_using_credential_config_file()
        else:
            credentials, project_id = self._get_credentials_using_adc()

        if self.delegate_to:
            if hasattr(credentials, "with_subject"):
                credentials = credentials.with_subject(self.delegate_to)
            else:
                raise AirflowException(
                    "The `delegate_to` parameter cannot be used here as the current "
                    "authentication method does not support account impersonate. "
                    "Please use service-account for authorization."
                )

        if self.target_principal:
            credentials = impersonated_credentials.Credentials(
                source_credentials=credentials,
                target_principal=self.target_principal,
                delegates=self.delegates,
                target_scopes=self.scopes,
            )

            # When impersonating, the project is taken from the impersonated account's email.
            project_id = _get_project_id_from_service_account_email(self.target_principal)

        return credentials, project_id

    def _get_credentials_using_keyfile_dict(self):
        """Build service account credentials from the key file dict given to the constructor."""
        self._log_debug("Getting connection using JSON Dict")
        # Depending on how the JSON was formatted, it may contain
        # escaped newlines. Convert those to actual newlines.
        # Work on a shallow copy so that the dict supplied by the caller is left untouched.
        keyfile_dict = dict(self.keyfile_dict)
        keyfile_dict["private_key"] = keyfile_dict["private_key"].replace("\\n", "\n")
        credentials = google.oauth2.service_account.Credentials.from_service_account_info(
            keyfile_dict, scopes=self.scopes
        )
        project_id = credentials.project_id
        return credentials, project_id

    def _get_credentials_using_key_path(self):
        """Build service account credentials from the JSON key file at ``self.key_path``."""
        if self.key_path.endswith(".p12"):
            raise AirflowException("Legacy P12 key file are not supported, use a JSON key file.")

        if not self.key_path.endswith(".json"):
            raise AirflowException("Unrecognised extension for key file.")

        self._log_debug("Getting connection using JSON key file %s", self.key_path)
        credentials = google.oauth2.service_account.Credentials.from_service_account_file(
            self.key_path, scopes=self.scopes
        )
        project_id = credentials.project_id
        return credentials, project_id

    def _get_credentials_using_key_secret_name(self):
        """Build service account credentials from JSON key data stored in GCP Secret Manager."""
        self._log_debug("Getting connection using JSON key data from GCP secret: %s", self.key_secret_name)

        # Use ADC to access GCP Secret Manager.
        adc_credentials, adc_project_id = google.auth.default(scopes=self.scopes)
        secret_manager_client = _SecretManagerClient(credentials=adc_credentials)

        if not secret_manager_client.is_valid_secret_name(self.key_secret_name):
            raise AirflowException("Invalid secret name specified for fetching JSON key data.")

        secret_value = secret_manager_client.get_secret(
            secret_id=self.key_secret_name,
            # Fall back to the ADC project when no dedicated secrets project was configured.
            project_id=self.key_secret_project_id if self.key_secret_project_id else adc_project_id,
        )
        if secret_value is None:
            raise AirflowException(f"Failed getting value of secret {self.key_secret_name}.")

        try:
            keyfile_dict = json.loads(secret_value)
        except json.decoder.JSONDecodeError as err:
            # Chain explicitly so the parser's position information stays attached as the cause.
            raise AirflowException("Key data read from GCP Secret Manager is not valid JSON.") from err

        credentials = google.oauth2.service_account.Credentials.from_service_account_info(
            keyfile_dict, scopes=self.scopes
        )
        project_id = credentials.project_id
        return credentials, project_id

    def _get_credentials_using_credential_config_file(self):
        """Load credentials from a credential configuration given as a file path, string or dict."""
        if isinstance(self.credential_config_file, str) and os.path.exists(self.credential_config_file):
            self._log_info(
                "Getting connection using credential configuration file: `%s`",
                self.credential_config_file,
            )
            credentials, project_id = google.auth.load_credentials_from_file(
                self.credential_config_file, scopes=self.scopes
            )
        else:
            # The configuration was passed inline: write it to a temporary file so it can be
            # loaded through the same file-based google-auth entry point.
            with tempfile.NamedTemporaryFile(mode="w+t") as temp_credentials_fd:
                if isinstance(self.credential_config_file, dict):
                    self._log_info("Getting connection using credential configuration dict.")
                    temp_credentials_fd.write(json.dumps(self.credential_config_file))
                elif isinstance(self.credential_config_file, str):
                    self._log_info("Getting connection using credential configuration string.")
                    temp_credentials_fd.write(self.credential_config_file)

                temp_credentials_fd.flush()
                credentials, project_id = google.auth.load_credentials_from_file(
                    temp_credentials_fd.name, scopes=self.scopes
                )

        return credentials, project_id

    def _get_credentials_using_adc(self):
        """Obtain credentials and project through ``google.auth.default()``."""
        self._log_info(
            "Getting connection using `google.auth.default()` since no explicit credentials are provided."
        )
        credentials, project_id = google.auth.default(scopes=self.scopes)
        return credentials, project_id

    def _log_info(self, *args, **kwargs) -> None:
        """Log at INFO level unless logging was disabled for this provider."""
        if not self.disable_logging:
            self.log.info(*args, **kwargs)

    def _log_debug(self, *args, **kwargs) -> None:
        """Log at DEBUG level unless logging was disabled for this provider."""
        if not self.disable_logging:
            self.log.debug(*args, **kwargs)
def get_credentials_and_project_id(*args, **kwargs) -> tuple[google.auth.credentials.Credentials, str]:
    """
    Return the Credentials object for Google API and the associated project_id.

    All positional and keyword arguments are forwarded unchanged to ``_CredentialProvider``.
    """
    provider = _CredentialProvider(*args, **kwargs)
    return provider.get_credentials_and_project()
def_get_scopes(scopes:str|None=None)->Sequence[str]:""" Parse a comma-separated string containing OAuth2 scopes if `scopes` is provided; otherwise return default. :param scopes: A comma-separated string containing OAuth2 scopes :return: Returns the scope defined in the connection configuration, or the default scope """return[s.strip()forsinscopes.split(",")]ifscopeselse_DEFAULT_SCOPESdef_get_target_principal_and_delegates(impersonation_chain:str|Sequence[str]|None=None,)->tuple[str|None,Sequence[str]|None]:""" Get the target_principal and optional list of delegates from impersonation_chain. Analyze contents of impersonation_chain and return target_principal (the service account to directly impersonate using short-term credentials, if any) and optional list of delegates required to get the access_token of target_principal. :param impersonation_chain: the service account to impersonate or a chained list leading to this account :return: Returns the tuple of target_principal and delegates """ifnotimpersonation_chain:returnNone,Noneifisinstance(impersonation_chain,str):returnimpersonation_chain,Nonereturnimpersonation_chain[-1],impersonation_chain[:-1]def_get_project_id_from_service_account_email(service_account_email:str)->str:""" Extract project_id from service account's email address. :param service_account_email: email of the service account. :return: Returns the project_id of the provided service account. """try:returnservice_account_email.split("@")[1].split(".")[0]exceptIndexError:raiseAirflowException(f"Could not extract project_id from service account's email: {service_account_email}.")