Integration

Reverse Proxy

Airflow can be set up behind a reverse proxy, with the ability to set its endpoint with great flexibility.

For example, you can configure your reverse proxy to get:

https://lab.mycompany.com/myorg/airflow/

To do so, you need to set the following setting in your airflow.cfg:

base_url = http://my_host/myorg/airflow

Additionally if you use Celery Executor, you can get Flower in /myorg/flower with:

flower_url_prefix = /myorg/flower

Your reverse proxy (ex: nginx) should be configured as follow:

  • pass the url and http header as it for the Airflow webserver, without any rewrite, for example:

    server {
      listen 80;
      server_name lab.mycompany.com;
    
      location /myorg/airflow/ {
          proxy_pass http://localhost:8080;
          proxy_set_header Host $host;
          proxy_redirect off;
          proxy_http_version 1.1;
          proxy_set_header Upgrade $http_upgrade;
          proxy_set_header Connection "upgrade";
      }
    }
    
  • rewrite the url for the flower endpoint:

    server {
        listen 80;
        server_name lab.mycompany.com;
    
        location /myorg/flower/ {
            rewrite ^/myorg/flower/(.*)$ /$1 break;  # remove prefix from http header
            proxy_pass http://localhost:5555;
            proxy_set_header Host $host;
            proxy_redirect off;
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection "upgrade";
        }
    }
    

Azure: Microsoft Azure

Airflow has limited support for Microsoft Azure: interfaces exist only for Azure Blob Storage and Azure Data Lake. Hook, Sensor and Operator for Blob Storage and Azure Data Lake Hook are in contrib section.

Azure Blob Storage

All classes communicate via the Window Azure Storage Blob protocol. Make sure that a Airflow connection of type wasb exists. Authorization can be done by supplying a login (=Storage account name) and password (=KEY), or login and SAS token in the extra field (see connection wasb_default for an example).

WasbBlobSensor

class airflow.contrib.sensors.wasb_sensor.WasbBlobSensor(container_name, blob_name, wasb_conn_id='wasb_default', check_options=None, *args, **kwargs)[source]

Bases: airflow.sensors.base_sensor_operator.BaseSensorOperator

Waits for a blob to arrive on Azure Blob Storage.

Parameters:
  • container_name (str) – Name of the container.
  • blob_name (str) – Name of the blob.
  • wasb_conn_id (str) – Reference to the wasb connection.
  • check_options (dict) – Optional keyword arguments that WasbHook.check_for_blob() takes.
poke(context)[source]

Function that the sensors defined while deriving this class should override.

WasbPrefixSensor

class airflow.contrib.sensors.wasb_sensor.WasbPrefixSensor(container_name, prefix, wasb_conn_id='wasb_default', check_options=None, *args, **kwargs)[source]

Bases: airflow.sensors.base_sensor_operator.BaseSensorOperator

Waits for blobs matching a prefix to arrive on Azure Blob Storage.

Parameters:
  • container_name (str) – Name of the container.
  • prefix (str) – Prefix of the blob.
  • wasb_conn_id (str) – Reference to the wasb connection.
  • check_options (dict) – Optional keyword arguments that WasbHook.check_for_prefix() takes.
poke(context)[source]

Function that the sensors defined while deriving this class should override.

FileToWasbOperator

