Source code for airflow.providers.google.cloud.transfers.oracle_to_gcs
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportbase64importcalendarfromdatetimeimportdate,datetime,timedeltafromdecimalimportDecimalimportoracledbfromairflow.providers.google.cloud.transfers.sql_to_gcsimportBaseSQLToGCSOperatorfromairflow.providers.oracle.hooks.oracleimportOracleHook
[docs]classOracleToGCSOperator(BaseSQLToGCSOperator):"""Copy data from Oracle to Google Cloud Storage in JSON, CSV or Parquet format. .. seealso:: For more information on how to use this operator, take a look at the guide: :ref:`howto/operator:OracleToGCSOperator` :param oracle_conn_id: Reference to a specific :ref:`Oracle hook <howto/connection:oracle>`. :param ensure_utc: Ensure TIMESTAMP columns exported as UTC. If set to `False`, TIMESTAMP columns will be exported using the Oracle server's default timezone. """
[docs]defquery(self):"""Queries Oracle and returns a cursor to the results."""oracle=OracleHook(oracle_conn_id=self.oracle_conn_id)conn=oracle.get_conn()cursor=conn.cursor()ifself.ensure_utc:# Ensure TIMESTAMP results are in UTCtz_query="SET time_zone = '+00:00'"self.log.info("Executing: %s",tz_query)cursor.execute(tz_query)self.log.info("Executing: %s",self.sql)cursor.execute(self.sql)returncursor
[docs]defconvert_type(self,value,schema_type,**kwargs):""" Takes a value from Oracle db, and converts it to a value that's safe for JSON/Google Cloud Storage/BigQuery. * Datetimes are converted to UTC seconds. * Decimals are converted to floats. * Dates are converted to ISO formatted string if given schema_type is DATE, or UTC seconds otherwise. * Binary type fields are converted to integer if given schema_type is INTEGER, or encoded with base64 otherwise. Imported BYTES data must be base64-encoded according to BigQuery documentation: https://cloud.google.com/bigquery/data-types :param value: Oracle db column value :param schema_type: BigQuery data type """ifvalueisNone:returnvalueifisinstance(value,datetime):value=calendar.timegm(value.timetuple())elifisinstance(value,timedelta):value=value.total_seconds()elifisinstance(value,Decimal):value=float(value)elifisinstance(value,date):ifschema_type=="DATE":value=value.isoformat()else:value=calendar.timegm(value.timetuple())elifisinstance(value,bytes):ifschema_type=="INTEGER":value=int.from_bytes(value,"big")else:value=base64.standard_b64encode(value).decode("ascii")returnvalue