Amazon Relational Database Service (RDS)¶
Amazon Relational Database Service (Amazon RDS) is a web service that makes it easier to set up, operate, and scale a relational database in the cloud. It provides cost-efficient, resizable capacity for an industry-standard relational database and manages common database administration tasks.
Prerequisite Tasks¶
To use these operators, you must do a few things:
Create necessary resources using AWS Console or AWS CLI.
Install API libraries via pip.
pip install 'apache-airflow[amazon]'
Detailed information is available in Installation of Airflow®.
Operators¶
Create a database snapshot¶
To create a snapshot of an Amazon RDS database instance or cluster you can use RdsCreateDbSnapshotOperator.
The source database instance must be in the available or storage-optimization state.
tests/system/amazon/aws/example_rds_snapshot.py
create_snapshot = RdsCreateDbSnapshotOperator(
    task_id="create_snapshot",
    db_type="instance",
    db_identifier=rds_instance_name,
    db_snapshot_identifier=rds_snapshot_name,
)
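The state requirement above can be checked before the snapshot task runs. The following is a minimal sketch and not part of the Amazon provider: the helper name can_snapshot and the constant SNAPSHOT_READY_STATES are hypothetical, and the status string is assumed to come from whatever you use to describe the instance.

```python
# Hypothetical helper (not part of the Amazon provider): encodes the rule
# that a snapshot can only be taken from an instance in one of two states.
SNAPSHOT_READY_STATES = frozenset({"available", "storage-optimization"})


def can_snapshot(db_instance_status: str) -> bool:
    """Return True if the reported instance status permits a snapshot."""
    return db_instance_status.strip().lower() in SNAPSHOT_READY_STATES


# Show the decision for a few sample statuses.
for status in ("available", "storage-optimization", "creating"):
    print(f"{status}: {can_snapshot(status)}")
```

A check like this could gate the snapshot task, for example from a short-circuiting upstream task, so the operator is not invoked against an instance that is still being created or modified.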
Copy a database snapshot¶
To copy a snapshot of an Amazon RDS database instance or cluster you can use RdsCopyDbSnapshotOperator.
The source database snapshot must be in the available state.
tests/system/amazon/aws/example_rds_snapshot.py
copy_snapshot = RdsCopyDbSnapshotOperator(
    task_id="copy_snapshot",
    db_type="instance",
    source_db_snapshot_identifier=rds_snapshot_name,
    target_db_snapshot_identifier=rds_snapshot_copy_name,
)
Delete a database snapshot¶
To delete a snapshot of an Amazon RDS database instance or cluster you can use RdsDeleteDbSnapshotOperator.
The database snapshot must be in the available state to be deleted.
tests/system/amazon/aws/example_rds_snapshot.py
delete_snapshot = RdsDeleteDbSnapshotOperator(
    task_id="delete_snapshot",
    db_type="instance",
    db_snapshot_identifier=rds_snapshot_name,
)
Export an Amazon RDS snapshot to Amazon S3¶
To export an Amazon RDS snapshot to Amazon S3 you can use RdsStartExportTaskOperator.
The provided IAM role must have access to the S3 bucket.
tests/system/amazon/aws/example_rds_export.py
start_export = RdsStartExportTaskOperator(
    task_id="start_export",
    export_task_identifier=rds_export_task_id,
    source_arn=snapshot_arn,
    s3_bucket_name=bucket_name,
    s3_prefix="rds-test",
    iam_role_arn=test_context[ROLE_ARN_KEY],
    kms_key_id=test_context[KMS_KEY_ID_KEY],
)
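The source_arn argument identifies the snapshot to export. As a sketch, assuming the standard RDS ARN layout (a snapshot resource type for instance snapshots and a cluster-snapshot resource type for cluster snapshots), a small helper could assemble the value. The function name build_snapshot_arn, the region, and the account ID below are illustrative placeholders and are not taken from the system test.

```python
def build_snapshot_arn(
    region: str,
    account_id: str,
    snapshot_name: str,
    db_type: str = "instance",
    partition: str = "aws",
) -> str:
    """Assemble an RDS snapshot ARN (hypothetical helper for illustration)."""
    # Map the db_type used by the snapshot operators to the ARN resource type.
    resource_types = {"instance": "snapshot", "cluster": "cluster-snapshot"}
    if db_type not in resource_types:
        raise ValueError(f"db_type must be one of {sorted(resource_types)}")
    return (
        f"arn:{partition}:rds:{region}:{account_id}:"
        f"{resource_types[db_type]}:{snapshot_name}"
    )


# Placeholder region, account ID, and snapshot name.
print(build_snapshot_arn("us-east-1", "123456789012", "my-snapshot"))
```

In practice the ARN is usually read from the snapshot description returned by AWS rather than built by hand; constructing it is mainly useful when only the snapshot name is known.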
Cancel an Amazon RDS export task¶
To cancel an Amazon RDS export task to S3 you can use RdsCancelExportTaskOperator.
Any data that has already been written to the S3 bucket isn’t removed.
tests/system/amazon/aws/example_rds_export.py
cancel_export = RdsCancelExportTaskOperator(
    task_id="cancel_export",
    export_task_identifier=rds_export_task_id,
)
Subscribe to an Amazon RDS event notification¶
To create an Amazon RDS event subscription you can use RdsCreateEventSubscriptionOperator.
This action requires an Amazon SNS topic Amazon Resource Name (ARN).
Amazon RDS event notification is only available for unencrypted SNS topics.
If you specify an encrypted SNS topic, event notifications are not sent for the topic.
tests/system/amazon/aws/example_rds_event.py
create_subscription = RdsCreateEventSubscriptionOperator(
    task_id="create_subscription",
    subscription_name=rds_subscription_name,
    sns_topic_arn=sns_topic,
    source_type="db-instance",
    source_ids=[rds_instance_name],
    event_categories=["availability"],
)
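Because the subscription needs a topic ARN rather than a topic name, it can help to sanity-check the value before passing it as sns_topic_arn. The sketch below is a loose shape check only, assuming the usual six-field ARN layout; the function name is_sns_topic_arn and the sample ARN are hypothetical, and the check cannot tell whether the topic is encrypted.

```python
def is_sns_topic_arn(arn: str) -> bool:
    """Loosely check for arn:<partition>:sns:<region>:<account-id>:<topic>."""
    parts = arn.split(":")
    if len(parts) != 6:
        return False
    prefix, partition, service, region, account_id, topic = parts
    return (
        prefix == "arn"
        and bool(partition)
        and service == "sns"
        and bool(region)
        and account_id.isdigit()
        and bool(topic)
    )


# Placeholder ARN used only to exercise the check.
print(is_sns_topic_arn("arn:aws:sns:us-east-1:123456789012:rds-events"))
```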
Unsubscribe from an Amazon RDS event notification¶
To delete an Amazon RDS event subscription you can use RdsDeleteEventSubscriptionOperator.
tests/system/amazon/aws/example_rds_event.py
delete_subscription = RdsDeleteEventSubscriptionOperator(
    task_id="delete_subscription",
    subscription_name=rds_subscription_name,
)
Create a database instance¶
To create an AWS DB instance you can use RdsCreateDbInstanceOperator.
You can also run this operator in deferrable mode by setting the deferrable parameter to True.
tests/system/amazon/aws/example_rds_instance.py
create_db_instance = RdsCreateDbInstanceOperator(
    task_id="create_db_instance",
    db_instance_identifier=rds_db_identifier,
    db_instance_class="db.t4g.micro",
    engine="postgres",
    rds_kwargs={
        "MasterUsername": RDS_USERNAME,
        "MasterUserPassword": RDS_PASSWORD,
        "AllocatedStorage": 20,
        "PubliclyAccessible": False,
    },
)
Delete a database instance¶
To delete an AWS DB instance you can use RdsDeleteDbInstanceOperator.
You can also run this operator in deferrable mode by setting the deferrable parameter to True.
tests/system/amazon/aws/example_rds_instance.py
delete_db_instance = RdsDeleteDbInstanceOperator(
    task_id="delete_db_instance",
    db_instance_identifier=rds_db_identifier,
    rds_kwargs={
        "SkipFinalSnapshot": True,
    },
)
Start a database instance or cluster¶
To start an Amazon RDS DB instance or cluster you can use RdsStartDbOperator.
tests/system/amazon/aws/example_rds_instance.py
start_db_instance = RdsStartDbOperator(
    task_id="start_db_instance",
    db_identifier=rds_db_identifier,
)
Stop a database instance or cluster¶
To stop an Amazon RDS DB instance or cluster you can use RdsStopDbOperator.
tests/system/amazon/aws/example_rds_instance.py
stop_db_instance = RdsStopDbOperator(
    task_id="stop_db_instance",
    db_identifier=rds_db_identifier,
)
Sensors¶
Wait on an Amazon RDS instance or cluster status¶
To wait for an Amazon RDS instance or cluster to reach a specific status you can use RdsDbSensor.
By default, the sensor waits for a database instance to reach the available state.
tests/system/amazon/aws/example_rds_instance.py
await_db_instance = RdsDbSensor(
    task_id="await_db_instance",
    db_identifier=rds_db_identifier,
)
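Conceptually, a status sensor polls until the resource reports one of the target statuses. The sketch below illustrates that idea in plain Python. It is not the provider's implementation: wait_for_status, poke_interval, max_pokes, and the fake status sequence are all hypothetical stand-ins for a real status lookup.

```python
import time
from typing import Callable, Iterable


def wait_for_status(
    get_status: Callable[[], str],
    target_statuses: Iterable[str] = ("available",),
    poke_interval: float = 0.0,
    max_pokes: int = 10,
) -> bool:
    """Poll get_status until it returns a target status or pokes run out."""
    targets = {status.lower() for status in target_statuses}
    for _ in range(max_pokes):
        if get_status().lower() in targets:
            return True
        time.sleep(poke_interval)
    return False


# Stand-in for a real status lookup: replays a fixed sequence of statuses.
fake_statuses = iter(["creating", "backing-up", "available"])
print(wait_for_status(lambda: next(fake_statuses)))
```

The default target of available mirrors the default described above; passing a different target_statuses value models waiting for another state, such as stopped.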
Wait on an Amazon RDS snapshot status¶
To wait for an Amazon RDS snapshot with specific statuses you can use RdsSnapshotExistenceSensor.
By default, the sensor waits for the existence of a snapshot with status available.
tests/system/amazon/aws/example_rds_snapshot.py
snapshot_sensor = RdsSnapshotExistenceSensor(
    task_id="snapshot_sensor",
    db_type="instance",
    db_snapshot_identifier=rds_snapshot_name,
    target_statuses=["available"],
)
Wait on an Amazon RDS export task status¶
To wait for an Amazon RDS snapshot export task with specific statuses you can use RdsExportTaskExistenceSensor.
The target_statuses parameter lists the export task statuses that satisfy the sensor; the example below waits for canceled.
tests/system/amazon/aws/example_rds_export.py
export_sensor = RdsExportTaskExistenceSensor(
    task_id="export_sensor",
    export_task_identifier=rds_export_task_id,
    target_statuses=["canceled"],
)