class airflow.contrib.operators.file_to_wasb.FileToWasbOperator(file_path, container_name, blob_name, wasb_conn_id='wasb_default', load_options=None, *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Uploads a file to Azure Blob Storage.

Parameters:
  • file_path (str) – Path to the file to load. (templated)
  • container_name (str) – Name of the container. (templated)
  • blob_name (str) – Name of the blob. (templated)
  • wasb_conn_id (str) – Reference to the wasb connection.
  • load_options (dict) – Optional keyword arguments that WasbHook.load_file() takes.
execute(context)[source]

Upload a file to Azure Blob Storage.

WasbHook

class airflow.contrib.hooks.wasb_hook.WasbHook(wasb_conn_id='wasb_default')[source]

Bases: airflow.hooks.base_hook.BaseHook

Interacts with Azure Blob Storage through the wasb:// protocol.

Additional options passed in the ‘extra’ field of the connection will be passed to the BlockBlockService() constructor. For example, authenticate using a SAS token by adding {“sas_token”: “YOUR_TOKEN”}.

Parameters:wasb_conn_id (str) – Reference to the wasb connection.
check_for_blob(container_name, blob_name, **kwargs)[source]

Check if a blob exists on Azure Blob Storage.

Parameters:
  • container_name (str) – Name of the container.
  • blob_name (str) – Name of the blob.
  • kwargs (object) – Optional keyword arguments that BlockBlobService.exists() takes.
Returns:

True if the blob exists, False otherwise.

:rtype bool

check_for_prefix(container_name, prefix, **kwargs)[source]

Check if a prefix exists on Azure Blob storage.

Parameters:
  • container_name (str) – Name of the container.
  • prefix (str) – Prefix of the blob.
  • kwargs (object) – Optional keyword arguments that BlockBlobService.list_blobs() takes.
Returns:

True if blobs matching the prefix exist, False otherwise.

:rtype bool

get_conn()[source]

Return the BlockBlobService object.

get_file(file_path, container_name, blob_name, **kwargs)[source]

Download a file from Azure Blob Storage.

Parameters:
  • file_path (str) – Path to the file to download.
  • container_name (str) – Name of the container.
  • blob_name (str) – Name of the blob.
  • kwargs (object) – Optional keyword arguments that BlockBlobService.create_blob_from_path() takes.
load_file(file_path, container_name, blob_name, **kwargs)[source]

Upload a file to Azure Blob Storage.

Parameters:
  • file_path (str) – Path to the file to load.
  • container_name (str) – Name of the container.
  • blob_name (str) – Name of the blob.
  • kwargs (object) – Optional keyword arguments that BlockBlobService.create_blob_from_path() takes.
load_string(string_data, container_name, blob_name, **kwargs)[source]

Upload a string to Azure Blob Storage.

Parameters:
  • string_data (str) – String to load.
  • container_name (str) – Name of the container.
  • blob_name (str) – Name of the blob.
  • kwargs (object) – Optional keyword arguments that BlockBlobService.create_blob_from_text() takes.
read_file(container_name, blob_name, **kwargs)[source]

Read a file from Azure Blob Storage and return as a string.

Parameters:
  • container_name (str) – Name of the container.
  • blob_name (str) – Name of the blob.
  • kwargs (object) – Optional keyword arguments that BlockBlobService.create_blob_from_path() takes.

Azure File Share

Cloud variant of a SMB file share. Make sure that a Airflow connection of type wasb exists. Authorization can be done by supplying a login (=Storage account name) and password (=Storage account key), or login and SAS token in the extra field (see connection wasb_default for an example).

AzureFileShareHook

class airflow.contrib.hooks.azure_fileshare_hook.AzureFileShareHook(wasb_conn_id='wasb_default')[source]

Bases: airflow.hooks.base_hook.BaseHook

Interacts with Azure FileShare Storage.

Additional options passed in the ‘extra’ field of the connection will be passed to the FileService() constructor.

Parameters:wasb_conn_id (str) – Reference to the wasb connection.
check_for_directory(share_name, directory_name, **kwargs)[source]

Check if a directory exists on Azure File Share.

Parameters:
  • share_name (str) – Name of the share.
  • directory_name (str) – Name of the directory.
  • kwargs (object) – Optional keyword arguments that FileService.exists() takes.
Returns:

True if the file exists, False otherwise.

:rtype bool

check_for_file(share_name, directory_name, file_name, **kwargs)[source]

Check if a file exists on Azure File Share.

Parameters:
  • share_name (str) – Name of the share.
  • directory_name (str) – Name of the directory.
  • file_name (str) – Name of the file.
  • kwargs (object) – Optional keyword arguments that FileService.exists() takes.
Returns:

True if the file exists, False otherwise.

:rtype bool

create_directory(share_name, directory_name, **kwargs)[source]

Create a new direcotry on a Azure File Share.

Parameters:
  • share_name (str) – Name of the share.
  • directory_name (str) – Name of the directory.
  • kwargs (object) – Optional keyword arguments that FileService.create_directory() takes.
Returns:

A list of files and directories

:rtype list

get_conn()[source]

Return the FileService object.

get_file(file_path, share_name, directory_name, file_name, **kwargs)[source]

Download a file from Azure File Share.

Parameters:
  • file_path (str) – Where to store the file.
  • share_name (str) – Name of the share.
  • directory_name (str) – Name of the directory.
  • file_name (str) – Name of the file.
  • kwargs (object) – Optional keyword arguments that FileService.get_file_to_path() takes.
get_file_to_stream(stream, share_name, directory_name, file_name, **kwargs)[source]

Download a file from Azure File Share.

Parameters:
  • stream (file-like object) – A filehandle to store the file to.
  • share_name (str) – Name of the share.
  • directory_name (str) – Name of the directory.
  • file_name (str) – Name of the file.
  • kwargs (object) – Optional keyword arguments that FileService.get_file_to_stream() takes.
list_directories_and_files(share_name, directory_name=None, **kwargs)[source]

Return the list of directories and files stored on a Azure File Share.

Parameters:
  • share_name (str) – Name of the share.
  • directory_name (str) – Name of the directory.
  • kwargs (object) – Optional keyword arguments that FileService.list_directories_and_files() takes.
Returns:

A list of files and directories

:rtype list

load_file(file_path, share_name, directory_name, file_name, **kwargs)[source]

Upload a file to Azure File Share.

Parameters:
  • file_path (str) – Path to the file to load.
  • share_name (str) – Name of the share.
  • directory_name (str) – Name of the directory.
  • file_name (str) – Name of the file.
  • kwargs (object) – Optional keyword arguments that FileService.create_file_from_path() takes.
load_stream(stream, share_name, directory_name, file_name, count, **kwargs)[source]

Upload a stream to Azure File Share.

Parameters:
  • stream (file-like) – Opened file/stream to upload as the file content.
  • share_name (str) – Name of the share.
  • directory_name (str) – Name of the directory.
  • file_name (str) – Name of the file.
  • count (int) – Size of the stream in bytes
  • kwargs (object) – Optional keyword arguments that FileService.create_file_from_stream() takes.
load_string(string_data, share_name, directory_name, file_name, **kwargs)[source]

Upload a string to Azure File Share.

Parameters:
  • string_data (str) – String to load.
  • share_name (str) – Name of the share.
  • directory_name (str) – Name of the directory.
  • file_name (str) – Name of the file.
  • kwargs (object) – Optional keyword arguments that FileService.create_file_from_text() takes.

Logging

Airflow can be configured to read and write task logs in Azure Blob Storage. See Writing Logs to Azure Blob Storage.

Azure Data Lake

AzureDataLakeHook communicates via a REST API compatible with WebHDFS. Make sure that a Airflow connection of type azure_data_lake exists. Authorization can be done by supplying a login (=Client ID), password (=Client Secret) and extra fields tenant (Tenant) and account_name (Account Name)

(see connection azure_data_lake_default for an example).

AzureDataLakeHook

class airflow.contrib.hooks.azure_data_lake_hook.AzureDataLakeHook(azure_data_lake_conn_id='azure_data_lake_default')[source]

Bases: airflow.hooks.base_hook.BaseHook

Interacts with Azure Data Lake.

Client ID and client secret should be in user and password parameters. Tenant and account name should be extra field as {“tenant”: “<TENANT>”, “account_name”: “ACCOUNT_NAME”}.

Parameters:azure_data_lake_conn_id (str) – Reference to the Azure Data Lake connection.
check_for_file(file_path)[source]

Check if a file exists on Azure Data Lake.

Parameters:file_path (str) – Path and name of the file.
Returns:True if the file exists, False otherwise.

:rtype bool

download_file(local_path, remote_path, nthreads=64, overwrite=True, buffersize=4194304, blocksize=4194304)[source]

Download a file from Azure Blob Storage.

Parameters:
  • local_path (str) – local path. If downloading a single file, will write to this specific file, unless it is an existing directory, in which case a file is created within it. If downloading multiple files, this is the root directory to write within. Will create directories as required.
  • remote_path (str) – remote path/globstring to use to find remote files. Recursive glob patterns using ** are not supported.
  • nthreads (int) – Number of threads to use. If None, uses the number of cores.
  • overwrite (bool) – Whether to forcibly overwrite existing files/directories. If False and remote path is a directory, will quit regardless if any files would be overwritten or not. If True, only matching filenames are actually overwritten.
  • buffersize (int) – int [2**22] Number of bytes for internal buffer. This block cannot be bigger than a chunk and cannot be smaller than a block.
  • blocksize (int) – int [2**22] Number of bytes for a block. Within each chunk, we write a smaller block for each API call. This block cannot be bigger than a chunk.
get_conn()[source]

Return a AzureDLFileSystem object.

upload_file(local_path, remote_path, nthreads=64, overwrite=True, buffersize=4194304, blocksize=4194304)[source]

Upload a file to Azure Data Lake.

Parameters:
  • local_path (str) – local path. Can be single file, directory (in which case, upload recursively) or glob pattern. Recursive glob patterns using ** are not supported.
  • remote_path (str) – Remote path to upload to; if multiple files, this is the dircetory root to write within.
  • nthreads (int) – Number of threads to use. If None, uses the number of cores.
  • overwrite (bool) – Whether to forcibly overwrite existing files/directories. If False and remote path is a directory, will quit regardless if any files would be overwritten or not. If True, only matching filenames are actually overwritten.
  • buffersize (int) – int [2**22] Number of bytes for internal buffer. This block cannot be bigger than a chunk and cannot be smaller than a block.
  • blocksize (int) – int [2**22] Number of bytes for a block. Within each chunk, we write a smaller block for each API call. This block cannot be bigger than a chunk.

AWS: Amazon Web Services

Airflow has extensive support for Amazon Web Services. But note that the Hooks, Sensors and Operators are in the contrib section.

AWS EMR

EmrAddStepsOperator

class airflow.contrib.operators.emr_add_steps_operator.EmrAddStepsOperator(job_flow_id, aws_conn_id='s3_default', steps=None, *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

An operator that adds steps to an existing EMR job_flow.

Parameters:
  • job_flow_id – id of the JobFlow to add steps to. (templated)
  • aws_conn_id (str) – aws connection to uses
  • steps (list) – boto3 style steps to be added to the jobflow. (templated)

EmrCreateJobFlowOperator

class airflow.contrib.operators.emr_create_job_flow_operator.EmrCreateJobFlowOperator(aws_conn_id='s3_default', emr_conn_id='emr_default', job_flow_overrides=None, *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Creates an EMR JobFlow, reading the config from the EMR connection. A dictionary of JobFlow overrides can be passed that override the config from the connection.

Parameters:
  • aws_conn_id (str) – aws connection to uses
  • emr_conn_id (str) – emr connection to use
  • job_flow_overrides – boto3 style arguments to override emr_connection extra. (templated)

EmrTerminateJobFlowOperator

class airflow.contrib.operators.emr_terminate_job_flow_operator.EmrTerminateJobFlowOperator(job_flow_id, aws_conn_id='s3_default', *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Operator to terminate EMR JobFlows.

Parameters:
  • job_flow_id – id of the JobFlow to terminate. (templated)
  • aws_conn_id (str) – aws connection to uses

EmrHook

class airflow.contrib.hooks.emr_hook.EmrHook(emr_conn_id=None, *args, **kwargs)[source]

Bases: airflow.contrib.hooks.aws_hook.AwsHook

Interact with AWS EMR. emr_conn_id is only neccessary for using the create_job_flow method.

create_job_flow(job_flow_overrides)[source]

Creates a job flow using the config from the EMR connection. Keys of the json extra hash may have the arguments of the boto3 run_job_flow method. Overrides for this config may be passed as the job_flow_overrides.

AWS S3

  • S3Hook : Interact with AWS S3.
  • S3FileTransformOperator : Copies data from a source S3 location to a temporary location on the local filesystem.
  • S3ListOperator : Lists the files matching a key prefix from a S3 location.
  • S3ToGoogleCloudStorageOperator : Syncs an S3 location with a Google Cloud Storage bucket.
  • S3ToHiveTransfer : Moves data from S3 to Hive. The operator downloads a file from S3, stores the file locally before loading it into a Hive table.

S3Hook

class airflow.hooks.S3_hook.S3Hook(aws_conn_id='aws_default')[source]

Bases: airflow.contrib.hooks.aws_hook.AwsHook

Interact with AWS S3, using the boto3 library.

check_for_bucket(bucket_name)[source]

Check if bucket_name exists.

Parameters:bucket_name (str) – the name of the bucket
check_for_key(key, bucket_name=None)[source]

Checks if a key exists in a bucket

Parameters:
  • key (str) – S3 key that will point to the file
  • bucket_name (str) – Name of the bucket in which the file is stored
check_for_prefix(bucket_name, prefix, delimiter)[source]

Checks that a prefix exists in a bucket

check_for_wildcard_key(wildcard_key, bucket_name=None, delimiter='')[source]

Checks that a key matching a wildcard expression exists in a bucket

get_bucket(bucket_name)[source]

Returns a boto3.S3.Bucket object

Parameters:bucket_name (str) – the name of the bucket
get_key(key, bucket_name=None)[source]

Returns a boto3.s3.Object

Parameters:
  • key (str) – the path to the key
  • bucket_name (str) – the name of the bucket
get_wildcard_key(wildcard_key, bucket_name=None, delimiter='')[source]

Returns a boto3.s3.Object object matching the wildcard expression

Parameters:
  • wildcard_key (str) – the path to the key
  • bucket_name (str) – the name of the bucket
list_keys(bucket_name, prefix='', delimiter='', page_size=None, max_items=None)[source]

Lists keys in a bucket under prefix and not containing delimiter

Parameters:
  • bucket_name (str) – the name of the bucket
  • prefix (str) – a key prefix
  • delimiter (str) – the delimiter marks key hierarchy.
  • page_size (int) – pagination size
  • max_items (int) – maximum items to return
list_prefixes(bucket_name, prefix='', delimiter='', page_size=None, max_items=None)[source]

Lists prefixes in a bucket under prefix

Parameters:
  • bucket_name (str) – the name of the bucket
  • prefix (str) – a key prefix
  • delimiter (str) – the delimiter marks key hierarchy.
  • page_size (int) – pagination size
  • max_items (int) – maximum items to return
load_bytes(bytes_data, key, bucket_name=None, replace=False, encrypt=False)[source]

Loads bytes to S3

This is provided as a convenience to drop a string in S3. It uses the boto infrastructure to ship a file to s3.

Parameters:
  • bytes_data (bytes) – bytes to set as content for the key.
  • key (str) – S3 key that will point to the file
  • bucket_name (str) – Name of the bucket in which to store the file
  • replace (bool) – A flag to decide whether or not to overwrite the key if it already exists
  • encrypt (bool) – If True, the file will be encrypted on the server-side by S3 and will be stored in an encrypted form while at rest in S3.
load_file(filename, key, bucket_name=None, replace=False, encrypt=False)[source]

Loads a local file to S3

Parameters:
  • filename (str) – name of the file to load.
  • key (str) – S3 key that will point to the file
  • bucket_name (str) – Name of the bucket in which to store the file
  • replace (bool) – A flag to decide whether or not to overwrite the key if it already exists. If replace is False and the key exists, an error will be raised.
  • encrypt (bool) – If True, the file will be encrypted on the server-side by S3 and will be stored in an encrypted form while at rest in S3.
load_string(string_data, key, bucket_name=None, replace=False, encrypt=False, encoding='utf-8')[source]

Loads a string to S3

This is provided as a convenience to drop a string in S3. It uses the boto infrastructure to ship a file to s3.

Parameters:
  • string_data (str) – string to set as content for the key.
  • key (str) – S3 key that will point to the file
  • bucket_name (str) – Name of the bucket in which to store the file
  • replace (bool) – A flag to decide whether or not to overwrite the key if it already exists
  • encrypt (bool) – If True, the file will be encrypted on the server-side by S3 and will be stored in an encrypted form while at rest in S3.
read_key(key, bucket_name=None)[source]

Reads a key from S3

Parameters:
  • key (str) – S3 key that will point to the file
  • bucket_name (str) – Name of the bucket in which the file is stored
select_key(key, bucket_name=None, expression='SELECT * FROM S3Object', expression_type='SQL', input_serialization={'CSV': {}}, output_serialization={'CSV': {}})[source]

Reads a key with S3 Select.

Parameters:
  • key (str) – S3 key that will point to the file
  • bucket_name (str) – Name of the bucket in which the file is stored
  • expression (str) – S3 Select expression
  • expression_type (str) – S3 Select expression type
  • input_serialization (dict) – S3 Select input data serialization format
  • output_serialization (dict) – S3 Select output data serialization format
Returns:

retrieved subset of original data by S3 Select

Return type:

str

S3FileTransformOperator

class airflow.operators.s3_file_transform_operator.S3FileTransformOperator(source_s3_key, dest_s3_key, transform_script=None, select_expression=None, source_aws_conn_id='aws_default', dest_aws_conn_id='aws_default', replace=False, *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Copies data from a source S3 location to a temporary location on the local filesystem. Runs a transformation on this file as specified by the transformation script and uploads the output to a destination S3 location.

The locations of the source and the destination files in the local filesystem is provided as an first and second arguments to the transformation script. The transformation script is expected to read the data from source, transform it and write the output to the local destination file. The operator then takes over control and uploads the local destination file to S3.

S3 Select is also available to filter the source contents. Users can omit the transformation script if S3 Select expression is specified.

Parameters:
  • source_s3_key (str) – The key to be retrieved from S3. (templated)
  • source_aws_conn_id (str) – source s3 connection
  • dest_s3_key (str) – The key to be written from S3. (templated)
  • dest_aws_conn_id (str) – destination s3 connection
  • replace (bool) – Replace dest S3 key if it already exists
  • transform_script (str) – location of the executable transformation script
  • select_expression (str) – S3 Select expression

S3ListOperator

class airflow.contrib.operators.s3_list_operator.S3ListOperator(bucket, prefix='', delimiter='', aws_conn_id='aws_default', *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

List all objects from the bucket with the given string prefix in name.

This operator returns a python list with the name of objects which can be used by xcom in the downstream task.

Parameters:
  • bucket (string) – The S3 bucket where to find the objects. (templated)
  • prefix (string) – Prefix string to filters the objects whose name begin with such prefix. (templated)
  • delimiter (string) – the delimiter marks key hierarchy. (templated)
  • aws_conn_id (string) – The connection ID to use when connecting to S3 storage.
Example:

The following operator would list all the files (excluding subfolders) from the S3 customers/2018/04/ key in the data bucket.

s3_file = S3ListOperator(
    task_id='list_3s_files',
    bucket='data',
    prefix='customers/2018/04/',
    delimiter='/',
    aws_conn_id='aws_customers_conn'
)

S3ToGoogleCloudStorageOperator

class airflow.contrib.operators.s3_to_gcs_operator.S3ToGoogleCloudStorageOperator(bucket, prefix='', delimiter='', aws_conn_id='aws_default', dest_gcs_conn_id=None, dest_gcs=None, delegate_to=None, replace=False, *args, **kwargs)[source]

Bases: airflow.contrib.operators.s3_list_operator.S3ListOperator

Synchronizes an S3 key, possibly a prefix, with a Google Cloud Storage destination path.

Parameters:
  • bucket (string) – The S3 bucket where to find the objects. (templated)
  • prefix (string) – Prefix string which filters objects whose name begin with such prefix. (templated)
  • delimiter (string) – the delimiter marks key hierarchy. (templated)
  • aws_conn_id (string) – The source S3 connection
  • dest_gcs_conn_id (string) – The destination connection ID to use when connecting to Google Cloud Storage.
  • dest_gcs (string) – The destination Google Cloud Storage bucket and prefix where you want to store the files. (templated)
  • delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
  • replace (bool) – Whether you want to replace existing destination files or not.

Example: .. code-block:: python

s3_to_gcs_op = S3ToGoogleCloudStorageOperator(
task_id=’s3_to_gcs_example’, bucket=’my-s3-bucket’, prefix=’data/customers-201804’, dest_gcs_conn_id=’google_cloud_default’, dest_gcs=’gs://my.gcs.bucket/some/customers/’, replace=False, dag=my-dag)

Note that bucket, prefix, delimiter and dest_gcs are templated, so you can use variables in them if you wish.

S3ToHiveTransfer

class airflow.operators.s3_to_hive_operator.S3ToHiveTransfer(s3_key, field_dict, hive_table, delimiter=', ', create=True, recreate=False, partition=None, headers=False, check_headers=False, wildcard_match=False, aws_conn_id='aws_default', hive_cli_conn_id='hive_cli_default', input_compressed=False, tblproperties=None, select_expression=None, *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Moves data from S3 to Hive. The operator downloads a file from S3, stores the file locally before loading it into a Hive table. If the create or recreate arguments are set to True, a CREATE TABLE and DROP TABLE statements are generated. Hive data types are inferred from the cursor’s metadata from.

Note that the table generated in Hive uses STORED AS textfile which isn’t the most efficient serialization format. If a large amount of data is loaded and/or if the tables gets queried considerably, you may want to use this operator only to stage the data into a temporary table before loading it into its final destination using a HiveOperator.

Parameters:
  • s3_key (str) – The key to be retrieved from S3. (templated)
  • field_dict (dict) – A dictionary of the fields name in the file as keys and their Hive types as values
  • hive_table (str) – target Hive table, use dot notation to target a specific database. (templated)
  • create (bool) – whether to create the table if it doesn’t exist
  • recreate (bool) – whether to drop and recreate the table at every execution
  • partition (dict) – target partition as a dict of partition columns and values. (templated)
  • headers (bool) – whether the file contains column names on the first line
  • check_headers (bool) – whether the column names on the first line should be checked against the keys of field_dict
  • wildcard_match (bool) – whether the s3_key should be interpreted as a Unix wildcard pattern
  • delimiter (str) – field delimiter in the file
  • aws_conn_id (str) – source s3 connection
  • hive_cli_conn_id (str) – destination hive connection
  • input_compressed (bool) – Boolean to determine if file decompression is required to process headers
  • tblproperties (dict) – TBLPROPERTIES of the hive table being created
  • select_expression (str) – S3 Select expression

AWS EC2 Container Service

  • ECSOperator : Execute a task on AWS EC2 Container Service.

ECSOperator

class airflow.contrib.operators.ecs_operator.ECSOperator(task_definition, cluster, overrides, aws_conn_id=None, region_name=None, launch_type='EC2', **kwargs)[source]

Bases: airflow.models.BaseOperator

Execute a task on AWS EC2 Container Service

Parameters:
  • task_definition (str) – the task definition name on EC2 Container Service
  • cluster (str) – the cluster name on EC2 Container Service
  • aws_conn_id (str) – connection id of AWS credentials / region name. If None, credential boto3 strategy will be used (http://boto3.readthedocs.io/en/latest/guide/configuration.html).
  • region_name – region name to use in AWS Hook. Override the region_name in connection (if provided)
  • launch_type – the launch type on which to run your task (‘EC2’ or ‘FARGATE’)
Param:

overrides: the same parameter that boto3 will receive (templated): http://boto3.readthedocs.org/en/latest/reference/services/ecs.html#ECS.Client.run_task

Type:

overrides: dict

Type:

launch_type: str

AWS Batch Service

AWSBatchOperator

class airflow.contrib.operators.awsbatch_operator.AWSBatchOperator(job_name, job_definition, job_queue, overrides, max_retries=4200, aws_conn_id=None, region_name=None, **kwargs)[source]

Bases: airflow.models.BaseOperator

Execute a job on AWS Batch Service

Parameters:
  • job_name (str) – the name for the job that will run on AWS Batch
  • job_definition (str) – the job definition name on AWS Batch
  • job_queue (str) – the queue name on AWS Batch
  • max_retries (int) – exponential backoff retries while waiter is not merged, 4200 = 48 hours
  • aws_conn_id (str) – connection id of AWS credentials / region name. If None, credential boto3 strategy will be used (http://boto3.readthedocs.io/en/latest/guide/configuration.html).
  • region_name – region name to use in AWS Hook. Override the region_name in connection (if provided)
Param:

overrides: the same parameter that boto3 will receive on containerOverrides (templated): http://boto3.readthedocs.io/en/latest/reference/services/batch.html#submit_job

Type:

overrides: dict

AWS RedShift

AwsRedshiftClusterSensor

class airflow.contrib.sensors.aws_redshift_cluster_sensor.AwsRedshiftClusterSensor(cluster_identifier, target_status='available', aws_conn_id='aws_default', *args, **kwargs)[source]

Bases: airflow.sensors.base_sensor_operator.BaseSensorOperator

Waits for a Redshift cluster to reach a specific status.

Parameters:
  • cluster_identifier (str) – The identifier for the cluster being pinged.
  • target_status (str) – The cluster status desired.
poke(context)[source]

Function that the sensors defined while deriving this class should override.

RedshiftHook

class airflow.contrib.hooks.redshift_hook.RedshiftHook(aws_conn_id='aws_default')[source]

Bases: airflow.contrib.hooks.aws_hook.AwsHook

Interact with AWS Redshift, using the boto3 library

cluster_status(cluster_identifier)[source]

Return status of a cluster

Parameters:cluster_identifier (str) – unique identifier of a cluster
create_cluster_snapshot(snapshot_identifier, cluster_identifier)[source]

Creates a snapshot of a cluster

Parameters:
  • snapshot_identifier (str) – unique identifier for a snapshot of a cluster
  • cluster_identifier (str) – unique identifier of a cluster
delete_cluster(cluster_identifier, skip_final_cluster_snapshot=True, final_cluster_snapshot_identifier='')[source]

Delete a cluster and optionally create a snapshot

Parameters:
  • cluster_identifier (str) – unique identifier of a cluster
  • skip_final_cluster_snapshot (bool) – determines cluster snapshot creation
  • final_cluster_snapshot_identifier (str) – name of final cluster snapshot
describe_cluster_snapshots(cluster_identifier)[source]

Gets a list of snapshots for a cluster

Parameters:cluster_identifier (str) – unique identifier of a cluster
restore_from_cluster_snapshot(cluster_identifier, snapshot_identifier)[source]

Restores a cluster from its snapshot

Parameters:
  • cluster_identifier (str) – unique identifier of a cluster
  • snapshot_identifier (str) – unique identifier for a snapshot of a cluster

RedshiftToS3Transfer

class airflow.operators.redshift_to_s3_operator.RedshiftToS3Transfer(schema, table, s3_bucket, s3_key, redshift_conn_id='redshift_default', aws_conn_id='aws_default', unload_options=(), autocommit=False, parameters=None, include_header=False, *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Executes an UNLOAD command to s3 as a CSV with headers

Parameters:
  • schema (string) – reference to a specific schema in redshift database
  • table (string) – reference to a specific table in redshift database
  • s3_bucket (string) – reference to a specific S3 bucket
  • s3_key (string) – reference to a specific S3 key
  • redshift_conn_id (string) – reference to a specific redshift database
  • aws_conn_id (string) – reference to a specific S3 connection
  • unload_options (list) – reference to a list of UNLOAD options

S3ToRedshiftTransfer

class airflow.operators.s3_to_redshift_operator.S3ToRedshiftTransfer(schema, table, s3_bucket, s3_key, redshift_conn_id='redshift_default', aws_conn_id='aws_default', copy_options=(), autocommit=False, parameters=None, *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Executes an COPY command to load files from s3 to Redshift

Parameters:
  • schema (string) – reference to a specific schema in redshift database
  • table (string) – reference to a specific table in redshift database
  • s3_bucket (string) – reference to a specific S3 bucket
  • s3_key (string) – reference to a specific S3 key
  • redshift_conn_id (string) – reference to a specific redshift database
  • aws_conn_id (string) – reference to a specific S3 connection
  • copy_options (list) – reference to a list of COPY options

Databricks

Databricks has contributed an Airflow operator which enables submitting runs to the Databricks platform. Internally the operator talks to the api/2.0/jobs/runs/submit endpoint.

DatabricksSubmitRunOperator

class airflow.contrib.operators.databricks_operator.DatabricksSubmitRunOperator(json=None, spark_jar_task=None, notebook_task=None, new_cluster=None, existing_cluster_id=None, libraries=None, run_name=None, timeout_seconds=None, databricks_conn_id='databricks_default', polling_period_seconds=30, databricks_retry_limit=3, do_xcom_push=False, **kwargs)[source]

Bases: airflow.models.BaseOperator

Submits an Spark job run to Databricks using the api/2.0/jobs/runs/submit API endpoint.

There are two ways to instantiate this operator.

In the first way, you can take the JSON payload that you typically use to call the api/2.0/jobs/runs/submit endpoint and pass it directly to our DatabricksSubmitRunOperator through the json parameter. For example

json = {
  'new_cluster': {
    'spark_version': '2.1.0-db3-scala2.11',
    'num_workers': 2
  },
  'notebook_task': {
    'notebook_path': '/Users/airflow@example.com/PrepareData',
  },
}
notebook_run = DatabricksSubmitRunOperator(task_id='notebook_run', json=json)

Another way to accomplish the same thing is to use the named parameters of the DatabricksSubmitRunOperator directly. Note that there is exactly one named parameter for each top level parameter in the runs/submit endpoint. In this method, your code would look like this:

new_cluster = {
  'spark_version': '2.1.0-db3-scala2.11',
  'num_workers': 2
}
notebook_task = {
  'notebook_path': '/Users/airflow@example.com/PrepareData',
}
notebook_run = DatabricksSubmitRunOperator(
    task_id='notebook_run',
    new_cluster=new_cluster,
    notebook_task=notebook_task)

In the case where both the json parameter AND the named parameters are provided, they will be merged together. If there are conflicts during the merge, the named parameters will take precedence and override the top level json keys.

Currently the named parameters that DatabricksSubmitRunOperator supports are
  • spark_jar_task
  • notebook_task
  • new_cluster
  • existing_cluster_id
  • libraries
  • run_name
  • timeout_seconds
Parameters:
  • json (dict) –

    A JSON object containing API parameters which will be passed directly to the api/2.0/jobs/runs/submit endpoint. The other named parameters (i.e. spark_jar_task, notebook_task..) to this operator will be merged with this json dictionary if they are provided. If there are conflicts during the merge, the named parameters will take precedence and override the top level json keys. (templated)

    See also

    For more information about templating see Jinja Templating. https://docs.databricks.com/api/latest/jobs.html#runs-submit

  • spark_jar_task (dict) –

    The main class and parameters for the JAR task. Note that the actual JAR is specified in the libraries. EITHER spark_jar_task OR notebook_task should be specified. This field will be templated.

  • notebook_task (dict) –

    The notebook path and parameters for the notebook task. EITHER spark_jar_task OR notebook_task should be specified. This field will be templated.

  • new_cluster (dict) –

    Specs for a new cluster on which this task will be run. EITHER new_cluster OR existing_cluster_id should be specified. This field will be templated.

  • existing_cluster_id (string) – ID for existing cluster on which to run this task. EITHER new_cluster OR existing_cluster_id should be specified. This field will be templated.
  • libraries (list of dicts) –

    Libraries which this run will use. This field will be templated.

  • run_name (string) – The run name used for this task. By default this will be set to the Airflow task_id. This task_id is a required parameter of the superclass BaseOperator. This field will be templated.
  • timeout_seconds (int32) – The timeout for this run. By default a value of 0 is used which means to have no timeout. This field will be templated.
  • databricks_conn_id (string) – The name of the Airflow connection to use. By default and in the common case this will be databricks_default. To use token based authentication, provide the key token in the extra field for the connection.
  • polling_period_seconds (int) – Controls the rate which we poll for the result of this run. By default the operator will poll every 30 seconds.
  • databricks_retry_limit (int) – Amount of times retry if the Databricks backend is unreachable. Its value must be greater than or equal to 1.
  • do_xcom_push (boolean) – Whether we should push run_id and run_page_url to xcom.

GCP: Google Cloud Platform

Airflow has extensive support for the Google Cloud Platform. But note that most Hooks and Operators are in the contrib section. Meaning that they have a beta status, meaning that they can have breaking changes between minor releases.

See the GCP connection type documentation to configure connections to GCP.

Logging

Airflow can be configured to read and write task logs in Google Cloud Storage. See Writing Logs to Google Cloud Storage.

BigQuery

BigQuery Operators

BigQueryCheckOperator
class airflow.contrib.operators.bigquery_check_operator.BigQueryCheckOperator(sql, bigquery_conn_id='bigquery_default', *args, **kwargs)[source]

Bases: airflow.operators.check_operator.CheckOperator

Performs checks against BigQuery. The BigQueryCheckOperator expects a sql query that will return a single row. Each value on that first row is evaluated using python bool casting. If any of the values return False the check is failed and errors out.

Note that Python bool casting evals the following as False:

  • False
  • 0
  • Empty string ("")
  • Empty list ([])
  • Empty dictionary or set ({})

Given a query like SELECT COUNT(*) FROM foo, it will fail only if the count == 0. You can craft much more complex query that could, for instance, check that the table has the same number of rows as the source table upstream, or that the count of today’s partition is greater than yesterday’s partition, or that a set of metrics are less than 3 standard deviation for the 7 day average.

This operator can be used as a data quality check in your pipeline, and depending on where you put it in your DAG, you have the choice to stop the critical path, preventing from publishing dubious data, or on the side and receive email alterts without stopping the progress of the DAG.

Parameters:
  • sql (string) – the sql to be executed
  • bigquery_conn_id (string) – reference to the BigQuery database
BigQueryValueCheckOperator
class airflow.contrib.operators.bigquery_check_operator.BigQueryValueCheckOperator(sql, pass_value, tolerance=None, bigquery_conn_id='bigquery_default', *args, **kwargs)[source]

Bases: airflow.operators.check_operator.ValueCheckOperator

Performs a simple value check using sql code.

Parameters:sql (string) – the sql to be executed
BigQueryIntervalCheckOperator
class airflow.contrib.operators.bigquery_check_operator.BigQueryIntervalCheckOperator(table, metrics_thresholds, date_filter_column='ds', days_back=-7, bigquery_conn_id='bigquery_default', *args, **kwargs)[source]

Bases: airflow.operators.check_operator.IntervalCheckOperator

Checks that the values of metrics given as SQL expressions are within a certain tolerance of the ones from days_back before.

This method constructs a query like so

SELECT {metrics_threshold_dict_key} FROM {table}
    WHERE {date_filter_column}=<date>
Parameters:
  • table (str) – the table name
  • days_back (int) – number of days between ds and the ds we want to check against. Defaults to 7 days
  • metrics_threshold (dict) – a dictionary of ratios indexed by metrics, for example ‘COUNT(*)’: 1.5 would require a 50 percent or less difference between the current day, and the prior days_back.
BigQueryGetDataOperator
class airflow.contrib.operators.bigquery_get_data.BigQueryGetDataOperator(dataset_id, table_id, max_results='100', selected_fields=None, bigquery_conn_id='bigquery_default', delegate_to=None, *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Fetches the data from a BigQuery table (alternatively fetch data for selected columns) and returns data in a python list. The number of elements in the returned list will be equal to the number of rows fetched. Each element in the list will again be a list where element would represent the columns values for that row.

Example Result: [['Tony', '10'], ['Mike', '20'], ['Steve', '15']]

Note

If you pass fields to selected_fields which are in different order than the order of columns already in BQ table, the data will still be in the order of BQ table. For example if the BQ table has 3 columns as [A,B,C] and you pass ‘B,A’ in the selected_fields the data would still be of the form 'A,B'.

Example:

get_data = BigQueryGetDataOperator(
    task_id='get_data_from_bq',
    dataset_id='test_dataset',
    table_id='Transaction_partitions',
    max_results='100',
    selected_fields='DATE',
    bigquery_conn_id='airflow-service-account'
)
Parameters:
  • dataset_id – The dataset ID of the requested table. (templated)
  • table_id (string) – The table ID of the requested table. (templated)
  • max_results (string) – The maximum number of records (rows) to be fetched from the table. (templated)
  • selected_fields (string) – List of fields to return (comma-separated). If unspecified, all fields are returned.
  • bigquery_conn_id (string) – reference to a specific BigQuery hook.
  • delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
BigQueryCreateEmptyTableOperator
class airflow.contrib.operators.bigquery_operator.BigQueryCreateEmptyTableOperator(dataset_id, table_id, project_id=None, schema_fields=None, gcs_schema_object=None, time_partitioning={}, bigquery_conn_id='bigquery_default', google_cloud_storage_conn_id='google_cloud_default', delegate_to=None, *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Creates a new, empty table in the specified BigQuery dataset, optionally with schema.

The schema to be used for the BigQuery table may be specified in one of two ways. You may either directly pass the schema fields in, or you may point the operator to a Google cloud storage object name. The object in Google cloud storage must be a JSON file with the schema fields in it. You can also create a table without schema.

Parameters:
  • project_id (string) – The project to create the table into. (templated)
  • dataset_id (string) – The dataset to create the table into. (templated)
  • table_id (string) – The Name of the table to be created. (templated)
  • schema_fields (list) –

    If set, the schema field list as defined here: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schema

    Example:

    schema_fields=[{"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
                   {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"}]
    
  • gcs_schema_object (string) – Full path to the JSON file containing schema (templated). For example: gs://test-bucket/dir1/dir2/employee_schema.json
  • time_partitioning (dict) –

    configure optional time partitioning fields i.e. partition by field, type and expiration as per API specifications.

  • bigquery_conn_id (string) – Reference to a specific BigQuery hook.
  • google_cloud_storage_conn_id (string) – Reference to a specific Google cloud storage hook.
  • delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.

Example (with schema JSON in GCS):

CreateTable = BigQueryCreateEmptyTableOperator(
    task_id='BigQueryCreateEmptyTableOperator_task',
    dataset_id='ODS',
    table_id='Employees',
    project_id='internal-gcp-project',
    gcs_schema_object='gs://schema-bucket/employee_schema.json',
    bigquery_conn_id='airflow-service-account',
    google_cloud_storage_conn_id='airflow-service-account'
)

Corresponding Schema file (employee_schema.json):

[
  {
    "mode": "NULLABLE",
    "name": "emp_name",
    "type": "STRING"
  },
  {
    "mode": "REQUIRED",
    "name": "salary",
    "type": "INTEGER"
  }
]

Example (with schema in the DAG):

CreateTable = BigQueryCreateEmptyTableOperator(
    task_id='BigQueryCreateEmptyTableOperator_task',
    dataset_id='ODS',
    table_id='Employees',
    project_id='internal-gcp-project',
    schema_fields=[{"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
                   {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"}],
    bigquery_conn_id='airflow-service-account',
    google_cloud_storage_conn_id='airflow-service-account'
)
BigQueryCreateExternalTableOperator
class airflow.contrib.operators.bigquery_operator.BigQueryCreateExternalTableOperator(bucket, source_objects, destination_project_dataset_table, schema_fields=None, schema_object=None, source_format='CSV', compression='NONE', skip_leading_rows=0, field_delimiter=', ', max_bad_records=0, quote_character=None, allow_quoted_newlines=False, allow_jagged_rows=False, bigquery_conn_id='bigquery_default', google_cloud_storage_conn_id='google_cloud_default', delegate_to=None, src_fmt_configs={}, *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Creates a new external table in the dataset with the data in Google Cloud Storage.

The schema to be used for the BigQuery table may be specified in one of two ways. You may either directly pass the schema fields in, or you may point the operator to a Google cloud storage object name. The object in Google cloud storage must be a JSON file with the schema fields in it.

Parameters:
  • bucket (string) – The bucket to point the external table to. (templated)
  • source_objects – List of Google cloud storage URIs to point table to. (templated) If source_format is ‘DATASTORE_BACKUP’, the list must only contain a single URI.
  • destination_project_dataset_table (string) – The dotted (<project>.)<dataset>.<table> BigQuery table to load data into (templated). If <project> is not included, project will be the project defined in the connection json.
  • schema_fields (list) –

    If set, the schema field list as defined here: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schema

    Example:

    schema_fields=[{"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
                   {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"}]
    

    Should not be set when source_format is ‘DATASTORE_BACKUP’.

  • schema_object – If set, a GCS object path pointing to a .json file that contains the schema for the table. (templated)
  • schema_object – string
  • source_format (string) – File format of the data.
  • compression (string) – [Optional] The compression type of the data source. Possible values include GZIP and NONE. The default value is NONE. This setting is ignored for Google Cloud Bigtable, Google Cloud Datastore backups and Avro formats.
  • skip_leading_rows (int) – Number of rows to skip when loading from a CSV.
  • field_delimiter (string) – The delimiter to use for the CSV.
  • max_bad_records (int) – The maximum number of bad records that BigQuery can ignore when running the job.
  • quote_character (string) – The value that is used to quote data sections in a CSV file.
  • allow_quoted_newlines (boolean) – Whether to allow quoted newlines (true) or not (false).
  • allow_jagged_rows (bool) – Accept rows that are missing trailing optional columns. The missing values are treated as nulls. If false, records with missing trailing columns are treated as bad records, and if there are too many bad records, an invalid error is returned in the job result. Only applicable to CSV, ignored for other formats.
  • bigquery_conn_id (string) – Reference to a specific BigQuery hook.
  • google_cloud_storage_conn_id (string) – Reference to a specific Google cloud storage hook.
  • delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
  • src_fmt_configs (dict) – configure optional fields specific to the source format
BigQueryDeleteDatasetOperator
BigQueryOperator
class airflow.contrib.operators.bigquery_operator.BigQueryOperator(bql=None, sql=None, destination_dataset_table=False, write_disposition='WRITE_EMPTY', allow_large_results=False, flatten_results=False, bigquery_conn_id='bigquery_default', delegate_to=None, udf_config=False, use_legacy_sql=True, maximum_billing_tier=None, maximum_bytes_billed=None, create_disposition='CREATE_IF_NEEDED', schema_update_options=(), query_params=None, priority='INTERACTIVE', time_partitioning={}, *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Executes BigQuery SQL queries in a specific BigQuery database

Parameters:
  • bql (Can receive a str representing a sql statement, a list of str (sql statements), or reference to a template file. Template reference are recognized by str ending in '.sql'.) – (Deprecated. Use sql parameter instead) the sql code to be executed (templated)
  • sql (Can receive a str representing a sql statement, a list of str (sql statements), or reference to a template file. Template reference are recognized by str ending in '.sql'.) – the sql code to be executed (templated)
  • destination_dataset_table (string) – A dotted (<project>.|<project>:)<dataset>.<table> that, if set, will store the results of the query. (templated)
  • write_disposition (string) – Specifies the action that occurs if the destination table already exists. (default: ‘WRITE_EMPTY’)
  • create_disposition (string) – Specifies whether the job is allowed to create new tables. (default: ‘CREATE_IF_NEEDED’)
  • allow_large_results (boolean) – Whether to allow large results.
  • flatten_results (boolean) – If true and query uses legacy SQL dialect, flattens all nested and repeated fields in the query results. allow_large_results must be true if this is set to false. For standard SQL queries, this flag is ignored and results are never flattened.
  • bigquery_conn_id (string) – reference to a specific BigQuery hook.
  • delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
  • udf_config (list) – The User Defined Function configuration for the query. See https://cloud.google.com/bigquery/user-defined-functions for details.
  • use_legacy_sql (boolean) – Whether to use legacy SQL (true) or standard SQL (false).
  • maximum_billing_tier (integer) – Positive integer that serves as a multiplier of the basic price. Defaults to None, in which case it uses the value set in the project.
  • maximum_bytes_billed (float) – Limits the bytes billed for this job. Queries that will have bytes billed beyond this limit will fail (without incurring a charge). If unspecified, this will be set to your project default.
  • schema_update_options (tuple) – Allows the schema of the destination table to be updated as a side effect of the load job.
  • query_params (dict) – a dictionary containing query parameter types and values, passed to BigQuery.
  • priority (string) – Specifies a priority for the query. Possible values include INTERACTIVE and BATCH. The default value is INTERACTIVE.
  • time_partitioning (dict) – configure optional time partitioning fields i.e. partition by field, type and expiration as per API specifications. Note that ‘field’ is not available in conjunction with dataset.table$partition.
BigQueryTableDeleteOperator
class airflow.contrib.operators.bigquery_table_delete_operator.BigQueryTableDeleteOperator(deletion_dataset_table, bigquery_conn_id='bigquery_default', delegate_to=None, ignore_if_missing=False, *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Deletes BigQuery tables

Parameters:
  • deletion_dataset_table (string) – A dotted (<project>.|<project>:)<dataset>.<table> that indicates which table will be deleted. (templated)
  • bigquery_conn_id (string) – reference to a specific BigQuery hook.
  • delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
  • ignore_if_missing (boolean) – if True, then return success even if the requested table does not exist.
BigQueryToBigQueryOperator
class airflow.contrib.operators.bigquery_to_bigquery.BigQueryToBigQueryOperator(source_project_dataset_tables, destination_project_dataset_table, write_disposition='WRITE_EMPTY', create_disposition='CREATE_IF_NEEDED', bigquery_conn_id='bigquery_default', delegate_to=None, *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Copies data from one BigQuery table to another.

See also

For more details about these parameters: https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.copy

Parameters:
  • source_project_dataset_tables (list|string) – One or more dotted (project:|project.)<dataset>.<table> BigQuery tables to use as the source data. If <project> is not included, project will be the project defined in the connection json. Use a list if there are multiple source tables. (templated)
  • destination_project_dataset_table (string) – The destination BigQuery table. Format is: (project:|project.)<dataset>.<table> (templated)
  • write_disposition (string) – The write disposition if the table already exists.
  • create_disposition (string) – The create disposition if the table doesn’t exist.
  • bigquery_conn_id (string) – reference to a specific BigQuery hook.
  • delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
BigQueryToCloudStorageOperator
class airflow.contrib.operators.bigquery_to_gcs.BigQueryToCloudStorageOperator(source_project_dataset_table, destination_cloud_storage_uris, compression='NONE', export_format='CSV', field_delimiter=', ', print_header=True, bigquery_conn_id='bigquery_default', delegate_to=None, *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Transfers a BigQuery table to a Google Cloud Storage bucket.

See also

For more details about these parameters: https://cloud.google.com/bigquery/docs/reference/v2/jobs

Parameters:
  • source_project_dataset_table (string) – The dotted (<project>.|<project>:)<dataset>.<table> BigQuery table to use as the source data. If <project> is not included, project will be the project defined in the connection json. (templated)
  • destination_cloud_storage_uris (list) – The destination Google Cloud Storage URI (e.g. gs://some-bucket/some-file.txt). (templated) Follows convention defined here: https://cloud.google.com/bigquery/exporting-data-from-bigquery#exportingmultiple
  • compression (string) – Type of compression to use.
  • export_format – File format to export.
  • field_delimiter (string) – The delimiter to use when extracting to a CSV.
  • print_header (boolean) – Whether to print a header for a CSV file extract.
  • bigquery_conn_id (string) – reference to a specific BigQuery hook.
  • delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.

BigQueryHook

class airflow.contrib.hooks.bigquery_hook.BigQueryHook(bigquery_conn_id='bigquery_default', delegate_to=None, use_legacy_sql=True)[source]

Bases: airflow.contrib.hooks.gcp_api_base_hook.GoogleCloudBaseHook, airflow.hooks.dbapi_hook.DbApiHook, airflow.utils.log.logging_mixin.LoggingMixin

Interact with BigQuery. This hook uses the Google Cloud Platform connection.

get_conn()[source]

Returns a BigQuery PEP 249 connection object.

get_pandas_df(sql, parameters=None, dialect=None)[source]

Returns a Pandas DataFrame for the results produced by a BigQuery query. The DbApiHook method must be overridden because Pandas doesn’t support PEP 249 connections, except for SQLite. See:

https://github.com/pydata/pandas/blob/master/pandas/io/sql.py#L447 https://github.com/pydata/pandas/issues/6900

Parameters:
  • sql (string) – The BigQuery SQL to execute.
  • parameters (mapping or iterable) – The parameters to render the SQL query with (not used, leave to override superclass method)
  • dialect (string in {'legacy', 'standard'}) – Dialect of BigQuery SQL – legacy SQL or standard SQL defaults to use self.use_legacy_sql if not specified
get_service()[source]

Returns a BigQuery service object.

insert_rows(table, rows, target_fields=None, commit_every=1000)[source]

Insertion is currently unsupported. Theoretically, you could use BigQuery’s streaming API to insert rows into a table, but this hasn’t been implemented.

table_exists(project_id, dataset_id, table_id)[source]

Checks for the existence of a table in Google BigQuery.

Parameters:
  • project_id (string) – The Google cloud project in which to look for the table. The connection supplied to the hook must provide access to the specified project.
  • dataset_id (string) – The name of the dataset in which to look for the table.
  • table_id (string) – The name of the table to check the existence of.

Cloud DataFlow

DataFlow Operators

DataFlowJavaOperator
class airflow.contrib.operators.dataflow_operator.DataFlowJavaOperator(jar, dataflow_default_options=None, options=None, gcp_conn_id='google_cloud_default', delegate_to=None, poll_sleep=10, job_class=None, *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Start a Java Cloud DataFlow batch job. The parameters of the operation will be passed to the job.

It’s a good practice to define dataflow_* parameters in the default_args of the dag like the project, zone and staging location.

default_args = {
    'dataflow_default_options': {
        'project': 'my-gcp-project',
        'zone': 'europe-west1-d',
        'stagingLocation': 'gs://my-staging-bucket/staging/'
    }
}

You need to pass the path to your dataflow as a file reference with the jar parameter, the jar needs to be a self executing jar (see documentation here: https://beam.apache.org/documentation/runners/dataflow/#self-executing-jar). Use options to pass on options to your job.

t1 = DataFlowOperation(
    task_id='datapflow_example',
    jar='{{var.value.gcp_dataflow_base}}pipeline/build/libs/pipeline-example-1.0.jar',
    options={
        'autoscalingAlgorithm': 'BASIC',
        'maxNumWorkers': '50',
        'start': '{{ds}}',
        'partitionType': 'DAY',
        'labels': {'foo' : 'bar'}
    },
    gcp_conn_id='gcp-airflow-service-account',
    dag=my-dag)

Both jar and options are templated so you can use variables in them.

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date':
        (2016, 8, 1),
    'email': ['alex@vanboxel.be'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=30),
    'dataflow_default_options': {
        'project': 'my-gcp-project',
        'zone': 'us-central1-f',
        'stagingLocation': 'gs://bucket/tmp/dataflow/staging/',
    }
}

dag = DAG('test-dag', default_args=default_args)

task = DataFlowJavaOperator(
    gcp_conn_id='gcp_default',
    task_id='normalize-cal',
    jar='{{var.value.gcp_dataflow_base}}pipeline-ingress-cal-normalize-1.0.jar',
    options={
        'autoscalingAlgorithm': 'BASIC',
        'maxNumWorkers': '50',
        'start': '{{ds}}',
        'partitionType': 'DAY'

    },
    dag=dag)
DataflowTemplateOperator
class airflow.contrib.operators.dataflow_operator.DataflowTemplateOperator(template, dataflow_default_options=None, parameters=None, gcp_conn_id='google_cloud_default', delegate_to=None, poll_sleep=10, *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Start a Templated Cloud DataFlow batch job. The parameters of the operation will be passed to the job. It’s a good practice to define dataflow_* parameters in the default_args of the dag like the project, zone and staging location.

default_args = {
    'dataflow_default_options': {
        'project': 'my-gcp-project'
        'zone': 'europe-west1-d',
        'tempLocation': 'gs://my-staging-bucket/staging/'
        }
    }
}

You need to pass the path to your dataflow template as a file reference with the template parameter. Use parameters to pass on parameters to your job. Use environment to pass on runtime environment variables to your job.

t1 = DataflowTemplateOperator(
    task_id='datapflow_example',
    template='{{var.value.gcp_dataflow_base}}',
    parameters={
        'inputFile': "gs://bucket/input/my_input.txt",
        'outputFile': "gs://bucket/output/my_output.txt"
    },
    gcp_conn_id='gcp-airflow-service-account',
    dag=my-dag)

template, dataflow_default_options and parameters are templated so you can use variables in them.

DataFlowPythonOperator
class airflow.contrib.operators.dataflow_operator.DataFlowPythonOperator(py_file, py_options=None, dataflow_default_options=None, options=None, gcp_conn_id='google_cloud_default', delegate_to=None, poll_sleep=10, *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

execute(context)[source]

Execute the python dataflow job.

DataFlowHook

class airflow.contrib.hooks.gcp_dataflow_hook.DataFlowHook(gcp_conn_id='google_cloud_default', delegate_to=None, poll_sleep=10)[source]

Bases: airflow.contrib.hooks.gcp_api_base_hook.GoogleCloudBaseHook

get_conn()[source]

Returns a Google Cloud Storage service object.

Cloud DataProc

DataProc Operators

DataprocClusterCreateOperator
class airflow.contrib.operators.dataproc_operator.DataprocClusterCreateOperator(cluster_name, project_id, num_workers, zone, network_uri=None, subnetwork_uri=None, internal_ip_only=None, tags=None, storage_bucket=None, init_actions_uris=None, init_action_timeout='10m', metadata=None, image_version=None, properties=None, master_machine_type='n1-standard-4', master_disk_size=500, worker_machine_type='n1-standard-4', worker_disk_size=500, num_preemptible_workers=0, labels=None, region='global', gcp_conn_id='google_cloud_default', delegate_to=None, service_account=None, service_account_scopes=None, idle_delete_ttl=None, auto_delete_time=None, auto_delete_ttl=None, *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Create a new cluster on Google Cloud Dataproc. The operator will wait until the creation is successful or an error occurs in the creation process.

The parameters allow to configure the cluster. Please refer to

https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters

for a detailed explanation on the different parameters. Most of the configuration parameters detailed in the link are available as a parameter to this operator.

Parameters:
  • cluster_name (string) – The name of the DataProc cluster to create. (templated)
  • project_id (string) – The ID of the google cloud project in which to create the cluster. (templated)
  • num_workers (int) – The # of workers to spin up
  • storage_bucket (string) – The storage bucket to use, setting to None lets dataproc generate a custom one for you
  • init_actions_uris (list[string]) – List of GCS uri’s containing dataproc initialization scripts
  • init_action_timeout (string) – Amount of time executable scripts in init_actions_uris has to complete
  • metadata (dict) – dict of key-value google compute engine metadata entries to add to all instances
  • image_version (string) – the version of software inside the Dataproc cluster
  • properties (dict) – dict of properties to set on config files (e.g. spark-defaults.conf), see https://cloud.google.com/dataproc/docs/reference/rest/v1/ projects.regions.clusters#SoftwareConfig
  • master_machine_type (string) – Compute engine machine type to use for the master node
  • master_disk_size (int) – Disk size for the master node
  • worker_machine_type (string) – Compute engine machine type to use for the worker nodes
  • worker_disk_size (int) – Disk size for the worker nodes
  • num_preemptible_workers (int) – The # of preemptible worker nodes to spin up
  • labels (dict) – dict of labels to add to the cluster
  • zone (string) – The zone where the cluster will be located. (templated)
  • network_uri (string) – The network uri to be used for machine communication, cannot be specified with subnetwork_uri
  • subnetwork_uri (string) – The subnetwork uri to be used for machine communication, cannot be specified with network_uri
  • internal_ip_only (bool) – If true, all instances in the cluster will only have internal IP addresses. This can only be enabled for subnetwork enabled networks
  • tags (list[string]) – The GCE tags to add to all instances
  • region – leave as ‘global’, might become relevant in the future. (templated)
  • gcp_conn_id (string) – The connection ID to use connecting to Google Cloud Platform.
  • delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
  • service_account (string) – The service account of the dataproc instances.
  • service_account_scopes (list[string]) – The URIs of service account scopes to be included.
  • idle_delete_ttl (int) – The longest duration that cluster would keep alive while staying idle. Passing this threshold will cause cluster to be auto-deleted. A duration in seconds.
  • auto_delete_time (datetime.datetime) – The time when cluster will be auto-deleted.
  • auto_delete_ttl (int) – The life duration of cluster, the cluster will be auto-deleted at the end of this duration. A duration in seconds. (If auto_delete_time is set this parameter will be ignored)
DataprocClusterScaleOperator
class airflow.contrib.operators.dataproc_operator.DataprocClusterScaleOperator(cluster_name, project_id, region='global', gcp_conn_id='google_cloud_default', delegate_to=None, num_workers=2, num_preemptible_workers=0, graceful_decommission_timeout=None, *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Scale, up or down, a cluster on Google Cloud Dataproc. The operator will wait until the cluster is re-scaled.

Example:

t1 = DataprocClusterScaleOperator(
task_id=’dataproc_scale’, project_id=’my-project’, cluster_name=’cluster-1’, num_workers=10, num_preemptible_workers=10, graceful_decommission_timeout=‘1h’ dag=dag)

See also

For more detail on about scaling clusters have a look at the reference: https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/scaling-clusters

Parameters:
  • cluster_name (string) – The name of the cluster to scale. (templated)
  • project_id (string) – The ID of the google cloud project in which the cluster runs. (templated)
  • region (string) – The region for the dataproc cluster. (templated)
  • gcp_conn_id (string) – The connection ID to use connecting to Google Cloud Platform.
  • num_workers (int) – The new number of workers
  • num_preemptible_workers (int) – The new number of preemptible workers
  • graceful_decommission_timeout (string) – Timeout for graceful YARN decomissioning. Maximum value is 1d
  • delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
DataprocClusterDeleteOperator
class airflow.contrib.operators.dataproc_operator.DataprocClusterDeleteOperator(cluster_name, project_id, region='global', gcp_conn_id='google_cloud_default', delegate_to=None, *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Delete a cluster on Google Cloud Dataproc. The operator will wait until the cluster is destroyed.

Parameters:
  • cluster_name (string) – The name of the cluster to create. (templated)
  • project_id (string) – The ID of the google cloud project in which the cluster runs. (templated)
  • region (string) – leave as ‘global’, might become relevant in the future. (templated)
  • gcp_conn_id (string) – The connection ID to use connecting to Google Cloud Platform.
  • delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
DataProcPigOperator
class airflow.contrib.operators.dataproc_operator.DataProcPigOperator(query=None, query_uri=None, variables=None, job_name='{{task.task_id}}_{{ds_nodash}}', cluster_name='cluster-1', dataproc_pig_properties=None, dataproc_pig_jars=None, gcp_conn_id='google_cloud_default', delegate_to=None, region='global', *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Start a Pig query Job on a Cloud DataProc cluster. The parameters of the operation will be passed to the cluster.

It’s a good practice to define dataproc_* parameters in the default_args of the dag like the cluster name and UDFs.

default_args = {
    'cluster_name': 'cluster-1',
    'dataproc_pig_jars': [
        'gs://example/udf/jar/datafu/1.2.0/datafu.jar',
        'gs://example/udf/jar/gpig/1.2/gpig.jar'
    ]
}

You can pass a pig script as string or file reference. Use variables to pass on variables for the pig script to be resolved on the cluster or use the parameters to be resolved in the script as template parameters.

Example:

t1 = DataProcPigOperator(
        task_id='dataproc_pig',
        query='a_pig_script.pig',
        variables={'out': 'gs://example/output/{{ds}}'},
        dag=dag)

See also

For more detail on about job submission have a look at the reference: https://cloud.google.com/dataproc/reference/rest/v1/projects.regions.jobs

Parameters:
  • query (string) – The query or reference to the query file (pg or pig extension). (templated)
  • query_uri (string) – The uri of a pig script on Cloud Storage.
  • variables (dict) – Map of named parameters for the query. (templated)
  • job_name (string) – The job name used in the DataProc cluster. This name by default is the task_id appended with the execution data, but can be templated. The name will always be appended with a random number to avoid name clashes. (templated)
  • cluster_name (string) – The name of the DataProc cluster. (templated)
  • dataproc_pig_properties (dict) – Map for the Pig properties. Ideal to put in default arguments
  • dataproc_pig_jars (list) – URIs to jars provisioned in Cloud Storage (example: for UDFs and libs) and are ideal to put in default arguments.
  • gcp_conn_id (string) – The connection ID to use connecting to Google Cloud Platform.
  • delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
  • region (string) – The specified region where the dataproc cluster is created.
DataProcHiveOperator
class airflow.contrib.operators.dataproc_operator.DataProcHiveOperator(query=None, query_uri=None, variables=None, job_name='{{task.task_id}}_{{ds_nodash}}', cluster_name='cluster-1', dataproc_hive_properties=None, dataproc_hive_jars=None, gcp_conn_id='google_cloud_default', delegate_to=None, region='global', *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Start a Hive query Job on a Cloud DataProc cluster.

Parameters:
  • query (string) – The query or reference to the query file (q extension).
  • query_uri (string) – The uri of a hive script on Cloud Storage.
  • variables (dict) – Map of named parameters for the query.
  • job_name (string) – The job name used in the DataProc cluster. This name by default is the task_id appended with the execution data, but can be templated. The name will always be appended with a random number to avoid name clashes.
  • cluster_name (string) – The name of the DataProc cluster.
  • dataproc_hive_properties (dict) – Map for the Pig properties. Ideal to put in default arguments
  • dataproc_hive_jars (list) – URIs to jars provisioned in Cloud Storage (example: for UDFs and libs) and are ideal to put in default arguments.
  • gcp_conn_id (string) – The connection ID to use connecting to Google Cloud Platform.
  • delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
  • region (string) – The specified region where the dataproc cluster is created.
DataProcSparkSqlOperator
class airflow.contrib.operators.dataproc_operator.DataProcSparkSqlOperator(query=None, query_uri=None, variables=None, job_name='{{task.task_id}}_{{ds_nodash}}', cluster_name='cluster-1', dataproc_spark_properties=None, dataproc_spark_jars=None, gcp_conn_id='google_cloud_default', delegate_to=None, region='global', *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Start a Spark SQL query Job on a Cloud DataProc cluster.

Parameters:
  • query (string) – The query or reference to the query file (q extension). (templated)
  • query_uri (string) – The uri of a spark sql script on Cloud Storage.
  • variables (dict) – Map of named parameters for the query. (templated)
  • job_name (string) – The job name used in the DataProc cluster. This name by default is the task_id appended with the execution data, but can be templated. The name will always be appended with a random number to avoid name clashes. (templated)
  • cluster_name (string) – The name of the DataProc cluster. (templated)
  • dataproc_spark_properties (dict) – Map for the Pig properties. Ideal to put in default arguments
  • dataproc_spark_jars (list) – URIs to jars provisioned in Cloud Storage (example: for UDFs and libs) and are ideal to put in default arguments.
  • gcp_conn_id (string) – The connection ID to use connecting to Google Cloud Platform.
  • delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
  • region (string) – The specified region where the dataproc cluster is created.
DataProcSparkOperator
class airflow.contrib.operators.dataproc_operator.DataProcSparkOperator(main_jar=None, main_class=None, arguments=None, archives=None, files=None, job_name='{{task.task_id}}_{{ds_nodash}}', cluster_name='cluster-1', dataproc_spark_properties=None, dataproc_spark_jars=None, gcp_conn_id='google_cloud_default', delegate_to=None, region='global', *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Start a Spark Job on a Cloud DataProc cluster.

Parameters:
  • main_jar (string) – URI of the job jar provisioned on Cloud Storage. (use this or the main_class, not both together).
  • main_class (string) – Name of the job class. (use this or the main_jar, not both together).
  • arguments (list) – Arguments for the job. (templated)
  • archives (list) – List of archived files that will be unpacked in the work directory. Should be stored in Cloud Storage.
  • files (list) – List of files to be copied to the working directory
  • job_name (string) – The job name used in the DataProc cluster. This name by default is the task_id appended with the execution data, but can be templated. The name will always be appended with a random number to avoid name clashes. (templated)
  • cluster_name (string) – The name of the DataProc cluster. (templated)
  • dataproc_spark_properties (dict) – Map for the Pig properties. Ideal to put in default arguments
  • dataproc_spark_jars (list) – URIs to jars provisioned in Cloud Storage (example: for UDFs and libs) and are ideal to put in default arguments.
  • gcp_conn_id (string) – The connection ID to use connecting to Google Cloud Platform.
  • delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
  • region (string) – The specified region where the dataproc cluster is created.
DataProcHadoopOperator
class airflow.contrib.operators.dataproc_operator.DataProcHadoopOperator(main_jar=None, main_class=None, arguments=None, archives=None, files=None, job_name='{{task.task_id}}_{{ds_nodash}}', cluster_name='cluster-1', dataproc_hadoop_properties=None, dataproc_hadoop_jars=None, gcp_conn_id='google_cloud_default', delegate_to=None, region='global', *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Start a Hadoop Job on a Cloud DataProc cluster.

Parameters:
  • main_jar (string) – URI of the job jar provisioned on Cloud Storage. (use this or the main_class, not both together).
  • main_class (string) – Name of the job class. (use this or the main_jar, not both together).
  • arguments (list) – Arguments for the job. (templated)
  • archives (list) – List of archived files that will be unpacked in the work directory. Should be stored in Cloud Storage.
  • files (list) – List of files to be copied to the working directory
  • job_name (string) – The job name used in the DataProc cluster. This name by default is the task_id appended with the execution data, but can be templated. The name will always be appended with a random number to avoid name clashes. (templated)
  • cluster_name (string) – The name of the DataProc cluster. (templated)
  • dataproc_hadoop_properties (dict) – Map for the Pig properties. Ideal to put in default arguments
  • dataproc_hadoop_jars (list) – URIs to jars provisioned in Cloud Storage (example: for UDFs and libs) and are ideal to put in default arguments.
  • gcp_conn_id (string) – The connection ID to use connecting to Google Cloud Platform.
  • delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
  • region (string) – The specified region where the dataproc cluster is created.
DataProcPySparkOperator
class airflow.contrib.operators.dataproc_operator.DataProcPySparkOperator(main, arguments=None, archives=None, pyfiles=None, files=None, job_name='{{task.task_id}}_{{ds_nodash}}', cluster_name='cluster-1', dataproc_pyspark_properties=None, dataproc_pyspark_jars=None, gcp_conn_id='google_cloud_default', delegate_to=None, region='global', *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Start a PySpark Job on a Cloud DataProc cluster.

Parameters:
  • main (string) – [Required] The Hadoop Compatible Filesystem (HCFS) URI of the main Python file to use as the driver. Must be a .py file.
  • arguments (list) – Arguments for the job. (templated)
  • archives (list) – List of archived files that will be unpacked in the work directory. Should be stored in Cloud Storage.
  • files (list) – List of files to be copied to the working directory
  • pyfiles (list) – List of Python files to pass to the PySpark framework. Supported file types: .py, .egg, and .zip
  • job_name (string) – The job name used in the DataProc cluster. This name by default is the task_id appended with the execution data, but can be templated. The name will always be appended with a random number to avoid name clashes. (templated)
  • cluster_name (string) – The name of the DataProc cluster.
  • dataproc_pyspark_properties (dict) – Map for the Pig properties. Ideal to put in default arguments
  • dataproc_pyspark_jars (list) – URIs to jars provisioned in Cloud Storage (example: for UDFs and libs) and are ideal to put in default arguments.
  • gcp_conn_id (string) – The connection ID to use connecting to Google Cloud Platform.
  • delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
  • region (string) – The specified region where the dataproc cluster is created.
DataprocWorkflowTemplateInstantiateOperator
class airflow.contrib.operators.dataproc_operator.DataprocWorkflowTemplateInstantiateOperator(template_id, *args, **kwargs)[source]

Bases: airflow.contrib.operators.dataproc_operator.DataprocWorkflowTemplateBaseOperator

Instantiate a WorkflowTemplate on Google Cloud Dataproc. The operator will wait until the WorkflowTemplate is finished executing.

Parameters:
  • template_id (string) – The id of the template. (templated)
  • project_id (string) – The ID of the google cloud project in which the template runs
  • region (string) – leave as ‘global’, might become relevant in the future
  • gcp_conn_id (string) – The connection ID to use connecting to Google Cloud Platform.
  • delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
DataprocWorkflowTemplateInstantiateInlineOperator
class airflow.contrib.operators.dataproc_operator.DataprocWorkflowTemplateInstantiateInlineOperator(template, *args, **kwargs)[source]

Bases: airflow.contrib.operators.dataproc_operator.DataprocWorkflowTemplateBaseOperator

Instantiate a WorkflowTemplate Inline on Google Cloud Dataproc. The operator will wait until the WorkflowTemplate is finished executing.

Parameters:
  • template (map) – The template contents. (templated)
  • project_id (string) – The ID of the google cloud project in which the template runs
  • region (string) – leave as ‘global’, might become relevant in the future
  • gcp_conn_id (string) – The connection ID to use connecting to Google Cloud Platform.
  • delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.

Cloud Datastore

Datastore Operators

DatastoreExportOperator
class airflow.contrib.operators.datastore_export_operator.DatastoreExportOperator(bucket, namespace=None, datastore_conn_id='google_cloud_default', cloud_storage_conn_id='google_cloud_default', delegate_to=None, entity_filter=None, labels=None, polling_interval_in_seconds=10, overwrite_existing=False, xcom_push=False, *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Export entities from Google Cloud Datastore to Cloud Storage

Parameters:
  • bucket (string) – name of the cloud storage bucket to backup data
  • namespace (str) – optional namespace path in the specified Cloud Storage bucket to backup data. If this namespace does not exist in GCS, it will be created.
  • datastore_conn_id (string) – the name of the Datastore connection id to use
  • cloud_storage_conn_id (string) – the name of the cloud storage connection id to force-write backup
  • delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
  • entity_filter (dict) – description of what data from the project is included in the export, refer to https://cloud.google.com/datastore/docs/reference/rest/Shared.Types/EntityFilter
  • labels (dict) – client-assigned labels for cloud storage
  • polling_interval_in_seconds (int) – number of seconds to wait before polling for execution status again
  • overwrite_existing (bool) – if the storage bucket + namespace is not empty, it will be emptied prior to exports. This enables overwriting existing backups.
  • xcom_push (bool) – push operation name to xcom for reference
DatastoreImportOperator
class airflow.contrib.operators.datastore_import_operator.DatastoreImportOperator(bucket, file, namespace=None, entity_filter=None, labels=None, datastore_conn_id='google_cloud_default', delegate_to=None, polling_interval_in_seconds=10, xcom_push=False, *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Import entities from Cloud Storage to Google Cloud Datastore

Parameters:
  • bucket (string) – container in Cloud Storage to store data
  • file (string) – path of the backup metadata file in the specified Cloud Storage bucket. It should have the extension .overall_export_metadata
  • namespace (str) – optional namespace of the backup metadata file in the specified Cloud Storage bucket.
  • entity_filter (dict) – description of what data from the project is included in the export, refer to https://cloud.google.com/datastore/docs/reference/rest/Shared.Types/EntityFilter
  • labels (dict) – client-assigned labels for cloud storage
  • datastore_conn_id (string) – the name of the connection id to use
  • delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
  • polling_interval_in_seconds (int) – number of seconds to wait before polling for execution status again
  • xcom_push (bool) – push operation name to xcom for reference

DatastoreHook

class airflow.contrib.hooks.datastore_hook.DatastoreHook(datastore_conn_id='google_cloud_datastore_default', delegate_to=None)[source]

Bases: airflow.contrib.hooks.gcp_api_base_hook.GoogleCloudBaseHook

Interact with Google Cloud Datastore. This hook uses the Google Cloud Platform connection.

This object is not threads safe. If you want to make multiple requests simultaneously, you will need to create a hook per thread.

allocate_ids(partialKeys)[source]

Allocate IDs for incomplete keys. see https://cloud.google.com/datastore/docs/reference/rest/v1/projects/allocateIds

Parameters:partialKeys – a list of partial keys
Returns:a list of full keys.
begin_transaction()[source]

Get a new transaction handle

Returns:a transaction handle
commit(body)[source]

Commit a transaction, optionally creating, deleting or modifying some entities.

Parameters:body – the body of the commit request
Returns:the response body of the commit request
delete_operation(name)[source]

Deletes the long-running operation

Parameters:name – the name of the operation resource
export_to_storage_bucket(bucket, namespace=None, entity_filter=None, labels=None)[source]

Export entities from Cloud Datastore to Cloud Storage for backup

get_conn(version='v1')[source]

Returns a Google Cloud Storage service object.

get_operation(name)[source]

Gets the latest state of a long-running operation

Parameters:name – the name of the operation resource
import_from_storage_bucket(bucket, file, namespace=None, entity_filter=None, labels=None)[source]

Import a backup from Cloud Storage to Cloud Datastore

lookup(keys, read_consistency=None, transaction=None)[source]

Lookup some entities by key

Parameters:
  • keys – the keys to lookup
  • read_consistency – the read consistency to use. default, strong or eventual. Cannot be used with a transaction.
  • transaction – the transaction to use, if any.
Returns:

the response body of the lookup request.

poll_operation_until_done(name, polling_interval_in_seconds)[source]

Poll backup operation state until it’s completed

rollback(transaction)[source]

Roll back a transaction

Parameters:transaction – the transaction to roll back
run_query(body)[source]

Run a query for entities.

Parameters:body – the body of the query request
Returns:the batch of query results.

Cloud ML Engine

Cloud ML Engine Operators

MLEngineBatchPredictionOperator
class airflow.contrib.operators.mlengine_operator.MLEngineBatchPredictionOperator(project_id, job_id, region, data_format, input_paths, output_path, model_name=None, version_name=None, uri=None, max_worker_count=None, runtime_version=None, gcp_conn_id='google_cloud_default', delegate_to=None, *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Start a Google Cloud ML Engine prediction job.

NOTE: For model origin, users should consider exactly one from the three options below: 1. Populate ‘uri’ field only, which should be a GCS location that points to a tensorflow savedModel directory. 2. Populate ‘model_name’ field only, which refers to an existing model, and the default version of the model will be used. 3. Populate both ‘model_name’ and ‘version_name’ fields, which refers to a specific version of a specific model.

In options 2 and 3, both model and version name should contain the minimal identifier. For instance, call

MLEngineBatchPredictionOperator(
    ...,
    model_name='my_model',
    version_name='my_version',
    ...)

if the desired model version is “projects/my_project/models/my_model/versions/my_version”.

See https://cloud.google.com/ml-engine/reference/rest/v1/projects.jobs for further documentation on the parameters.

Parameters:
  • project_id (string) – The Google Cloud project name where the prediction job is submitted. (templated)
  • job_id (string) – A unique id for the prediction job on Google Cloud ML Engine. (templated)
  • data_format (string) – The format of the input data. It will default to ‘DATA_FORMAT_UNSPECIFIED’ if is not provided or is not one of [“TEXT”, “TF_RECORD”, “TF_RECORD_GZIP”].
  • input_paths (list of string) – A list of GCS paths of input data for batch prediction. Accepting wildcard operator *, but only at the end. (templated)
  • output_path (string) – The GCS path where the prediction results are written to. (templated)
  • region (string) – The Google Compute Engine region to run the prediction job in. (templated)
  • model_name (string) – The Google Cloud ML Engine model to use for prediction. If version_name is not provided, the default version of this model will be used. Should not be None if version_name is provided. Should be None if uri is provided. (templated)
  • version_name (string) – The Google Cloud ML Engine model version to use for prediction. Should be None if uri is provided. (templated)
  • uri (string) – The GCS path of the saved model to use for prediction. Should be None if model_name is provided. It should be a GCS path pointing to a tensorflow SavedModel. (templated)
  • max_worker_count (int) – The maximum number of workers to be used for parallel processing. Defaults to 10 if not specified.
  • runtime_version (string) – The Google Cloud ML Engine runtime version to use for batch prediction.
  • gcp_conn_id (string) – The connection ID used for connection to Google Cloud Platform.
  • delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have doamin-wide delegation enabled.
Raises:
ValueError: if a unique model/version origin cannot be determined.
MLEngineModelOperator
class airflow.contrib.operators.mlengine_operator.MLEngineModelOperator(project_id, model, operation='create', gcp_conn_id='google_cloud_default', delegate_to=None, *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Operator for managing a Google Cloud ML Engine model.

Parameters:
  • project_id (string) – The Google Cloud project name to which MLEngine model belongs. (templated)
  • model (dict) –

    A dictionary containing the information about the model. If the operation is create, then the model parameter should contain all the information about this model such as name.

    If the operation is get, the model parameter should contain the name of the model.

  • operation

    The operation to perform. Available operations are:

    • create: Creates a new model as provided by the model parameter.
    • get: Gets a particular model where the name is specified in model.
  • gcp_conn_id (string) – The connection ID to use when fetching connection info.
  • delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
MLEngineTrainingOperator
class airflow.contrib.operators.mlengine_operator.MLEngineTrainingOperator(project_id, job_id, package_uris, training_python_module, training_args, region, scale_tier=None, runtime_version=None, python_version=None, job_dir=None, gcp_conn_id='google_cloud_default', delegate_to=None, mode='PRODUCTION', *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Operator for launching a MLEngine training job.

Parameters:
  • project_id (string) – The Google Cloud project name within which MLEngine training job should run (templated).
  • job_id (string) – A unique templated id for the submitted Google MLEngine training job. (templated)
  • package_uris (string) – A list of package locations for MLEngine training job, which should include the main training program + any additional dependencies. (templated)
  • training_python_module (string) – The Python module name to run within MLEngine training job after installing ‘package_uris’ packages. (templated)
  • training_args (string) – A list of templated command line arguments to pass to the MLEngine training program. (templated)
  • region (string) – The Google Compute Engine region to run the MLEngine training job in (templated).
  • scale_tier (string) – Resource tier for MLEngine training job. (templated)
  • runtime_version (string) – The Google Cloud ML runtime version to use for training. (templated)
  • python_version (string) – The version of Python used in training. (templated)
  • job_dir (string) – A Google Cloud Storage path in which to store training outputs and other data needed for training. (templated)
  • gcp_conn_id (string) – The connection ID to use when fetching connection info.
  • delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
  • mode (string) – Can be one of ‘DRY_RUN’/’CLOUD’. In ‘DRY_RUN’ mode, no real training job will be launched, but the MLEngine training job request will be printed out. In ‘CLOUD’ mode, a real MLEngine training job creation request will be issued.
MLEngineVersionOperator
class airflow.contrib.operators.mlengine_operator.MLEngineVersionOperator(project_id, model_name, version_name=None, version=None, operation='create', gcp_conn_id='google_cloud_default', delegate_to=None, *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Operator for managing a Google Cloud ML Engine version.

Parameters:
  • project_id (string) – The Google Cloud project name to which MLEngine model belongs.
  • model_name (string) – The name of the Google Cloud ML Engine model that the version belongs to. (templated)
  • version_name (string) – A name to use for the version being operated upon. If not None and the version argument is None or does not have a value for the name key, then this will be populated in the payload for the name key. (templated)
  • version (dict) – A dictionary containing the information about the version. If the operation is create, version should contain all the information about this version such as name, and deploymentUrl. If the operation is get or delete, the version parameter should contain the name of the version. If it is None, the only operation possible would be list. (templated)
  • operation (string) –

    The operation to perform. Available operations are:

    • create: Creates a new version in the model specified by model_name, in which case the version parameter should contain all the information to create that version (e.g. name, deploymentUrl).
    • get: Gets full information of a particular version in the model specified by model_name. The name of the version should be specified in the version parameter.
    • list: Lists all available versions of the model specified by model_name.
    • delete: Deletes the version specified in version parameter from the model specified by model_name). The name of the version should be specified in the version parameter.
  • gcp_conn_id (string) – The connection ID to use when fetching connection info.
  • delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.

Cloud ML Engine Hook

MLEngineHook
class airflow.contrib.hooks.gcp_mlengine_hook.MLEngineHook(gcp_conn_id='google_cloud_default', delegate_to=None)[source]

Bases: airflow.contrib.hooks.gcp_api_base_hook.GoogleCloudBaseHook

create_job(project_id, job, use_existing_job_fn=None)[source]

Launches a MLEngine job and wait for it to reach a terminal state.

Parameters:
  • project_id (string) – The Google Cloud project id within which MLEngine job will be launched.
  • job (dict) –

    MLEngine Job object that should be provided to the MLEngine API, such as:

    {
      'jobId': 'my_job_id',
      'trainingInput': {
        'scaleTier': 'STANDARD_1',
        ...
      }
    }
    
  • use_existing_job_fn (function) – In case that a MLEngine job with the same job_id already exist, this method (if provided) will decide whether we should use this existing job, continue waiting for it to finish and returning the job object. It should accepts a MLEngine job object, and returns a boolean value indicating whether it is OK to reuse the existing job. If ‘use_existing_job_fn’ is not provided, we by default reuse the existing MLEngine job.
Returns:

The MLEngine job object if the job successfully reach a terminal state (which might be FAILED or CANCELLED state).

Return type:

dict

create_model(project_id, model)[source]

Create a Model. Blocks until finished.

create_version(project_id, model_name, version_spec)[source]

Creates the Version on Google Cloud ML Engine.

Returns the operation if the version was created successfully and raises an error otherwise.

delete_version(project_id, model_name, version_name)[source]

Deletes the given version of a model. Blocks until finished.

get_conn()[source]

Returns a Google MLEngine service object.

get_model(project_id, model_name)[source]

Gets a Model. Blocks until finished.

list_versions(project_id, model_name)[source]

Lists all available versions of a model. Blocks until finished.

set_default_version(project_id, model_name, version_name)[source]

Sets a version to be the default. Blocks until finished.

Cloud Storage

Storage Operators

FileToGoogleCloudStorageOperator
class airflow.contrib.operators.file_to_gcs.FileToGoogleCloudStorageOperator(src, dst, bucket, google_cloud_storage_conn_id='google_cloud_default', mime_type='application/octet-stream', delegate_to=None, *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Uploads a file to Google Cloud Storage

Parameters:
  • src (string) – Path to the local file. (templated)
  • dst (string) – Destination path within the specified bucket. (templated)
  • bucket (string) – The bucket to upload to. (templated)
  • google_cloud_storage_conn_id (string) – The Airflow connection ID to upload with
  • mime_type (string) – The mime-type string
  • delegate_to (string) – The account to impersonate, if any
execute(context)[source]

Uploads the file to Google cloud storage

GoogleCloudStorageCreateBucketOperator
class airflow.contrib.operators.gcs_operator.GoogleCloudStorageCreateBucketOperator(bucket_name, storage_class='MULTI_REGIONAL', location='US', project_id=None, labels=None, google_cloud_storage_conn_id='google_cloud_default', delegate_to=None, *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Creates a new bucket. Google Cloud Storage uses a flat namespace, so you can’t create a bucket with a name that is already in use.

See also

For more information, see Bucket Naming Guidelines: https://cloud.google.com/storage/docs/bucketnaming.html#requirements

Parameters:
  • bucket_name (string) – The name of the bucket. (templated)
  • storage_class (string) –

    This defines how objects in the bucket are stored and determines the SLA and the cost of storage (templated). Values include

    • MULTI_REGIONAL
    • REGIONAL
    • STANDARD
    • NEARLINE
    • COLDLINE.

    If this value is not specified when the bucket is created, it will default to STANDARD.

  • location (string) –

    The location of the bucket. (templated) Object data for objects in the bucket resides in physical storage within this region. Defaults to US.

  • project_id (string) – The ID of the GCP Project. (templated)
  • labels (dict) – User-provided labels, in key/value pairs.
  • google_cloud_storage_conn_id (string) – The connection ID to use when connecting to Google cloud storage.
  • delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
Example:

The following Operator would create a new bucket test-bucket with MULTI_REGIONAL storage class in EU region

CreateBucket = GoogleCloudStorageCreateBucketOperator(
    task_id='CreateNewBucket',
    bucket_name='test-bucket',
    storage_class='MULTI_REGIONAL',
    location='EU',
    labels={'env': 'dev', 'team': 'airflow'},
    google_cloud_storage_conn_id='airflow-service-account'
)
GoogleCloudStorageDownloadOperator
class airflow.contrib.operators.gcs_download_operator.GoogleCloudStorageDownloadOperator(bucket, object, filename=None, store_to_xcom_key=None, google_cloud_storage_conn_id='google_cloud_default', delegate_to=None, *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Downloads a file from Google Cloud Storage.

Parameters:
  • bucket (string) – The Google cloud storage bucket where the object is. (templated)
  • object (string) – The name of the object to download in the Google cloud storage bucket. (templated)
  • filename (string) – The file path on the local file system (where the operator is being executed) that the file should be downloaded to. (templated) If no filename passed, the downloaded data will not be stored on the local file system.
  • store_to_xcom_key (string) – If this param is set, the operator will push the contents of the downloaded file to XCom with the key set in this parameter. If not set, the downloaded data will not be pushed to XCom. (templated)
  • google_cloud_storage_conn_id (string) – The connection ID to use when connecting to Google cloud storage.
  • delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
GoogleCloudStorageListOperator
class airflow.contrib.operators.gcs_list_operator.GoogleCloudStorageListOperator(bucket, prefix=None, delimiter=None, google_cloud_storage_conn_id='google_cloud_default', delegate_to=None, *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

List all objects from the bucket with the give string prefix and delimiter in name.

This operator returns a python list with the name of objects which can be used by
xcom in the downstream task.
Parameters:
  • bucket (string) – The Google cloud storage bucket to find the objects. (templated)
  • prefix (string) – Prefix string which filters objects whose name begin with this prefix. (templated)
  • delimiter (string) – The delimiter by which you want to filter the objects. (templated) For e.g to lists the CSV files from in a directory in GCS you would use delimiter=’.csv’.
  • google_cloud_storage_conn_id (string) – The connection ID to use when connecting to Google cloud storage.
  • delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
Example:

The following Operator would list all the Avro files from sales/sales-2017 folder in data bucket.

GCS_Files = GoogleCloudStorageListOperator(
    task_id='GCS_Files',
    bucket='data',
    prefix='sales/sales-2017/',
    delimiter='.avro',
    google_cloud_storage_conn_id=google_cloud_conn_id
)
GoogleCloudStorageToBigQueryOperator
class airflow.contrib.operators.gcs_to_bq.GoogleCloudStorageToBigQueryOperator(bucket, source_objects, destination_project_dataset_table, schema_fields=None, schema_object=None, source_format='CSV', compression='NONE', create_disposition='CREATE_IF_NEEDED', skip_leading_rows=0, write_disposition='WRITE_EMPTY', field_delimiter=', ', max_bad_records=0, quote_character=None, ignore_unknown_values=False, allow_quoted_newlines=False, allow_jagged_rows=False, max_id_key=None, bigquery_conn_id='bigquery_default', google_cloud_storage_conn_id='google_cloud_default', delegate_to=None, schema_update_options=(), src_fmt_configs={}, external_table=False, time_partitioning={}, *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Loads files from Google cloud storage into BigQuery.

The schema to be used for the BigQuery table may be specified in one of two ways. You may either directly pass the schema fields in, or you may point the operator to a Google cloud storage object name. The object in Google cloud storage must be a JSON file with the schema fields in it.

Parameters:
  • bucket (string) – The bucket to load from. (templated)
  • source_objects – List of Google cloud storage URIs to load from. (templated) If source_format is ‘DATASTORE_BACKUP’, the list must only contain a single URI.
  • destination_project_dataset_table (string) – The dotted (<project>.)<dataset>.<table> BigQuery table to load data into. If <project> is not included, project will be the project defined in the connection json. (templated)
  • schema_fields (list) – If set, the schema field list as defined here: https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load Should not be set when source_format is ‘DATASTORE_BACKUP’.
  • schema_object – If set, a GCS object path pointing to a .json file that contains the schema for the table. (templated)
  • schema_object – string
  • source_format (string) – File format to export.
  • compression (string) – [Optional] The compression type of the data source. Possible values include GZIP and NONE. The default value is NONE. This setting is ignored for Google Cloud Bigtable, Google Cloud Datastore backups and Avro formats.
  • create_disposition (string) – The create disposition if the table doesn’t exist.
  • skip_leading_rows (int) – Number of rows to skip when loading from a CSV.
  • write_disposition (string) – The write disposition if the table already exists.
  • field_delimiter (string) – The delimiter to use when loading from a CSV.
  • max_bad_records (int) – The maximum number of bad records that BigQuery can ignore when running the job.
  • quote_character (string) – The value that is used to quote data sections in a CSV file.
  • ignore_unknown_values (bool) – [Optional] Indicates if BigQuery should allow extra values that are not represented in the table schema. If true, the extra values are ignored. If false, records with extra columns are treated as bad records, and if there are too many bad records, an invalid error is returned in the job result.
  • allow_quoted_newlines (boolean) – Whether to allow quoted newlines (true) or not (false).
  • allow_jagged_rows (bool) – Accept rows that are missing trailing optional columns. The missing values are treated as nulls. If false, records with missing trailing columns are treated as bad records, and if there are too many bad records, an invalid error is returned in the job result. Only applicable to CSV, ignored for other formats.
  • max_id_key (string) – If set, the name of a column in the BigQuery table that’s to be loaded. Thsi will be used to select the MAX value from BigQuery after the load occurs. The results will be returned by the execute() command, which in turn gets stored in XCom for future operators to use. This can be helpful with incremental loads–during future executions, you can pick up from the max ID.
  • bigquery_conn_id (string) – Reference to a specific BigQuery hook.
  • google_cloud_storage_conn_id (string) – Reference to a specific Google cloud storage hook.
  • delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
  • schema_update_options (list) – Allows the schema of the destination table to be updated as a side effect of the load job.
  • src_fmt_configs (dict) – configure optional fields specific to the source format
  • external_table (bool) – Flag to specify if the destination table should be a BigQuery external table. Default Value is False.
  • time_partitioning (dict) – configure optional time partitioning fields i.e. partition by field, type and expiration as per API specifications. Note that ‘field’ is not available in concurrency with dataset.table$partition.
GoogleCloudStorageToGoogleCloudStorageOperator
class airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageToGoogleCloudStorageOperator(source_bucket, source_object, destination_bucket=None, destination_object=None, move_object=False, google_cloud_storage_conn_id='google_cloud_default', delegate_to=None, *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

Copies objects from a bucket to another, with renaming if requested.

Parameters:
  • source_bucket (string) – The source Google cloud storage bucket where the object is. (templated)
  • source_object (string) –

    The source name of the object to copy in the Google cloud storage bucket. (templated) If wildcards are used in this argument:

    You can use only one wildcard for objects (filenames) within your bucket. The wildcard can appear inside the object name or at the end of the object name. Appending a wildcard to the bucket name is unsupported.
  • destination_bucket – The destination Google cloud storage bucket

where the object should be. (templated) :type destination_bucket: string :param destination_object: The destination name of the object in the

destination Google cloud storage bucket. (templated) If a wildcard is supplied in the source_object argument, this is the prefix that will be prepended to the final destination objects’ paths. Note that the source path’s part before the wildcard will be removed; if it needs to be retained it should be appended to destination_object. For example, with prefix foo/* and destination_object ‘blah/`, the file foo/baz will be copied to blah/baz; to retain the prefix write the destination_object as e.g. blah/foo, in which case the copied file will be named blah/foo/baz.
Parameters:move_object – When move object is True, the object is moved instead
of copied to the new location.
This is the equivalent of a mv command as opposed to a cp command.
Parameters:
  • google_cloud_storage_conn_id (string) – The connection ID to use when connecting to Google cloud storage.
  • delegate_to (string) – The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
Examples:

The following Operator would copy a single file named sales/sales-2017/january.avro in the data bucket to the file named copied_sales/2017/january-backup.avro` in the ``data_backup bucket

copy_single_file = GoogleCloudStorageToGoogleCloudStorageOperator(
    task_id='copy_single_file',
    source_bucket='data',
    source_object='sales/sales-2017/january.avro',
    destination_bucket='data_backup',
    destination_object='copied_sales/2017/january-backup.avro',
    google_cloud_storage_conn_id=google_cloud_conn_id
)

The following Operator would copy all the Avro files from sales/sales-2017 folder (i.e. with names starting with that prefix) in data bucket to the copied_sales/2017 folder in the data_backup bucket.

copy_files = GoogleCloudStorageToGoogleCloudStorageOperator(
    task_id='copy_files',
    source_bucket='data',
    source_object='sales/sales-2017/*.avro',
    destination_bucket='data_backup',
    destination_object='copied_sales/2017/',
    google_cloud_storage_conn_id=google_cloud_conn_id
)

The following Operator would move all the Avro files from sales/sales-2017 folder (i.e. with names starting with that prefix) in data bucket to the same folder in the data_backup bucket, deleting the original files in the process.

move_files = GoogleCloudStorageToGoogleCloudStorageOperator(
    task_id='move_files',
    source_bucket='data',
    source_object='sales/sales-2017/*.avro',
    destination_bucket='data_backup',
    move_object=True,
    google_cloud_storage_conn_id=google_cloud_conn_id
)

GoogleCloudStorageHook

class airflow.contrib.hooks.gcs_hook.GoogleCloudStorageHook(google_cloud_storage_conn_id='google_cloud_default', delegate_to=None)[source]

Bases: airflow.contrib.hooks.gcp_api_base_hook.GoogleCloudBaseHook

Interact with Google Cloud Storage. This hook uses the Google Cloud Platform connection.

copy(source_bucket, source_object, destination_bucket=None, destination_object=None)[source]

Copies an object from a bucket to another, with renaming if requested.

destination_bucket or destination_object can be omitted, in which case source bucket/object is used, but not both.

Parameters:
  • source_bucket (string) – The bucket of the object to copy from.
  • source_object (string) – The object to copy.
  • destination_bucket (string) – The destination of the object to copied to. Can be omitted; then the same bucket is used.
  • destination_object – The (renamed) path of the object if given. Can be omitted; then the same name is used.
create_bucket(bucket_name, storage_class='MULTI_REGIONAL', location='US', project_id=None, labels=None)[source]

Creates a new bucket. Google Cloud Storage uses a flat namespace, so you can’t create a bucket with a name that is already in use.

See also

For more information, see Bucket Naming Guidelines: https://cloud.google.com/storage/docs/bucketnaming.html#requirements

Parameters:
  • bucket_name (string) – The name of the bucket.
  • storage_class (string) –

    This defines how objects in the bucket are stored and determines the SLA and the cost of storage. Values include

    • MULTI_REGIONAL
    • REGIONAL
    • STANDARD
    • NEARLINE
    • COLDLINE.

    If this value is not specified when the bucket is created, it will default to STANDARD.

  • location (string) –

    The location of the bucket. Object data for objects in the bucket resides in physical storage within this region. Defaults to US.

  • project_id (string) – The ID of the GCP Project.
  • labels (dict) – User-provided labels, in key/value pairs.
Returns:

If successful, it returns the id of the bucket.

delete(bucket, object, generation=None)[source]

Delete an object if versioning is not enabled for the bucket, or if generation parameter is used.

Parameters:
  • bucket (string) – name of the bucket, where the object resides
  • object (string) – name of the object to delete
  • generation (string) – if present, permanently delete the object of this generation
Returns:

True if succeeded

download(bucket, object, filename=None)[source]

Get a file from Google Cloud Storage.

Parameters:
  • bucket (string) – The bucket to fetch from.
  • object (string) – The object to fetch.
  • filename (string) – If set, a local file path where the file should be written to.
exists(bucket, object)[source]

Checks for the existence of a file in Google Cloud Storage.

Parameters:
  • bucket (string) – The Google cloud storage bucket where the object is.
  • object (string) – The name of the object to check in the Google cloud storage bucket.
get_conn()[source]

Returns a Google Cloud Storage service object.

get_crc32c(bucket, object)[source]

Gets the CRC32c checksum of an object in Google Cloud Storage.

Parameters:
  • bucket (string) – The Google cloud storage bucket where the object is.
  • object (string) – The name of the object to check in the Google cloud storage bucket.
get_md5hash(bucket, object)[source]

Gets the MD5 hash of an object in Google Cloud Storage.

Parameters:
  • bucket (string) – The Google cloud storage bucket where the object is.
  • object (string) – The name of the object to check in the Google cloud storage bucket.
get_size(bucket, object)[source]

Gets the size of a file in Google Cloud Storage.

Parameters:
  • bucket (string) – The Google cloud storage bucket where the object is.
  • object (string) – The name of the object to check in the Google cloud storage bucket.
is_updated_after(bucket, object, ts)[source]

Checks if an object is updated in Google Cloud Storage.

Parameters:
  • bucket (string) – The Google cloud storage bucket where the object is.
  • object (string) – The name of the object to check in the Google cloud storage bucket.
  • ts (datetime) – The timestamp to check against.
list(bucket, versions=None, maxResults=None, prefix=None, delimiter=None)[source]

List all objects from the bucket with the give string prefix in name

Parameters:
  • bucket (string) – bucket name
  • versions (boolean) – if true, list all versions of the objects
  • maxResults (integer) – max count of items to return in a single page of responses
  • prefix (string) – prefix string which filters objects whose name begin with this prefix
  • delimiter (string) – filters objects based on the delimiter (for e.g ‘.csv’)
Returns:

a stream of object names matching the filtering criteria

rewrite(source_bucket, source_object, destination_bucket, destination_object=None)[source]

Has the same functionality as copy, except that will work on files over 5 TB, as well as when copying between locations and/or storage classes.

destination_object can be omitted, in which case source_object is used.

Parameters:
  • source_bucket (string) – The bucket of the object to copy from.
  • source_object (string) – The object to copy.
  • destination_bucket (string) – The destination of the object to copied to.
  • destination_object – The (renamed) path of the object if given. Can be omitted; then the same name is used.
upload(bucket, object, filename, mime_type='application/octet-stream')[source]

Uploads a local file to Google Cloud Storage.

Parameters:
  • bucket (string) – The bucket to upload to.
  • object (string) – The object name to set when uploading the local file.
  • filename (string) – The local file path to the file to be uploaded.
  • mime_type (string) – The MIME type to set when uploading the file.

Google Kubernetes Engine

Google Kubernetes Engine Cluster Operators

GKEClusterCreateOperator
class airflow.contrib.operators.gcp_container_operator.GKEClusterCreateOperator(project_id, location, body={}, gcp_conn_id='google_cloud_default', api_version='v2', *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

GKEClusterDeleteOperator
class airflow.contrib.operators.gcp_container_operator.GKEClusterDeleteOperator(project_id, name, location, gcp_conn_id='google_cloud_default', api_version='v2', *args, **kwargs)[source]

Bases: airflow.models.BaseOperator

GKEPodOperator

Google Kubernetes Engine Hook

class airflow.contrib.hooks.gcp_container_hook.GKEClusterHook(project_id, location)[source]

Bases: airflow.hooks.base_hook.BaseHook

create_cluster(cluster, retry=<object object>, timeout=<object object>)[source]

Creates a cluster, consisting of the specified number and type of Google Compute Engine instances.

Parameters:
  • cluster (dict or google.cloud.container_v1.types.Cluster) – A Cluster protobuf or dict. If dict is provided, it must be of the same form as the protobuf message google.cloud.container_v1.types.Cluster
  • retry (google.api_core.retry.Retry) – A retry object (google.api_core.retry.Retry) used to retry requests. If None is specified, requests will not be retried.
  • timeout (float) – The amount of time, in seconds, to wait for the request to complete. Note that if retry is specified, the timeout applies to each individual attempt.
Returns:

The full url to the new, or existing, cluster

:raises
ParseError: On JSON parsing problems when trying to convert dict AirflowException: cluster is not dict type nor Cluster proto type
delete_cluster(name, retry=<object object>, timeout=<object object>)[source]

Deletes the cluster, including the Kubernetes endpoint and all worker nodes. Firewalls and routes that were configured during cluster creation are also deleted. Other Google Compute Engine resources that might be in use by the cluster (e.g. load balancer resources) will not be deleted if they weren’t present at the initial create time.

Parameters:
  • name (str) – The name of the cluster to delete
  • retry (google.api_core.retry.Retry) – Retry object used to determine when/if to retry requests. If None is specified, requests will not be retried.
  • timeout (float) – The amount of time, in seconds, to wait for the request to complete. Note that if retry is specified, the timeout applies to each individual attempt.
Returns:

The full url to the delete operation if successful, else None

get_cluster(name, retry=<object object>, timeout=<object object>)[source]

Gets details of specified cluster :param name: The name of the cluster to retrieve :type name: str :param retry: A retry object used to retry requests. If None is specified,

requests will not be retried.
Parameters:timeout (float) – The amount of time, in seconds, to wait for the request to complete. Note that if retry is specified, the timeout applies to each individual attempt.
Returns:A google.cloud.container_v1.types.Cluster instance
get_operation(operation_name)[source]

Fetches the operation from Google Cloud :param operation_name: Name of operation to fetch :type operation_name: str :return: The new, updated operation from Google Cloud

wait_for_operation(operation)[source]

Given an operation, continuously fetches the status from Google Cloud until either completion or an error occurring :param operation: The Operation to wait for :type operation: A google.cloud.container_V1.gapic.enums.Operator :return: A new, updated operation fetched from Google Cloud