Amazon S3¶
Amazon Simple Storage Service (Amazon S3) is storage for the internet. You can use Amazon S3 to store and retrieve any amount of data at any time, from anywhere on the web.
Prerequisite Tasks¶
To use these operators, you must do a few things:
Create necessary resources using AWS Console or AWS CLI.
Install API libraries via pip.
pip install 'apache-airflow[amazon]'
Detailed information is available in Installation of Airflow®.
Operators¶
Create an Amazon S3 bucket¶
To create an Amazon S3 bucket you can use S3CreateBucketOperator.
tests/system/amazon/aws/example_s3.py
create_bucket = S3CreateBucketOperator(
    task_id="create_bucket",
    bucket_name=bucket_name,
)
Delete an Amazon S3 bucket¶
To delete an Amazon S3 bucket you can use S3DeleteBucketOperator.
tests/system/amazon/aws/example_s3.py
delete_bucket = S3DeleteBucketOperator(
    task_id="delete_bucket",
    bucket_name=bucket_name,
    force_delete=True,
)
Set the tags for an Amazon S3 bucket¶
To set the tags for an Amazon S3 bucket you can use S3PutBucketTaggingOperator.
tests/system/amazon/aws/example_s3.py
put_tagging = S3PutBucketTaggingOperator(
    task_id="put_tagging",
    bucket_name=bucket_name,
    key=TAG_KEY,
    value=TAG_VALUE,
)
Get the tag of an Amazon S3 bucket¶
To get the tag set associated with an Amazon S3 bucket you can use S3GetBucketTaggingOperator.
tests/system/amazon/aws/example_s3.py
get_tagging = S3GetBucketTaggingOperator(
    task_id="get_tagging",
    bucket_name=bucket_name,
)
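The Amazon S3 tagging API represents a tag set as a list of Key/Value pairs. The sketch below converts between that layout and a plain dict, which can be handy when inspecting a tag set downstream. The helper names `to_tag_set` and `from_tag_set` are hypothetical and not part of the provider, and whether the operator returns exactly this list is an assumption to verify against the provider documentation.

```python
def to_tag_set(tags: dict) -> list:
    """Convert a plain dict into the list-of-pairs layout used by the S3 tagging API."""
    return [{"Key": key, "Value": value} for key, value in tags.items()]


def from_tag_set(tag_set: list) -> dict:
    """Convert a list of {"Key": ..., "Value": ...} pairs back into a plain dict."""
    return {tag["Key"]: tag["Value"] for tag in tag_set}


# Hypothetical tag values, for illustration only.
tag_set = to_tag_set({"team": "data", "env": "test"})
print(tag_set)
# → [{'Key': 'team', 'Value': 'data'}, {'Key': 'env', 'Value': 'test'}]
print(from_tag_set(tag_set))
# → {'team': 'data', 'env': 'test'}
```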
Delete the tags of an Amazon S3 bucket¶
To delete the tags of an Amazon S3 bucket you can use S3DeleteBucketTaggingOperator.
tests/system/amazon/aws/example_s3.py
delete_tagging = S3DeleteBucketTaggingOperator(
    task_id="delete_tagging",
    bucket_name=bucket_name,
)
Create an Amazon S3 object¶
To create a new Amazon S3 object (or replace an existing one) you can use S3CreateObjectOperator.
tests/system/amazon/aws/example_s3.py
create_object = S3CreateObjectOperator(
    task_id="create_object",
    s3_bucket=bucket_name,
    s3_key=key,
    data=DATA,
    replace=True,
)
Copy an Amazon S3 object¶
To copy an Amazon S3 object from one bucket to another you can use S3CopyObjectOperator.
The Amazon S3 connection used here needs to have access to both the source and the destination bucket/key.
tests/system/amazon/aws/example_s3.py
copy_object = S3CopyObjectOperator(
    task_id="copy_object",
    source_bucket_name=bucket_name,
    dest_bucket_name=bucket_name_2,
    source_bucket_key=key,
    dest_bucket_key=key_2,
)
Delete Amazon S3 objects¶
To delete one or multiple Amazon S3 objects you can use S3DeleteObjectsOperator.
tests/system/amazon/aws/example_s3.py
delete_objects = S3DeleteObjectsOperator(
    task_id="delete_objects",
    bucket=bucket_name_2,
    keys=key_2,
)
Transform an Amazon S3 object¶
To transform the data from one Amazon S3 object and save it to another object you can use S3FileTransformOperator.
You can also apply an optional Amazon S3 Select expression to select the data you want to retrieve from source_s3_key using select_expression.
tests/system/amazon/aws/example_s3.py
file_transform = S3FileTransformOperator(
    task_id="file_transform",
    source_s3_key=f"s3://{bucket_name}/{key}",
    dest_s3_key=f"s3://{bucket_name_2}/{key_2}",
    # Use `cp` command as transform script as an example
    transform_script="cp",
    replace=True,
)
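The `cp` example works because the transform script is expected to receive the local path of the downloaded source file as its first argument and the local path of the destination file as its second. Under that assumption, a custom script could look like the sketch below. The upper-casing logic and the function name `transform` are purely illustrative and do not come from the provider.

```python
#!/usr/bin/env python3
"""Hypothetical transform script: upper-cases the text of the source file."""
import sys
from pathlib import Path


def transform(source_path: str, dest_path: str) -> None:
    # Read the local copy of the source object, transform it, and write the
    # result to the local destination file.
    text = Path(source_path).read_text(encoding="utf-8")
    Path(dest_path).write_text(text.upper(), encoding="utf-8")


if __name__ == "__main__" and len(sys.argv) >= 3:
    # Assumption: argv[1] is the local source path, argv[2] the local destination path.
    transform(sys.argv[1], sys.argv[2])
```

To use a script like this, transform_script would point at the file's location, and the file would most likely need to be executable on the worker.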
List Amazon S3 prefixes¶
To list all Amazon S3 prefixes within an Amazon S3 bucket you can use S3ListPrefixesOperator.
See the Amazon S3 documentation for more information about prefixes.
tests/system/amazon/aws/example_s3.py
list_prefixes = S3ListPrefixesOperator(
    task_id="list_prefixes",
    bucket=bucket_name,
    prefix=PREFIX,
    delimiter=DELIMITER,
)
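To build intuition for how prefix and delimiter interact, the following sketch simulates the grouping locally on a hand-written list of keys. It mirrors the "common prefixes" behaviour of Amazon S3 listings and makes no AWS calls. The function `list_common_prefixes` and the sample keys are hypothetical, and this is not the operator's implementation.

```python
def list_common_prefixes(keys, prefix="", delimiter="/"):
    """Group keys under ``prefix`` by the next ``delimiter`` that follows it."""
    if not delimiter:
        # Without a delimiter there is nothing to group by.
        return []
    found = set()
    for key in keys:
        if not key.startswith(prefix):
            continue
        # Look for the delimiter only in the part of the key after the prefix.
        cut = key.find(delimiter, len(prefix))
        if cut != -1:
            found.add(key[: cut + len(delimiter)])
    return sorted(found)


keys = [
    "data/2024/jan.csv",
    "data/2024/feb.csv",
    "data/2025/jan.csv",
    "data/readme.txt",
    "logs/app.log",
]
print(list_common_prefixes(keys, prefix="data/", delimiter="/"))
# → ['data/2024/', 'data/2025/']
print(list_common_prefixes(keys, prefix="", delimiter="/"))
# → ['data/', 'logs/']
```

Note that `data/readme.txt` sits directly under `data/` with no further delimiter, so it does not contribute a prefix.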
List Amazon S3 objects¶
To list all Amazon S3 objects within an Amazon S3 bucket you can use S3ListOperator.
You can specify a prefix to list only the objects whose names begin with that prefix.
tests/system/amazon/aws/example_s3.py
list_keys = S3ListOperator(
    task_id="list_keys",
    bucket=bucket_name,
    prefix=PREFIX,
)
Sensors¶
Wait on an Amazon S3 key¶
To wait for one or multiple keys to be present in an Amazon S3 bucket you can use S3KeySensor.
For each key, it calls the head_object API (or the list_objects_v2 API if wildcard_match is True) to check whether it is present or not.
Please keep in mind, especially when used to check a large volume of keys, that it makes one API call per key.
To check one file:
tests/system/amazon/aws/example_s3.py
# Check if a file exists
sensor_one_key = S3KeySensor(
    task_id="sensor_one_key",
    bucket_name=bucket_name,
    bucket_key=key,
)
To check multiple files:
tests/system/amazon/aws/example_s3.py
# Check if both files exist
sensor_two_keys = S3KeySensor(
    task_id="sensor_two_keys",
    bucket_name=bucket_name,
    bucket_key=[key, key_2],
)
To check a file with a regular expression:
tests/system/amazon/aws/example_s3.py
# Check if a file exists and matches a certain regular expression pattern
sensor_key_with_regex = S3KeySensor(
    task_id="sensor_key_with_regex",
    bucket_name=bucket_name,
    bucket_key=key_regex_pattern,
    use_regex=True,
)
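Wildcard patterns and regular expressions are easy to confuse, so it can help to try a pattern locally before handing it to the sensor. The sketch below uses only the Python standard library on hypothetical key names. It assumes shell-style wildcards for wildcard_match and match-from-the-start semantics for use_regex, which is an assumption about the sensor's matching rather than a description of its implementation.

```python
import re
from fnmatch import fnmatchcase

# Hypothetical object keys, for illustration only.
keys = [
    "reports/2024-01.csv",
    "reports/2024-02.csv",
    "reports/draft.csv",
    "reports/summary.txt",
]

# Shell-style wildcard, as one might pass together with wildcard_match=True.
wildcard = "reports/*.csv"
wildcard_hits = [k for k in keys if fnmatchcase(k, wildcard)]

# Regular expression, as one might pass together with use_regex=True.
pattern = r"reports/\d{4}-\d{2}\.csv"
regex_hits = [k for k in keys if re.match(pattern, k)]

print(wildcard_hits)
# → ['reports/2024-01.csv', 'reports/2024-02.csv', 'reports/draft.csv']
print(regex_hits)
# → ['reports/2024-01.csv', 'reports/2024-02.csv']
```

The wildcard accepts any `.csv` file under `reports/`, while the regular expression additionally constrains the file name to a year-month shape.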
To add a custom check, you can define a function which receives a list of matched S3 object attributes and returns a boolean:
True: the criterion is met
False: the criterion isn't met
This function is called for each key passed as a parameter in bucket_key.
The parameter of this function is a list of objects because, when wildcard_match is True, multiple files can match one key. The list of matched S3 object attributes contains only the size and is in this format:
[{"Size": int}]
tests/system/amazon/aws/example_s3.py
def check_fn(files: list, **kwargs) -> bool:
    """
    Example of custom check: check if all files are bigger than ``20 bytes``

    :param files: List of S3 object attributes.
    :return: true if the criterion is met
    """
    return all(f.get("Size", 0) > 20 for f in files)
tests/system/amazon/aws/example_s3.py
# Check if a file exists and satisfies the criterion defined in check_fn
sensor_key_with_function = S3KeySensor(
    task_id="sensor_key_with_function",
    bucket_name=bucket_name,
    bucket_key=key,
    check_fn=check_fn,
)
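Because the custom check is a plain Python function, it can be exercised locally before it is attached to the sensor. The sketch below repeats the example function and calls it with hand-written lists in the [{"Size": int}] format; the sizes are hypothetical.

```python
def check_fn(files: list, **kwargs) -> bool:
    """Return True if every matched object is bigger than 20 bytes."""
    return all(f.get("Size", 0) > 20 for f in files)


print(check_fn([{"Size": 1024}]))               # → True
print(check_fn([{"Size": 1024}, {"Size": 5}]))  # → False
print(check_fn([{}]))                           # → False (missing size counts as 0)
print(check_fn([]))                             # → True (all() of an empty list)
```

The last call is worth noting: a check built on all() passes for an empty list. Whether the sensor ever hands the function an empty list is not stated here, so a defensive check may want to test for that case explicitly.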
You can also run this sensor in deferrable mode by setting the parameter deferrable to True.
This makes more efficient use of Airflow workers, as polling happens asynchronously on the triggerer. Note that this requires a triggerer to be available in your Airflow deployment.
To check one file:
tests/system/amazon/aws/example_s3.py
# Check if a file exists
sensor_one_key_deferrable = S3KeySensor(
    task_id="sensor_one_key_deferrable",
    bucket_name=bucket_name,
    bucket_key=key,
    deferrable=True,
)
To check multiple files:
tests/system/amazon/aws/example_s3.py
# Check if both files exist
sensor_two_keys_deferrable = S3KeySensor(
    task_id="sensor_two_keys_deferrable",
    bucket_name=bucket_name,
    bucket_key=[key, key_2],
    deferrable=True,
)
To check a file with a regular expression:
tests/system/amazon/aws/example_s3.py
# Check if a file exists and matches a certain regular expression pattern
sensor_key_with_regex_deferrable = S3KeySensor(
    task_id="sensor_key_with_regex_deferrable",
    bucket_name=bucket_name,
    bucket_key=key_regex_pattern,
    use_regex=True,
    deferrable=True,
)
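The benefit of deferrable mode comes from asynchronous polling: many waits can share a single event loop instead of each holding a worker slot. The sketch below illustrates that idea with plain asyncio and a fake existence check. It is a conceptual model only; `wait_for_key` and `make_fake_check` are hypothetical names and do not reflect how the triggerer or the provider's trigger is actually implemented.

```python
import asyncio


async def wait_for_key(key_exists, poke_interval: float = 0.01, max_pokes: int = 100) -> bool:
    """Poll ``key_exists`` without blocking the event loop between attempts."""
    for _ in range(max_pokes):
        if await key_exists():
            return True
        # Sleeping yields control, so other waits can make progress meanwhile.
        await asyncio.sleep(poke_interval)
    return False


def make_fake_check(ready_after: int):
    """Build a hypothetical check that succeeds on the ``ready_after``-th call."""
    calls = 0

    async def key_exists() -> bool:
        nonlocal calls
        calls += 1
        return calls >= ready_after

    return key_exists


async def main() -> list:
    # Two waits share one event loop, much as many deferred tasks share a triggerer.
    return await asyncio.gather(
        wait_for_key(make_fake_check(3)),
        wait_for_key(make_fake_check(5)),
    )


results = asyncio.run(main())
print(results)  # → [True, True]
```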
Wait on Amazon S3 prefix changes¶
To check for changes in the number of objects at a specific prefix in an Amazon S3 bucket and wait until the inactivity period has passed with no increase in the number of objects, you can use S3KeysUnchangedSensor.
Note that this sensor will not behave correctly in reschedule mode, as the state of the listed objects in the Amazon S3 bucket will be lost between rescheduled invocations.
tests/system/amazon/aws/example_s3.py
sensor_keys_unchanged = S3KeysUnchangedSensor(
    task_id="sensor_keys_unchanged",
    bucket_name=bucket_name_2,
    prefix=PREFIX,
    inactivity_period=10,  # inactivity_period in seconds
)
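The inactivity logic can be pictured as a small state machine that remembers the last object count and when it last grew. The sketch below is a simplified local model under that reading, with time passed in explicitly so it runs instantly. The class `KeysUnchangedTracker` is hypothetical, it ignores deletions, and it is not the sensor's implementation.

```python
class KeysUnchangedTracker:
    """Simplified model: succeed once the object count has not grown for a while."""

    def __init__(self, inactivity_period: float):
        self.inactivity_period = inactivity_period
        self.last_count = 0
        self.last_activity = None  # time of the most recent increase

    def poke(self, object_count: int, now: float) -> bool:
        if self.last_activity is None or object_count > self.last_count:
            # First poke or new objects arrived: restart the inactivity clock.
            self.last_count = object_count
            self.last_activity = now
            return False
        return now - self.last_activity >= self.inactivity_period


tracker = KeysUnchangedTracker(inactivity_period=10)
print(tracker.poke(object_count=2, now=0))   # → False (first observation)
print(tracker.poke(object_count=3, now=5))   # → False (count grew, clock restarts)
print(tracker.poke(object_count=3, now=12))  # → False (only 7 s of inactivity)
print(tracker.poke(object_count=3, now=15))  # → True (10 s with no increase)
```

The model also shows why state matters: a freshly created tracker treats every poke as a first observation, which is analogous to the state loss described for reschedule mode.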
You can also run this sensor in deferrable mode by setting the parameter deferrable to True.
This makes more efficient use of Airflow workers, as polling happens asynchronously on the triggerer. Note that this requires a triggerer to be available in your Airflow deployment.