# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from datetime import datetime

from airflow.models.baseoperator import chain
from airflow.models.dag import DAG
from airflow.operators.python import BranchPythonOperator
from import (
from import S3KeySensor, S3KeysUnchangedSensor
from airflow.utils.trigger_rule import TriggerRule
from import ENV_ID_KEY, SystemTestContextBuilder

[docs]DAG_ID = "example_s3"
[docs]sys_test_context_task = SystemTestContextBuilder().build()
[docs]DATA = """ apple,0.5 milk,2.5 bread,4.0 """
# Empty string prefix refers to the bucket root # See what prefix is here
[docs]PREFIX = ""
[docs]DELIMITER = "/"
[docs]TAG_KEY = "test-s3-bucket-tagging-key"
[docs]TAG_VALUE = "test-s3-bucket-tagging-value"
with DAG( dag_id=DAG_ID, schedule="@once", start_date=datetime(2021, 1, 1), catchup=False, tags=["example"], ) as dag:
[docs] test_context = sys_test_context_task()
env_id = test_context[ENV_ID_KEY] bucket_name = f"{env_id}-s3-bucket" bucket_name_2 = f"{env_id}-s3-bucket-2" key = f"{env_id}-key" key_2 = f"{env_id}-key2" # [START howto_sensor_s3_key_function_definition] def check_fn(files: list) -> bool: """ Example of custom check: check if all files are bigger than ``20 bytes`` :param files: List of S3 object attributes. :return: true if the criteria is met """ return all(f.get("Size", 0) > 20 for f in files) # [END howto_sensor_s3_key_function_definition] # [START howto_operator_s3_create_bucket] create_bucket = S3CreateBucketOperator( task_id="create_bucket", bucket_name=bucket_name, ) # [END howto_operator_s3_create_bucket] create_bucket_2 = S3CreateBucketOperator( task_id="create_bucket_2", bucket_name=bucket_name_2, ) # [START howto_operator_s3_put_bucket_tagging] put_tagging = S3PutBucketTaggingOperator( task_id="put_tagging", bucket_name=bucket_name, key=TAG_KEY, value=TAG_VALUE, ) # [END howto_operator_s3_put_bucket_tagging] # [START howto_operator_s3_get_bucket_tagging] get_tagging = S3GetBucketTaggingOperator( task_id="get_tagging", bucket_name=bucket_name, ) # [END howto_operator_s3_get_bucket_tagging] # [START howto_operator_s3_delete_bucket_tagging] delete_tagging = S3DeleteBucketTaggingOperator( task_id="delete_tagging", bucket_name=bucket_name, ) # [END howto_operator_s3_delete_bucket_tagging] # [START howto_operator_s3_create_object] create_object = S3CreateObjectOperator( task_id="create_object", s3_bucket=bucket_name, s3_key=key, data=DATA, replace=True, ) # [END howto_operator_s3_create_object] create_object_2 = S3CreateObjectOperator( task_id="create_object_2", s3_bucket=bucket_name, s3_key=key_2, data=DATA, replace=True, ) # [START howto_operator_s3_list_prefixes] list_prefixes = S3ListPrefixesOperator( task_id="list_prefixes", bucket=bucket_name, prefix=PREFIX, delimiter=DELIMITER, ) # [END howto_operator_s3_list_prefixes] # [START howto_operator_s3_list] list_keys = S3ListOperator( task_id="list_keys", bucket=bucket_name, prefix=PREFIX, ) # [END howto_operator_s3_list] # [START howto_sensor_s3_key_single_key] # Check if a file exists sensor_one_key = S3KeySensor( task_id="sensor_one_key", bucket_name=bucket_name, bucket_key=key, ) # [END howto_sensor_s3_key_single_key] # [START howto_sensor_s3_key_multiple_keys] # Check if both files exist sensor_two_keys = S3KeySensor( task_id="sensor_two_keys", bucket_name=bucket_name, bucket_key=[key, key_2], ) # [END howto_sensor_s3_key_multiple_keys] # [START howto_sensor_s3_key_single_key_deferrable] # Check if a file exists sensor_one_key_deferrable = S3KeySensor( task_id="sensor_one_key_deferrable", bucket_name=bucket_name, bucket_key=key, deferrable=True, ) # [END howto_sensor_s3_key_single_key_deferrable] # [START howto_sensor_s3_key_multiple_keys_deferrable] # Check if both files exist sensor_two_keys_deferrable = S3KeySensor( task_id="sensor_two_keys_deferrable", bucket_name=bucket_name, bucket_key=[key, key_2], deferrable=True, ) # [END howto_sensor_s3_key_multiple_keys_deferrable] # [START howto_sensor_s3_key_function] # Check if a file exists and match a certain pattern defined in check_fn sensor_key_with_function_deferrable = S3KeySensor( task_id="sensor_key_with_function_deferrable", bucket_name=bucket_name, bucket_key=key, check_fn=check_fn, deferrable=True, ) # [END howto_sensor_s3_key_function] # [START howto_sensor_s3_key_function] # Check if a file exists and match a certain pattern defined in check_fn sensor_key_with_function = S3KeySensor( task_id="sensor_key_with_function", bucket_name=bucket_name, bucket_key=key, check_fn=check_fn, ) # [END howto_sensor_s3_key_function] # [START howto_operator_s3_copy_object] copy_object = S3CopyObjectOperator( task_id="copy_object", source_bucket_name=bucket_name, dest_bucket_name=bucket_name_2, source_bucket_key=key, dest_bucket_key=key_2, ) # [END howto_operator_s3_copy_object] # [START howto_operator_s3_file_transform] file_transform = S3FileTransformOperator( task_id="file_transform", source_s3_key=f"s3://{bucket_name}/{key}", dest_s3_key=f"s3://{bucket_name_2}/{key_2}", # Use `cp` command as transform script as an example transform_script="cp", replace=True, ) # [END howto_operator_s3_file_transform] # This task skips the `sensor_keys_unchanged` task because the S3KeysUnchangedSensor # runs in poke mode only, which is not supported by the DebugExecutor, causing system tests to fail. branching = BranchPythonOperator( task_id="branch_to_delete_objects", python_callable=lambda: "delete_objects" ) # [START howto_sensor_s3_keys_unchanged] sensor_keys_unchanged = S3KeysUnchangedSensor( task_id="sensor_keys_unchanged", bucket_name=bucket_name_2, prefix=PREFIX, inactivity_period=10, # inactivity_period in seconds ) # [END howto_sensor_s3_keys_unchanged] # [START howto_operator_s3_delete_objects] delete_objects = S3DeleteObjectsOperator( task_id="delete_objects", bucket=bucket_name_2, keys=key_2, ) # [END howto_operator_s3_delete_objects] delete_objects.trigger_rule = TriggerRule.ALL_DONE # [START howto_operator_s3_delete_bucket] delete_bucket = S3DeleteBucketOperator( task_id="delete_bucket", bucket_name=bucket_name, force_delete=True, ) # [END howto_operator_s3_delete_bucket] delete_bucket.trigger_rule = TriggerRule.ALL_DONE delete_bucket_2 = S3DeleteBucketOperator( task_id="delete_bucket_2", bucket_name=bucket_name_2, force_delete=True, ) delete_bucket_2.trigger_rule = TriggerRule.ALL_DONE chain( # TEST SETUP test_context, # TEST BODY create_bucket, create_bucket_2, put_tagging, get_tagging, delete_tagging, create_object, create_object_2, list_prefixes, list_keys, [sensor_one_key, sensor_two_keys, sensor_key_with_function], [sensor_one_key_deferrable, sensor_two_keys_deferrable, sensor_key_with_function_deferrable], copy_object, file_transform, branching, sensor_keys_unchanged, # TEST TEARDOWN delete_objects, delete_bucket, delete_bucket_2, ) from tests.system.utils.watcher import watcher # This test needs watcher in order to properly mark success/failure # when "tearDown" task with trigger rule is part of the DAG list(dag.tasks) >> watcher() from tests.system.utils import get_test_run # noqa: E402 # Needed to run the example DAG with pytest (see: tests/system/
[docs]test_run = get_test_run(dag)

