# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from apiclient.discovery import build
from apiclient.http import MediaFileUpload
from googleapiclient import errors
from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
from airflow.exceptions import AirflowException
import gzip as gz
import shutil
import re
import os
class GoogleCloudStorageHook(GoogleCloudBaseHook):
    """
    Interact with Google Cloud Storage. This hook uses the Google Cloud Platform
    connection.
    """

    def __init__(self,
                 google_cloud_storage_conn_id='google_cloud_default',
                 delegate_to=None):
        """
        :param google_cloud_storage_conn_id: Airflow connection id, passed
            through to ``GoogleCloudBaseHook``.
        :type google_cloud_storage_conn_id: string
        :param delegate_to: passed through unchanged to ``GoogleCloudBaseHook``.
        :type delegate_to: string
        """
        super(GoogleCloudStorageHook, self).__init__(google_cloud_storage_conn_id,
                                                     delegate_to)

    def get_conn(self):
        """
        Returns a Google Cloud Storage service object.
        """
        http_authorized = self._authorize()
        return build(
            'storage', 'v1', http=http_authorized, cache_discovery=False)

    # pylint:disable=redefined-builtin
    def copy(self, source_bucket, source_object, destination_bucket=None,
             destination_object=None):
        """
        Copies an object from a bucket to another, with renaming if requested.

        destination_bucket or destination_object can be omitted, in which case
        source bucket/object is used, but not both.

        :param source_bucket: The bucket of the object to copy from.
        :type source_bucket: string
        :param source_object: The object to copy.
        :type source_object: string
        :param destination_bucket: The bucket to copy the object to.
            Can be omitted; then the same bucket is used.
        :type destination_bucket: string
        :param destination_object: The (renamed) path of the object if given.
            Can be omitted; then the same name is used.
        :type destination_object: string
        :return: True on success, False if the API answers 404.
        :raises ValueError: if the source bucket/object is empty, or if the
            resolved destination is identical to the source.
        """
        # Validate the source first so a missing source is reported as such,
        # not as a confusing "source equals destination" error.
        if not source_bucket or not source_object:
            raise ValueError('source_bucket and source_object cannot be empty.')

        destination_bucket = destination_bucket or source_bucket
        destination_object = destination_object or source_object
        if source_bucket == destination_bucket and \
                source_object == destination_object:
            raise ValueError(
                'Either source/destination bucket or source/destination object '
                'must be different, not both the same: bucket=%s, object=%s' %
                (source_bucket, source_object))

        service = self.get_conn()
        try:
            service \
                .objects() \
                .copy(sourceBucket=source_bucket, sourceObject=source_object,
                      destinationBucket=destination_bucket,
                      destinationObject=destination_object, body='') \
                .execute()
            return True
        except errors.HttpError as ex:
            if ex.resp['status'] == '404':
                return False
            raise

    def rewrite(self, source_bucket, source_object, destination_bucket,
                destination_object=None):
        """
        Has the same functionality as copy, except that it works on files
        over 5 TB, as well as when copying between locations and/or storage
        classes.

        destination_object can be omitted, in which case source_object is used.

        :param source_bucket: The bucket of the object to copy from.
        :type source_bucket: string
        :param source_object: The object to copy.
        :type source_object: string
        :param destination_bucket: The bucket to copy the object to.
        :type destination_bucket: string
        :param destination_object: The (renamed) path of the object if given.
            Can be omitted; then the same name is used.
        :type destination_object: string
        :return: True on success, False if the API answers 404.
        :raises ValueError: if the source bucket/object is empty, or if the
            destination is identical to the source.
        """
        if not source_bucket or not source_object:
            raise ValueError('source_bucket and source_object cannot be empty.')

        destination_object = destination_object or source_object
        if (source_bucket == destination_bucket and
                source_object == destination_object):
            raise ValueError(
                'Either source/destination bucket or source/destination object '
                'must be different, not both the same: bucket=%s, object=%s' %
                (source_bucket, source_object))

        service = self.get_conn()
        request_count = 1
        try:
            result = service.objects() \
                .rewrite(sourceBucket=source_bucket, sourceObject=source_object,
                         destinationBucket=destination_bucket,
                         destinationObject=destination_object, body='') \
                .execute()
            self.log.info('Rewrite request #%s: %s', request_count, result)
            # The API may report the rewrite as not yet done; keep calling it
            # with the token from the previous response until 'done' is set.
            while not result['done']:
                request_count += 1
                result = service.objects() \
                    .rewrite(sourceBucket=source_bucket, sourceObject=source_object,
                             destinationBucket=destination_bucket,
                             destinationObject=destination_object,
                             rewriteToken=result['rewriteToken'], body='') \
                    .execute()
                self.log.info('Rewrite request #%s: %s', request_count, result)
            return True
        except errors.HttpError as ex:
            if ex.resp['status'] == '404':
                return False
            raise

    # pylint:disable=redefined-builtin
    def download(self, bucket, object, filename=None):
        """
        Get a file from Google Cloud Storage.

        :param bucket: The bucket to fetch from.
        :type bucket: string
        :param object: The object to fetch.
        :type object: string
        :param filename: If set, a local file path where the file should be written to.
        :type filename: string
        :return: the downloaded content, as returned by the API client.
        """
        service = self.get_conn()
        downloaded_file_bytes = service \
            .objects() \
            .get_media(bucket=bucket, object=object) \
            .execute()

        # Write the file to local file path, if requested.
        if filename:
            # Binary payloads need 'wb'; anything else is written in text mode.
            write_argument = 'wb' if isinstance(downloaded_file_bytes, bytes) else 'w'
            with open(filename, write_argument) as file_fd:
                file_fd.write(downloaded_file_bytes)

        return downloaded_file_bytes

    # pylint:disable=redefined-builtin
    def upload(self, bucket, object, filename,
               mime_type='application/octet-stream', gzip=False):
        """
        Uploads a local file to Google Cloud Storage.

        :param bucket: The bucket to upload to.
        :type bucket: string
        :param object: The object name to set when uploading the local file.
        :type object: string
        :param filename: The local file path to the file to be uploaded.
        :type filename: string
        :param mime_type: The MIME type to set when uploading the file.
        :type mime_type: str
        :param gzip: Option to compress file for upload. The compressed copy
            is written next to ``filename`` with a ``.gz`` suffix and removed
            afterwards.
        :type gzip: bool
        :return: True on success, False if the API answers 404.
        """
        service = self.get_conn()

        if gzip:
            filename_gz = filename + '.gz'
            with open(filename, 'rb') as f_in:
                with gz.open(filename_gz, 'wb') as f_out:
                    shutil.copyfileobj(f_in, f_out)
            filename = filename_gz

        try:
            media = MediaFileUpload(filename, mime_type)
            service \
                .objects() \
                .insert(bucket=bucket, name=object, media_body=media) \
                .execute()
            return True
        except errors.HttpError as ex:
            if ex.resp['status'] == '404':
                return False
            raise
        finally:
            # Clean up the temporary gzip file whether or not the upload
            # succeeded, so failed uploads do not leak it.
            if gzip:
                os.remove(filename)

    # pylint:disable=redefined-builtin
    def exists(self, bucket, object):
        """
        Checks for the existence of a file in Google Cloud Storage.

        :param bucket: The Google cloud storage bucket where the object is.
        :type bucket: string
        :param object: The name of the object to check in the Google cloud
            storage bucket.
        :type object: string
        :return: True if the object metadata could be fetched, False on 404.
        """
        service = self.get_conn()
        try:
            service \
                .objects() \
                .get(bucket=bucket, object=object) \
                .execute()
            return True
        except errors.HttpError as ex:
            if ex.resp['status'] == '404':
                return False
            raise

    # pylint:disable=redefined-builtin
    def is_updated_after(self, bucket, object, ts):
        """
        Checks if an object is updated in Google Cloud Storage.

        :param bucket: The Google cloud storage bucket where the object is.
        :type bucket: string
        :param object: The name of the object to check in the Google cloud
            storage bucket.
        :type object: string
        :param ts: The timestamp to check against. A naive timestamp is
            treated as UTC.
        :type ts: datetime
        :return: True if the object's ``updated`` time is strictly later than
            ``ts``; False otherwise, including when the object is missing (404)
            or has no ``updated`` field.
        """
        service = self.get_conn()
        try:
            response = (service
                        .objects()
                        .get(bucket=bucket, object=object)
                        .execute())

            if 'updated' in response:
                import dateutil.parser
                import dateutil.tz

                # Make ts timezone-aware so it can be compared with the
                # parsed 'updated' value.
                if not ts.tzinfo:
                    ts = ts.replace(tzinfo=dateutil.tz.tzutc())

                updated = dateutil.parser.parse(response['updated'])
                self.log.info("Verify object date: %s > %s", updated, ts)

                if updated > ts:
                    return True

        except errors.HttpError as ex:
            if ex.resp['status'] != '404':
                raise

        return False

    def delete(self, bucket, object, generation=None):
        """
        Delete an object if versioning is not enabled for the bucket, or if generation
        parameter is used.

        :param bucket: name of the bucket, where the object resides
        :type bucket: string
        :param object: name of the object to delete
        :type object: string
        :param generation: if present, permanently delete the object of this generation
        :type generation: string
        :return: True if succeeded, False if the API answers 404.
        """
        service = self.get_conn()
        try:
            service \
                .objects() \
                .delete(bucket=bucket, object=object, generation=generation) \
                .execute()
            return True
        except errors.HttpError as ex:
            if ex.resp['status'] == '404':
                return False
            raise

    def list(self, bucket, versions=None, maxResults=None, prefix=None, delimiter=None):
        """
        List all objects from the bucket with the given string prefix in name.

        :param bucket: bucket name
        :type bucket: string
        :param versions: if true, list all versions of the objects
        :type versions: boolean
        :param maxResults: max count of items to return in a single page of responses
        :type maxResults: integer
        :param prefix: prefix string which filters objects whose name begin with
            this prefix
        :type prefix: string
        :param delimiter: filters objects based on the delimiter (for e.g '.csv')
        :type delimiter: string
        :return: a list of object names matching the filtering criteria. For a
            page that contains ``prefixes``, only the prefixes are collected.
        """
        service = self.get_conn()

        ids = []
        page_token = None
        while True:
            response = service.objects().list(
                bucket=bucket,
                versions=versions,
                maxResults=maxResults,
                pageToken=page_token,
                prefix=prefix,
                delimiter=delimiter
            ).execute()

            if 'prefixes' not in response:
                if 'items' not in response:
                    self.log.info("No items found for prefix: %s", prefix)
                    break

                for item in response['items']:
                    if item and 'name' in item:
                        ids.append(item['name'])
            else:
                ids.extend(response['prefixes'])

            if 'nextPageToken' not in response:
                # no further pages of results, so stop the loop
                break

            page_token = response['nextPageToken']
            if not page_token:
                # empty next page token
                break
        return ids

    def get_size(self, bucket, object):
        """
        Gets the size of a file in Google Cloud Storage.

        :param bucket: The Google cloud storage bucket where the object is.
        :type bucket: string
        :param object: The name of the object to check in the Google cloud storage bucket.
        :type object: string
        :return: the ``size`` field of the object metadata, as returned by the API.
        :raises ValueError: if the object is not found or is a "directory"
            placeholder (name ending in '/').
        """
        self.log.info('Checking the file size of object: %s in bucket: %s',
                      object,
                      bucket)
        service = self.get_conn()
        try:
            response = service.objects().get(
                bucket=bucket,
                object=object
            ).execute()
        except errors.HttpError as ex:
            if ex.resp['status'] == '404':
                raise ValueError('Object Not Found')
            raise

        # Remove Directories & Just check size of files
        if 'name' in response and not response['name'].endswith('/'):
            size = response['size']
            self.log.info('The file size of %s is %s bytes.', object, size)
            return size
        raise ValueError('Object is not a file')

    def get_crc32c(self, bucket, object):
        """
        Gets the CRC32c checksum of an object in Google Cloud Storage.

        :param bucket: The Google cloud storage bucket where the object is.
        :type bucket: string
        :param object: The name of the object to check in the Google cloud
            storage bucket.
        :type object: string
        :return: the ``crc32c`` field of the object metadata.
        :raises ValueError: if the object is not found.
        """
        self.log.info('Retrieving the crc32c checksum of '
                      'object: %s in bucket: %s', object, bucket)
        service = self.get_conn()
        try:
            response = service.objects().get(
                bucket=bucket,
                object=object
            ).execute()
        except errors.HttpError as ex:
            if ex.resp['status'] == '404':
                raise ValueError('Object Not Found')
            raise

        crc32c = response['crc32c']
        self.log.info('The crc32c checksum of %s is %s', object, crc32c)
        return crc32c

    def get_md5hash(self, bucket, object):
        """
        Gets the MD5 hash of an object in Google Cloud Storage.

        :param bucket: The Google cloud storage bucket where the object is.
        :type bucket: string
        :param object: The name of the object to check in the Google cloud
            storage bucket.
        :type object: string
        :return: the ``md5Hash`` field of the object metadata.
        :raises ValueError: if the object is not found.
        """
        self.log.info('Retrieving the MD5 hash of '
                      'object: %s in bucket: %s', object, bucket)
        service = self.get_conn()
        try:
            response = service.objects().get(
                bucket=bucket,
                object=object
            ).execute()
        except errors.HttpError as ex:
            if ex.resp['status'] == '404':
                raise ValueError('Object Not Found')
            raise

        md5hash = response['md5Hash']
        self.log.info('The md5Hash of %s is %s', object, md5hash)
        return md5hash

    def create_bucket(self,
                      bucket_name,
                      storage_class='MULTI_REGIONAL',
                      location='US',
                      project_id=None,
                      labels=None
                      ):
        """
        Creates a new bucket. Google Cloud Storage uses a flat namespace, so
        you can't create a bucket with a name that is already in use.

        .. seealso::
            For more information, see Bucket Naming Guidelines:
            https://cloud.google.com/storage/docs/bucketnaming.html#requirements

        :param bucket_name: The name of the bucket.
        :type bucket_name: string
        :param storage_class: This defines how objects in the bucket are stored
            and determines the SLA and the cost of storage. Values include

            - ``MULTI_REGIONAL``
            - ``REGIONAL``
            - ``STANDARD``
            - ``NEARLINE``
            - ``COLDLINE``.

            This hook passes ``MULTI_REGIONAL`` when no value is given.
        :type storage_class: string
        :param location: The location of the bucket.
            Object data for objects in the bucket resides in physical storage
            within this region. Defaults to US.

            .. seealso::
                https://developers.google.com/storage/docs/bucket-locations

        :type location: string
        :param project_id: The ID of the GCP Project. Falls back to the
            hook's ``project_id`` when not given.
        :type project_id: string
        :param labels: User-provided labels, in key/value pairs.
        :type labels: dict
        :return: If successful, it returns the ``id`` of the bucket.
        :raises ValueError: for an unknown storage class or an empty/invalid
            bucket name.
        :raises AirflowException: if the API call fails.
        """
        project_id = project_id if project_id is not None else self.project_id
        storage_classes = [
            'MULTI_REGIONAL',
            'REGIONAL',
            'NEARLINE',
            'COLDLINE',
            'STANDARD',  # alias for MULTI_REGIONAL/REGIONAL, based on location
        ]

        self.log.info('Creating Bucket: %s; Location: %s; Storage Class: %s',
                      bucket_name, location, storage_class)
        if storage_class not in storage_classes:
            raise ValueError(
                'Invalid value ({}) passed to storage_class. Value should be '
                'one of {}'.format(storage_class, storage_classes))

        # Guard against an empty name so it fails with ValueError, not IndexError.
        if not bucket_name or not re.match('[a-zA-Z0-9]+', bucket_name[0]):
            raise ValueError('Bucket names must start with a number or letter.')

        if not re.match('[a-zA-Z0-9]+', bucket_name[-1]):
            raise ValueError('Bucket names must end with a number or letter.')

        service = self.get_conn()
        bucket_resource = {
            'name': bucket_name,
            'location': location,
            'storageClass': storage_class
        }

        self.log.info('The Default Project ID is %s', self.project_id)

        if labels is not None:
            bucket_resource['labels'] = labels

        try:
            response = service.buckets().insert(
                project=project_id,
                body=bucket_resource
            ).execute()

            self.log.info('Bucket: %s created successfully.', bucket_name)

            return response['id']

        except errors.HttpError as ex:
            raise AirflowException(
                'Bucket creation failed. Error was: {}'.format(ex.content)
            )
def _parse_gcs_url(gsurl):
Given a Google Cloud Storage URL (gs://<bucket>/<blob>), returns a
tuple containing the corresponding bucket and blob.
# Python 3
from urllib.parse import urlparse
# Python 2
except ImportError:
from urlparse import urlparse
parsed_url = urlparse(gsurl)
if not parsed_url.netloc:
raise AirflowException('Please provide a bucket name')
bucket = parsed_url.netloc
# Remove leading '/' but NOT trailing one
blob = parsed_url.path.lstrip('/')
return bucket, blob