Source code for airflow.providers.google.cloud.triggers.bigquery
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import asyncio
from typing import Any, AsyncIterator, SupportsAbs

from aiohttp import ClientSession
from aiohttp.client_exceptions import ClientResponseError

from airflow.providers.google.cloud.hooks.bigquery import BigQueryAsyncHook, BigQueryTableAsyncHook
from airflow.triggers.base import BaseTrigger, TriggerEvent


class BigQueryInsertJobTrigger(BaseTrigger):
    """
    BigQueryInsertJobTrigger runs on the trigger worker to perform an insert operation.

    :param conn_id: Reference to google cloud connection id
    :param job_id: The ID of the job. It will be suffixed with hash of job configuration
    :param project_id: Google Cloud Project where the job is running
    :param dataset_id: The dataset ID of the requested table. (templated)
    :param table_id: The table ID of the requested table. (templated)
    :param poll_interval: polling period in seconds to check for the status
    """

    def __init__(
        self,
        conn_id: str,
        job_id: str | None,
        project_id: str | None,
        dataset_id: str | None = None,
        table_id: str | None = None,
        poll_interval: float = 4.0,
    ):
        super().__init__()
        self.log.info("Using the connection %s.", conn_id)
        self.conn_id = conn_id
        self.job_id = job_id
        self._job_conn = None
        self.dataset_id = dataset_id
        self.project_id = project_id
        self.table_id = table_id
        self.poll_interval = poll_interval

    def serialize(self) -> tuple[str, dict[str, Any]]:
        """Serializes BigQueryInsertJobTrigger arguments and classpath."""
        return (
            "airflow.providers.google.cloud.triggers.bigquery.BigQueryInsertJobTrigger",
            {
                "conn_id": self.conn_id,
                "job_id": self.job_id,
                "dataset_id": self.dataset_id,
                "project_id": self.project_id,
                "table_id": self.table_id,
                "poll_interval": self.poll_interval,
            },
        )

    def _get_async_hook(self) -> BigQueryAsyncHook:
        # Hook used by run() to poll the job status asynchronously.
        return BigQueryAsyncHook(gcp_conn_id=self.conn_id)

    async def run(self) -> AsyncIterator["TriggerEvent"]:  # type: ignore[override]
        """Gets current job execution status and yields a TriggerEvent."""
        hook = self._get_async_hook()
        while True:
            try:
                # Poll for job execution status
                response_from_hook = await hook.get_job_status(job_id=self.job_id, project_id=self.project_id)
                self.log.debug("Response from hook: %s", response_from_hook)

                if response_from_hook == "success":
                    yield TriggerEvent(
                        {
                            "job_id": self.job_id,
                            "status": "success",
                            "message": "Job completed",
                        }
                    )
                    return
                elif response_from_hook == "pending":
                    self.log.info("Query is still running...")
                    self.log.info("Sleeping for %s seconds.", self.poll_interval)
                    await asyncio.sleep(self.poll_interval)
                else:
                    yield TriggerEvent({"status": "error", "message": response_from_hook})
                    return
            except Exception as e:
                self.log.exception("Exception occurred while checking for query completion")
                yield TriggerEvent({"status": "error", "message": str(e)})
                return
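

# A minimal illustrative sketch (not part of the provider module) of how a
# deferrable operator hands off to BigQueryInsertJobTrigger. The operator name,
# connection id, job id, and project below are hypothetical; only the
# defer()/execute_complete() wiring follows Airflow's standard deferrable
# operator pattern.
from airflow.models.baseoperator import BaseOperator


class _ExampleDeferringOperator(BaseOperator):
    def execute(self, context):
        # A real operator would submit the BigQuery job first, then defer.
        self.defer(
            trigger=BigQueryInsertJobTrigger(
                conn_id="google_cloud_default",  # hypothetical connection id
                job_id="job_abc123",  # hypothetical id of the submitted job
                project_id="my-project",  # hypothetical project
            ),
            method_name="execute_complete",
        )

    def execute_complete(self, context, event):
        # "event" is the dict payload of the TriggerEvent yielded by run().
        if event["status"] == "error":
            raise RuntimeError(event["message"])
        return event["job_id"]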


class BigQueryCheckTrigger(BigQueryInsertJobTrigger):
    """BigQueryCheckTrigger runs on the trigger worker."""

    def serialize(self) -> tuple[str, dict[str, Any]]:
        """Serializes BigQueryCheckTrigger arguments and classpath."""
        return (
            "airflow.providers.google.cloud.triggers.bigquery.BigQueryCheckTrigger",
            {
                "conn_id": self.conn_id,
                "job_id": self.job_id,
                "dataset_id": self.dataset_id,
                "project_id": self.project_id,
                "table_id": self.table_id,
                "poll_interval": self.poll_interval,
            },
        )

    async def run(self) -> AsyncIterator["TriggerEvent"]:  # type: ignore[override]
        """Gets current job execution status and yields a TriggerEvent."""
        hook = self._get_async_hook()
        while True:
            try:
                # Poll for job execution status
                response_from_hook = await hook.get_job_status(job_id=self.job_id, project_id=self.project_id)
                if response_from_hook == "success":
                    query_results = await hook.get_job_output(job_id=self.job_id, project_id=self.project_id)
                    records = hook.get_records(query_results)

                    # If empty list, then no records are available
                    if not records:
                        yield TriggerEvent(
                            {
                                "status": "success",
                                "records": None,
                            }
                        )
                    else:
                        # Extract only first record from the query results
                        first_record = records.pop(0)
                        yield TriggerEvent(
                            {
                                "status": "success",
                                "records": first_record,
                            }
                        )
                    return
                elif response_from_hook == "pending":
                    self.log.info("Query is still running...")
                    self.log.info("Sleeping for %s seconds.", self.poll_interval)
                    await asyncio.sleep(self.poll_interval)
                else:
                    yield TriggerEvent({"status": "error", "message": response_from_hook})
                    return
            except Exception as e:
                self.log.exception("Exception occurred while checking for query completion")
                yield TriggerEvent({"status": "error", "message": str(e)})
                return
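

# Illustrative sketch (not part of the provider module): a check-style operator
# usually treats the single row this trigger returns as passing only when every
# value in it is truthy, mirroring the generic SQL check convention. The helper
# below is hypothetical and only demonstrates that interpretation.
def _example_check_row(row) -> bool:
    """Return True when the row exists and contains no falsy values."""
    return bool(row) and all(bool(value) for value in row)


assert _example_check_row([1, "ok", True])
assert not _example_check_row([1, 0, True])  # a zero count fails the check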


class BigQueryGetDataTrigger(BigQueryInsertJobTrigger):
    """BigQueryGetDataTrigger runs on the trigger worker; inherits from the BigQueryInsertJobTrigger class."""

    def serialize(self) -> tuple[str, dict[str, Any]]:
        """Serializes BigQueryGetDataTrigger arguments and classpath."""
        return (
            "airflow.providers.google.cloud.triggers.bigquery.BigQueryGetDataTrigger",
            {
                "conn_id": self.conn_id,
                "job_id": self.job_id,
                "dataset_id": self.dataset_id,
                "project_id": self.project_id,
                "table_id": self.table_id,
                "poll_interval": self.poll_interval,
            },
        )

    async def run(self) -> AsyncIterator["TriggerEvent"]:  # type: ignore[override]
        """Gets current job execution status and yields a TriggerEvent with response data."""
        hook = self._get_async_hook()
        while True:
            try:
                # Poll for job execution status
                response_from_hook = await hook.get_job_status(job_id=self.job_id, project_id=self.project_id)
                if response_from_hook == "success":
                    query_results = await hook.get_job_output(job_id=self.job_id, project_id=self.project_id)
                    records = hook.get_records(query_results)
                    self.log.debug("Response from hook: %s", response_from_hook)
                    yield TriggerEvent(
                        {
                            "status": "success",
                            "message": response_from_hook,
                            "records": records,
                        }
                    )
                    return
                elif response_from_hook == "pending":
                    self.log.info("Query is still running...")
                    self.log.info("Sleeping for %s seconds.", self.poll_interval)
                    await asyncio.sleep(self.poll_interval)
                else:
                    yield TriggerEvent({"status": "error", "message": response_from_hook})
                    return
            except Exception as e:
                self.log.exception("Exception occurred while checking for query completion")
                yield TriggerEvent({"status": "error", "message": str(e)})
                return


class BigQueryIntervalCheckTrigger(BigQueryInsertJobTrigger):
    """
    BigQueryIntervalCheckTrigger runs on the trigger worker; inherits from the BigQueryInsertJobTrigger class.

    :param conn_id: Reference to google cloud connection id
    :param first_job_id: The ID of the first job performed
    :param second_job_id: The ID of the second job performed
    :param project_id: Google Cloud Project where the job is running
    :param dataset_id: The dataset ID of the requested table. (templated)
    :param table: table name
    :param metrics_thresholds: dictionary of ratios indexed by metrics
    :param date_filter_column: name of the column used to filter by date
    :param days_back: number of days between ds and the ds we want to check against
    :param ratio_formula: ratio formula
    :param ignore_zero: whether to ignore zero values
    :param table_id: The table ID of the requested table. (templated)
    :param poll_interval: polling period in seconds to check for the status
    """

    def __init__(
        self,
        conn_id: str,
        first_job_id: str,
        second_job_id: str,
        project_id: str | None,
        table: str,
        metrics_thresholds: dict[str, int],
        date_filter_column: str | None = "ds",
        days_back: SupportsAbs[int] = -7,
        ratio_formula: str = "max_over_min",
        ignore_zero: bool = True,
        dataset_id: str | None = None,
        table_id: str | None = None,
        poll_interval: float = 4.0,
    ):
        super().__init__(
            conn_id=conn_id,
            job_id=first_job_id,
            project_id=project_id,
            dataset_id=dataset_id,
            table_id=table_id,
            poll_interval=poll_interval,
        )
        self.conn_id = conn_id
        self.first_job_id = first_job_id
        self.second_job_id = second_job_id
        self.project_id = project_id
        self.table = table
        self.metrics_thresholds = metrics_thresholds
        self.date_filter_column = date_filter_column
        self.days_back = days_back
        self.ratio_formula = ratio_formula
        self.ignore_zero = ignore_zero

    def serialize(self) -> tuple[str, dict[str, Any]]:
        """Serializes BigQueryIntervalCheckTrigger arguments and classpath."""
        return (
            "airflow.providers.google.cloud.triggers.bigquery.BigQueryIntervalCheckTrigger",
            {
                "conn_id": self.conn_id,
                "first_job_id": self.first_job_id,
                "second_job_id": self.second_job_id,
                "project_id": self.project_id,
                "table": self.table,
                "metrics_thresholds": self.metrics_thresholds,
                "date_filter_column": self.date_filter_column,
                "days_back": self.days_back,
                "ratio_formula": self.ratio_formula,
                "ignore_zero": self.ignore_zero,
            },
        )

    async def run(self) -> AsyncIterator["TriggerEvent"]:  # type: ignore[override]
        """Gets current job execution status and yields a TriggerEvent."""
        hook = self._get_async_hook()
        while True:
            try:
                first_job_response_from_hook = await hook.get_job_status(
                    job_id=self.first_job_id, project_id=self.project_id
                )
                second_job_response_from_hook = await hook.get_job_status(
                    job_id=self.second_job_id, project_id=self.project_id
                )

                if first_job_response_from_hook == "success" and second_job_response_from_hook == "success":
                    first_query_results = await hook.get_job_output(
                        job_id=self.first_job_id, project_id=self.project_id
                    )
                    second_query_results = await hook.get_job_output(
                        job_id=self.second_job_id, project_id=self.project_id
                    )

                    first_records = hook.get_records(first_query_results)
                    second_records = hook.get_records(second_query_results)

                    # If empty list, then no records are available
                    if not first_records:
                        first_job_row: str | None = None
                    else:
                        # Extract only first record from the query results
                        first_job_row = first_records.pop(0)

                    # If empty list, then no records are available
                    if not second_records:
                        second_job_row: str | None = None
                    else:
                        # Extract only first record from the query results
                        second_job_row = second_records.pop(0)

                    hook.interval_check(
                        first_job_row,
                        second_job_row,
                        self.metrics_thresholds,
                        self.ignore_zero,
                        self.ratio_formula,
                    )

                    yield TriggerEvent(
                        {
                            "status": "success",
                            "message": "Job completed",
                            "first_row_data": first_job_row,
                            "second_row_data": second_job_row,
                        }
                    )
                    return
                elif first_job_response_from_hook == "pending" or second_job_response_from_hook == "pending":
                    self.log.info("Query is still running...")
                    self.log.info("Sleeping for %s seconds.", self.poll_interval)
                    await asyncio.sleep(self.poll_interval)
                else:
                    yield TriggerEvent(
                        {"status": "error", "message": second_job_response_from_hook, "data": None}
                    )
                    return
            except Exception as e:
                self.log.exception("Exception occurred while checking for query completion")
                yield TriggerEvent({"status": "error", "message": str(e)})
                return
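

# Illustrative sketch (not part of the provider module): the arithmetic behind
# the default "max_over_min" ratio formula. hook.interval_check() is the real
# implementation; this hypothetical helper assumes the check passes while the
# ratio stays within the configured threshold.
def _example_max_over_min(current_value: float, reference_value: float) -> float:
    """Ratio of the larger metric to the smaller; 1.0 means the values match."""
    return max(current_value, reference_value) / min(current_value, reference_value)


# e.g. with metrics_thresholds={"COUNT(*)": 1.5}, a row count of 110 today
# against 100 seven days back yields a ratio of 1.1, which passes the check.
assert _example_max_over_min(110.0, 100.0) == 1.1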


class BigQueryValueCheckTrigger(BigQueryInsertJobTrigger):
    """
    BigQueryValueCheckTrigger runs on the trigger worker; inherits from the BigQueryInsertJobTrigger class.

    :param conn_id: Reference to google cloud connection id
    :param sql: the sql to be executed
    :param pass_value: the value the query result is checked against
    :param job_id: The ID of the job
    :param project_id: Google Cloud Project where the job is running
    :param tolerance: allowed tolerance around pass_value
    :param dataset_id: The dataset ID of the requested table. (templated)
    :param table_id: The table ID of the requested table. (templated)
    :param poll_interval: polling period in seconds to check for the status
    """

    def __init__(
        self,
        conn_id: str,
        sql: str,
        pass_value: int | float | str,
        job_id: str | None,
        project_id: str | None,
        tolerance: Any = None,
        dataset_id: str | None = None,
        table_id: str | None = None,
        poll_interval: float = 4.0,
    ):
        super().__init__(
            conn_id=conn_id,
            job_id=job_id,
            project_id=project_id,
            dataset_id=dataset_id,
            table_id=table_id,
            poll_interval=poll_interval,
        )
        self.sql = sql
        self.pass_value = pass_value
        self.tolerance = tolerance

    def serialize(self) -> tuple[str, dict[str, Any]]:
        """Serializes BigQueryValueCheckTrigger arguments and classpath."""
        return (
            "airflow.providers.google.cloud.triggers.bigquery.BigQueryValueCheckTrigger",
            {
                "conn_id": self.conn_id,
                "pass_value": self.pass_value,
                "job_id": self.job_id,
                "dataset_id": self.dataset_id,
                "project_id": self.project_id,
                "sql": self.sql,
                "table_id": self.table_id,
                "tolerance": self.tolerance,
                "poll_interval": self.poll_interval,
            },
        )

    async def run(self) -> AsyncIterator["TriggerEvent"]:  # type: ignore[override]
        """Gets current job execution status and yields a TriggerEvent."""
        hook = self._get_async_hook()
        while True:
            try:
                # Poll for job execution status
                response_from_hook = await hook.get_job_status(job_id=self.job_id, project_id=self.project_id)
                if response_from_hook == "success":
                    query_results = await hook.get_job_output(job_id=self.job_id, project_id=self.project_id)
                    records = hook.get_records(query_results)
                    records = records.pop(0) if records else None
                    hook.value_check(self.sql, self.pass_value, records, self.tolerance)
                    yield TriggerEvent({"status": "success", "message": "Job completed", "records": records})
                    return
                elif response_from_hook == "pending":
                    self.log.info("Query is still running...")
                    self.log.info("Sleeping for %s seconds.", self.poll_interval)
                    await asyncio.sleep(self.poll_interval)
                else:
                    yield TriggerEvent({"status": "error", "message": response_from_hook, "records": None})
                    return
            except Exception as e:
                self.log.exception("Exception occurred while checking for query completion")
                yield TriggerEvent({"status": "error", "message": str(e)})
                return
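

# Illustrative sketch (not part of the provider module): how pass_value and
# tolerance plausibly interact in a value check. hook.value_check() is the
# authoritative implementation; this hypothetical helper assumes the
# conventional semantics that the result must fall within
# pass_value * (1 +/- tolerance).
def _example_within_tolerance(result: float, pass_value: float, tolerance: float | None) -> bool:
    if tolerance is None:
        return result == pass_value
    return pass_value * (1 - tolerance) <= result <= pass_value * (1 + tolerance)


# e.g. pass_value=100 with tolerance=0.1 accepts any result in [90, 110].
assert _example_within_tolerance(95.0, 100.0, 0.1)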


class BigQueryTableExistenceTrigger(BaseTrigger):
    """
    Initialize the BigQuery Table Existence Trigger with needed parameters.

    :param project_id: Google Cloud Project where the job is running
    :param dataset_id: The dataset ID of the requested table.
    :param table_id: The table ID of the requested table.
    :param gcp_conn_id: Reference to google cloud connection id
    :param hook_params: params for hook
    :param poll_interval: polling period in seconds to check for the status
    """

    def __init__(
        self,
        project_id: str,
        dataset_id: str,
        table_id: str,
        gcp_conn_id: str,
        hook_params: dict[str, Any],
        poll_interval: float = 4.0,
    ):
        super().__init__()
        self.dataset_id = dataset_id
        self.project_id = project_id
        self.table_id = table_id
        self.gcp_conn_id: str = gcp_conn_id
        self.poll_interval = poll_interval
        self.hook_params = hook_params

    def serialize(self) -> tuple[str, dict[str, Any]]:
        """Serializes BigQueryTableExistenceTrigger arguments and classpath."""
        return (
            "airflow.providers.google.cloud.triggers.bigquery.BigQueryTableExistenceTrigger",
            {
                "dataset_id": self.dataset_id,
                "project_id": self.project_id,
                "table_id": self.table_id,
                "gcp_conn_id": self.gcp_conn_id,
                "poll_interval": self.poll_interval,
                "hook_params": self.hook_params,
            },
        )

    def _get_async_hook(self) -> BigQueryTableAsyncHook:
        # Hook used by run() to look up the table.
        return BigQueryTableAsyncHook(gcp_conn_id=self.gcp_conn_id)

    async def run(self) -> AsyncIterator["TriggerEvent"]:  # type: ignore[override]
        """Will run until the table exists in the Google Big Query."""
        while True:
            try:
                hook = self._get_async_hook()
                response = await self._table_exists(
                    hook=hook, dataset=self.dataset_id, table_id=self.table_id, project_id=self.project_id
                )
                if response:
                    yield TriggerEvent({"status": "success", "message": "success"})
                    return
                await asyncio.sleep(self.poll_interval)
            except Exception as e:
                self.log.exception("Exception occurred while checking for Table existence")
                yield TriggerEvent({"status": "error", "message": str(e)})
                return

    async def _table_exists(
        self, hook: BigQueryTableAsyncHook, dataset: str, table_id: str, project_id: str
    ) -> bool:
        """
        Create a client session, call BigQueryTableAsyncHook, and check for the table in Google Big Query.

        :param hook: BigQueryTableAsyncHook Hook class
        :param dataset: The name of the dataset in which to look for the table storage bucket.
        :param table_id: The name of the table to check the existence of.
        :param project_id: The Google cloud project in which to look for the table.
            The connection supplied to the hook must provide access to the specified project.
        """
        async with ClientSession() as session:
            try:
                client = await hook.get_table_client(
                    dataset=dataset, table_id=table_id, project_id=project_id, session=session
                )
                response = await client.get()
                return True if response else False
            except ClientResponseError as err:
                if err.status == 404:
                    return False
                raise err
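

# Illustrative sketch (not part of the provider module): exercising the trigger
# outside the triggerer, e.g. in a test. run() is an async generator, so the
# first TriggerEvent can be pulled with asend(None). The project, dataset,
# table, and connection id below are hypothetical and must exist for the call
# to succeed.
async def _example_wait_for_table() -> None:
    trigger = BigQueryTableExistenceTrigger(
        project_id="my-project",  # hypothetical project
        dataset_id="my_dataset",  # hypothetical dataset
        table_id="my_table",  # hypothetical table
        gcp_conn_id="google_cloud_default",  # hypothetical connection id
        hook_params={},
    )
    event = await trigger.run().asend(None)  # first TriggerEvent from the generator
    self_check = event.payload  # e.g. {"status": "success", "message": "success"}
    assert self_check["status"] in ("success", "error")


# asyncio.run(_example_wait_for_table())  # needs a configured Airflow connection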