Source code for airflow.providers.google.cloud.hooks.looker
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.#"""This module contains a Google Cloud Looker hook."""importjsonimporttimefromenumimportEnumfromtypingimportDict,Optionalfromlooker_sdk.rtlimportapi_settings,auth_session,requests_transport,serializefromlooker_sdk.sdk.api40importmethodsasmethods40frompackaging.versionimportparseasparse_versionfromairflow.exceptionsimportAirflowExceptionfromairflow.hooks.baseimportBaseHookfromairflow.models.connectionimportConnectionfromairflow.versionimportversion
[docs]classLookerHook(BaseHook):"""Hook for Looker APIs."""def__init__(self,looker_conn_id:str,)->None:super().__init__()self.looker_conn_id=looker_conn_id# source is used to track origin of the requestsself.source=f'airflow:{version}'
[docs]defstart_pdt_build(self,model:str,view:str,query_params:Optional[Dict]=None,):""" Submits a PDT materialization job to Looker. :param model: Required. The model of the PDT to start building. :param view: Required. The view of the PDT to start building. :param query_params: Optional. Additional materialization parameters. """self.log.info("Submitting PDT materialization job. Model: '%s', view: '%s'.",model,view)self.log.debug("PDT materialization job source: '%s'.",self.source)sdk=self.get_looker_sdk()looker_ver=sdk.versions().looker_release_versionifparse_version(looker_ver)<parse_version("22.2.0"):raiseAirflowException(f'This API requires Looker version 22.2+. Found: {looker_ver}.')# unpack query_params dict into kwargs (if not None)ifquery_params:resp=sdk.start_pdt_build(model_name=model,view_name=view,source=self.source,**query_params)else:resp=sdk.start_pdt_build(model_name=model,view_name=view,source=self.source)self.log.info("Start PDT build response: '%s'.",resp)returnresp
[docs]defcheck_pdt_build(self,materialization_id:str,):""" Gets the PDT materialization job status from Looker. :param materialization_id: Required. The materialization id to check status for. """self.log.info("Requesting PDT materialization job status. Job id: %s.",materialization_id)sdk=self.get_looker_sdk()resp=sdk.check_pdt_build(materialization_id=materialization_id)self.log.info("Check PDT build response: '%s'.",resp)returnresp
[docs]defpdt_build_status(self,materialization_id:str,)->Dict:""" Gets the PDT materialization job status. :param materialization_id: Required. The materialization id to check status for. """resp=self.check_pdt_build(materialization_id=materialization_id)status_json=resp['resp_text']status_dict=json.loads(status_json)self.log.info("PDT materialization job id: %s. Status: '%s'.",materialization_id,status_dict['status'])returnstatus_dict
[docs]defstop_pdt_build(self,materialization_id:str,):""" Starts a PDT materialization job cancellation request. :param materialization_id: Required. The materialization id to stop. """self.log.info("Stopping PDT materialization. Job id: %s.",materialization_id)self.log.debug("PDT materialization job source: '%s'.",self.source)sdk=self.get_looker_sdk()resp=sdk.stop_pdt_build(materialization_id=materialization_id,source=self.source)self.log.info("Stop PDT build response: '%s'.",resp)returnresp
[docs]defwait_for_job(self,materialization_id:str,wait_time:int=10,timeout:Optional[int]=None,)->None:""" Helper method which polls a PDT materialization job to check if it finishes. :param materialization_id: Required. The materialization id to wait for. :param wait_time: Optional. Number of seconds between checks. :param timeout: Optional. How many seconds wait for job to be ready. Used only if ``asynchronous`` is False. """self.log.info('Waiting for PDT materialization job to complete. Job id: %s.',materialization_id)status=Nonestart=time.monotonic()whilestatusnotin(JobStatus.DONE.value,JobStatus.ERROR.value,JobStatus.CANCELLED.value,JobStatus.UNKNOWN.value,):iftimeoutandstart+timeout<time.monotonic():self.stop_pdt_build(materialization_id=materialization_id)raiseAirflowException(f"Timeout: PDT materialization job is not ready after {timeout}s. "f"Job id: {materialization_id}.")time.sleep(wait_time)status_dict=self.pdt_build_status(materialization_id=materialization_id)status=status_dict['status']ifstatus==JobStatus.ERROR.value:msg=status_dict['message']raiseAirflowException(f'PDT materialization job failed. Job id: {materialization_id}. Message:\n"{msg}"')ifstatus==JobStatus.CANCELLED.value:raiseAirflowException(f'PDT materialization job was cancelled. Job id: {materialization_id}.')ifstatus==JobStatus.UNKNOWN.value:raiseAirflowException(f'PDT materialization job has unknown status. Job id: {materialization_id}.')self.log.info('PDT materialization job completed successfully. Job id: %s.',materialization_id)
[docs]defget_looker_sdk(self):"""Returns Looker SDK client for Looker API 4.0."""conn=self.get_connection(self.looker_conn_id)settings=LookerApiSettings(conn)transport=requests_transport.RequestsTransport.configure(settings)returnmethods40.Looker40SDK(auth_session.AuthSession(settings,transport,serialize.deserialize40,"4.0"),serialize.deserialize40,serialize.serialize,transport,"4.0",
)
[docs]classLookerApiSettings(api_settings.ApiSettings):"""Custom implementation of Looker SDK's `ApiSettings` class."""def__init__(self,conn:Connection,)->None:self.conn=conn# need to init before `read_config` is called in supersuper().__init__()
[docs]defread_config(self):""" Overrides the default logic of getting connection settings. Fetches the connection settings from Airflow's connection object. """config={}ifself.conn.hostisNone:raiseAirflowException(f'No `host` was supplied in connection: {self.conn.id}.')ifself.conn.port:config["base_url"]=f"{self.conn.host}:{self.conn.port}"# port is optionalelse:config["base_url"]=self.conn.hostifself.conn.login:config["client_id"]=self.conn.loginelse:raiseAirflowException(f'No `login` was supplied in connection: {self.conn.id}.')ifself.conn.password:config["client_secret"]=self.conn.passwordelse:raiseAirflowException(f'No `password` was supplied in connection: {self.conn.id}.')extras=self.conn.extra_dejson# type: Dictif'verify_ssl'inextras:config["verify_ssl"]=extras["verify_ssl"]# optionalif'timeout'inextras:config["timeout"]=extras["timeout"]# optionalreturnconfig
[docs]classJobStatus(Enum):"""The job status string."""