Source code for airflow.providers.google.cloud.transfers.mysql_to_gcs
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""MySQL to GCS operator."""
from __future__ import annotations

import base64
from datetime import date, datetime, time, timedelta
from decimal import Decimal

from MySQLdb.constants import FIELD_TYPE

from airflow.providers.google.cloud.transfers.sql_to_gcs import BaseSQLToGCSOperator
from airflow.providers.mysql.hooks.mysql import MySqlHook
class MySQLToGCSOperator(BaseSQLToGCSOperator):
    """
    Copy data from MySQL to Google Cloud Storage in JSON, CSV or Parquet format.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:MySQLToGCSOperator`

    :param mysql_conn_id: Reference to
        :ref:`mysql connection id <howto/connection:mysql>`.
    :param ensure_utc: Ensure TIMESTAMP columns exported as UTC. If set to
        `False`, TIMESTAMP columns will be exported using the MySQL server's
        default timezone.
    """
[docs]defquery(self):"""Query mysql and returns a cursor to the results."""mysql=MySqlHook(mysql_conn_id=self.mysql_conn_id)conn=mysql.get_conn()cursor=conn.cursor()ifself.ensure_utc:# Ensure TIMESTAMP results are in UTCtz_query="SET time_zone = '+00:00'"self.log.info("Executing: %s",tz_query)cursor.execute(tz_query)self.log.info("Executing: %s",self.sql)cursor.execute(self.sql)returncursor
[docs]deffield_to_bigquery(self,field)->dict[str,str]:field_type=self.type_map.get(field[1],"STRING")# Always allow TIMESTAMP to be nullable. MySQLdb returns None types# for required fields because some MySQL timestamps can't be# represented by Python's datetime (e.g. 0000-00-00 00:00:00).field_mode="NULLABLE"iffield[6]orfield_type=="TIMESTAMP"else"REQUIRED"return{"name":field[0],"type":field_type,"mode":field_mode,}
[docs]defconvert_type(self,value,schema_type:str,**kwargs):""" Take a value from MySQLdb and convert it to a value safe for JSON/Google Cloud Storage/BigQuery. * Datetimes are converted to `str(value)` (`datetime.isoformat(' ')`) strings. * Times are converted to `str((datetime.min + value).time())` strings. * Decimals are converted to floats. * Dates are converted to ISO formatted strings if given schema_type is DATE, or `datetime.isoformat(' ')` strings otherwise. * Binary type fields are converted to integer if given schema_type is INTEGER, or encoded with base64 otherwise. Imported BYTES data must be base64-encoded according to BigQuery documentation: https://cloud.google.com/bigquery/data-types :param value: MySQLdb column value :param schema_type: BigQuery data type """ifvalueisNone:returnvalueifisinstance(value,datetime):value=str(value)elifisinstance(value,timedelta):value=str((datetime.min+value).time())elifisinstance(value,Decimal):value=float(value)elifisinstance(value,date):ifschema_type=="DATE":value=value.isoformat()else:value=str(datetime.combine(value,time.min))elifisinstance(value,bytes):ifschema_type=="INTEGER":value=int.from_bytes(value,"big")else:value=base64.standard_b64encode(value).decode("ascii")returnvalue