Amazon Relational Database Service (RDS)¶
Amazon Relational Database Service (Amazon RDS) is a web service that makes it easier to set up, operate, and scale a relational database in the cloud. It provides cost-efficient, resizable capacity for an industry-standard relational database and manages common database administration tasks.
Prerequisite Tasks¶
To use these operators, you must do a few things:
Create necessary resources using AWS Console or AWS CLI.
Install API libraries via pip.
pip install 'apache-airflow[amazon]'
Detailed information is available in Installation of Airflow®.
Generic Parameters¶
- aws_conn_id
Reference to Amazon Web Services Connection ID. If this parameter is set to None then the default boto3 behaviour is used without a connection lookup. Otherwise use the credentials stored in the Connection. Default: aws_default
- region_name
AWS Region Name. If this parameter is set to None or omitted then region_name from AWS Connection Extra Parameter will be used. Otherwise use the specified value instead of the connection value. Default: None
- verify
Whether or not to verify SSL certificates.
False - Do not validate SSL certificates.
path/to/cert/bundle.pem - A filename of the CA cert bundle to use. You can specify this argument if you want to use a different CA cert bundle than the one used by botocore.
If this parameter is set to None or is omitted then verify from AWS Connection Extra Parameter will be used. Otherwise use the specified value instead of the connection value. Default: None
- botocore_config
The provided dictionary is used to construct a botocore.config.Config. This configuration can be used to avoid throttling exceptions, set timeouts, etc.
Example (for more detail about the parameters, see botocore.config.Config):
{
    "signature_version": "unsigned",
    "s3": {
        "us_east_1_regional_endpoint": True,
    },
    "retries": {
        "mode": "standard",
        "max_attempts": 10,
    },
    "connect_timeout": 300,
    "read_timeout": 300,
    "tcp_keepalive": True,
}
If this parameter is set to None or omitted then config_kwargs from AWS Connection Extra Parameter will be used. Otherwise use the specified value instead of the connection value. Default: None
Note
Specifying an empty dictionary, {}, will overwrite the connection configuration for botocore.config.Config.
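The fallback rule shared by region_name, verify and botocore_config can be sketched in a few lines of plain Python. This only illustrates the behaviour described above and is not the provider's actual code; `resolve_setting` and the sample values are hypothetical names introduced for the example. It does not cover aws_conn_id, where None means the default boto3 behaviour is used instead.

```python
def resolve_setting(operator_value, connection_value):
    """Return the value that takes effect for one generic parameter.

    Hypothetical helper: mirrors the documented rule that None (or an
    omitted argument) falls back to the AWS Connection extra, while any
    other value, including an empty dict, replaces the connection value.
    """
    if operator_value is None:
        return connection_value
    return operator_value


# Sample connection extras (made-up values for illustration).
connection_region = "eu-west-1"
connection_config = {"retries": {"mode": "standard", "max_attempts": 10}}

# Omitted on the operator: the connection value is used.
region = resolve_setting(None, connection_region)

# Set on the operator: the operator value wins.
region_override = resolve_setting("us-east-1", connection_region)

# An empty dict is not None, so it replaces the connection configuration.
config = resolve_setting({}, connection_config)

print(region, region_override, config)
```

The last case is the one the note warns about: passing {} is different from passing None, because it discards whatever config_kwargs the connection defines.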
Operators¶
Create a database snapshot¶
To create a snapshot of an Amazon RDS database instance or cluster you can use RdsCreateDbSnapshotOperator.
The source database instance must be in the available or storage-optimization state.
tests/system/amazon/aws/example_rds_snapshot.py
create_snapshot = RdsCreateDbSnapshotOperator(
    task_id="create_snapshot",
    db_type="instance",
    db_identifier=rds_instance_name,
    db_snapshot_identifier=rds_snapshot_name,
)
Copy a database snapshot¶
To copy a snapshot of an Amazon RDS database instance or cluster you can use RdsCopyDbSnapshotOperator.
The source database snapshot must be in the available state.
tests/system/amazon/aws/example_rds_snapshot.py
copy_snapshot = RdsCopyDbSnapshotOperator(
    task_id="copy_snapshot",
    db_type="instance",
    source_db_snapshot_identifier=rds_snapshot_name,
    target_db_snapshot_identifier=rds_snapshot_copy_name,
)
Delete a database snapshot¶
To delete a snapshot of an Amazon RDS database instance or cluster you can use RdsDeleteDbSnapshotOperator.
The database snapshot must be in the available state to be deleted.
tests/system/amazon/aws/example_rds_snapshot.py
delete_snapshot = RdsDeleteDbSnapshotOperator(
    task_id="delete_snapshot",
    db_type="instance",
    db_snapshot_identifier=rds_snapshot_name,
)
Export an Amazon RDS snapshot to Amazon S3¶
To export an Amazon RDS snapshot to Amazon S3 you can use RdsStartExportTaskOperator.
The provided IAM role must have access to the S3 bucket.
tests/system/amazon/aws/example_rds_export.py
start_export = RdsStartExportTaskOperator(
    task_id="start_export",
    export_task_identifier=rds_export_task_id,
    source_arn=snapshot_arn,
    s3_bucket_name=bucket_name,
    s3_prefix="rds-test",
    iam_role_arn=test_context[ROLE_ARN_KEY],
    kms_key_id=test_context[KMS_KEY_ID_KEY],
)
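The source_arn argument above is the ARN of the snapshot to export. As a rough sketch, assuming the standard `aws` partition and the usual RDS snapshot ARN layout, such an ARN could be assembled as follows. `build_snapshot_arn` and the account, Region and snapshot values are hypothetical and not part of the provider; in practice the ARN can also be read from the snapshot's description in Amazon RDS.

```python
def build_snapshot_arn(region, account_id, snapshot_name, db_type="instance"):
    """Assemble an RDS snapshot ARN (hypothetical helper, `aws` partition assumed)."""
    if db_type == "instance":
        resource = "snapshot"
    elif db_type == "cluster":
        resource = "cluster-snapshot"
    else:
        raise ValueError(f"unsupported db_type: {db_type!r}")
    return f"arn:aws:rds:{region}:{account_id}:{resource}:{snapshot_name}"


# Made-up account ID and names, for illustration only.
snapshot_arn = build_snapshot_arn("us-east-1", "123456789012", "my-snapshot")
print(snapshot_arn)
```

Other partitions (for example GovCloud or China Regions) use a different ARN prefix, so the hard-coded `aws` segment is an assumption to revisit there.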
Cancel an Amazon RDS export task¶
To cancel an Amazon RDS export task to S3 you can use RdsCancelExportTaskOperator.
Any data that has already been written to the S3 bucket isn’t removed.
tests/system/amazon/aws/example_rds_export.py
cancel_export = RdsCancelExportTaskOperator(
    task_id="cancel_export",
    export_task_identifier=rds_export_task_id,
)
Subscribe to an Amazon RDS event notification¶
To create an Amazon RDS event subscription you can use RdsCreateEventSubscriptionOperator.
This action requires the Amazon Resource Name (ARN) of an Amazon SNS topic.
Amazon RDS event notification is only available for unencrypted SNS topics.
If you specify an encrypted SNS topic, event notifications are not sent for the topic.
tests/system/amazon/aws/example_rds_event.py
create_subscription = RdsCreateEventSubscriptionOperator(
    task_id="create_subscription",
    subscription_name=rds_subscription_name,
    sns_topic_arn=sns_topic,
    source_type="db-instance",
    source_ids=[rds_instance_name],
    event_categories=["availability"],
)
Unsubscribe from an Amazon RDS event notification¶
To delete an Amazon RDS event subscription you can use RdsDeleteEventSubscriptionOperator.
tests/system/amazon/aws/example_rds_event.py
delete_subscription = RdsDeleteEventSubscriptionOperator(
    task_id="delete_subscription",
    subscription_name=rds_subscription_name,
)
Create a database instance¶
To create an AWS DB instance you can use RdsCreateDbInstanceOperator.
You can also run this operator in deferrable mode by setting the deferrable parameter to True.
tests/system/amazon/aws/example_rds_instance.py
create_db_instance = RdsCreateDbInstanceOperator(
    task_id="create_db_instance",
    db_instance_identifier=rds_db_identifier,
    db_instance_class="db.t4g.micro",
    engine="postgres",
    rds_kwargs={
        "MasterUsername": RDS_USERNAME,
        "MasterUserPassword": RDS_PASSWORD,
        "AllocatedStorage": 20,
        "PubliclyAccessible": False,
    },
)
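Deferrable mode is enabled with a single extra argument. The following is a minimal sketch, assuming the Amazon provider is installed when the function is called. The `make_create_db_instance_task` wrapper, its argument names and the task_id are hypothetical, and the import sits inside the function only so the snippet can be loaded without Airflow.

```python
def make_create_db_instance_task(db_identifier, username, password):
    """Build a create-instance task that runs in deferrable mode (sketch)."""
    # Imported lazily; requires apache-airflow[amazon] at call time.
    from airflow.providers.amazon.aws.operators.rds import (
        RdsCreateDbInstanceOperator,
    )

    return RdsCreateDbInstanceOperator(
        task_id="create_db_instance_deferrable",  # hypothetical task id
        db_instance_identifier=db_identifier,
        db_instance_class="db.t4g.micro",
        engine="postgres",
        rds_kwargs={
            "MasterUsername": username,
            "MasterUserPassword": password,
            "AllocatedStorage": 20,
            "PubliclyAccessible": False,
        },
        # Hand the wait over to the triggerer instead of holding a worker slot.
        deferrable=True,
    )
```

Deferrable tasks rely on a running Airflow triggerer process, so this setting only helps in deployments that have one.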
Delete a database instance¶
To delete an AWS DB instance you can use RdsDeleteDbInstanceOperator.
You can also run this operator in deferrable mode by setting the deferrable parameter to True.
tests/system/amazon/aws/example_rds_instance.py
delete_db_instance = RdsDeleteDbInstanceOperator(
    task_id="delete_db_instance",
    db_instance_identifier=rds_db_identifier,
    rds_kwargs={
        "SkipFinalSnapshot": True,
    },
)
Start a database instance or cluster¶
To start an Amazon RDS DB instance or cluster you can use RdsStartDbOperator.
tests/system/amazon/aws/example_rds_instance.py
start_db_instance = RdsStartDbOperator(
    task_id="start_db_instance",
    db_identifier=rds_db_identifier,
)
Stop a database instance or cluster¶
To stop an Amazon RDS DB instance or cluster you can use RdsStopDbOperator.
tests/system/amazon/aws/example_rds_instance.py
stop_db_instance = RdsStopDbOperator(
    task_id="stop_db_instance",
    db_identifier=rds_db_identifier,
)
Sensors¶
Wait on an Amazon RDS instance or cluster status¶
To wait for an Amazon RDS instance or cluster to reach a specific status you can use RdsDbSensor.
By default, the sensor waits for a database instance to reach the available state.
tests/system/amazon/aws/example_rds_instance.py
await_db_instance = RdsDbSensor(
    task_id="await_db_instance",
    db_identifier=rds_db_identifier,
)
Wait on an Amazon RDS snapshot status¶
To wait for an Amazon RDS snapshot to reach one of a set of specific statuses you can use RdsSnapshotExistenceSensor.
By default, the sensor waits for the existence of a snapshot with status available.
tests/system/amazon/aws/example_rds_snapshot.py
snapshot_sensor = RdsSnapshotExistenceSensor(
    task_id="snapshot_sensor",
    db_type="instance",
    db_snapshot_identifier=rds_snapshot_name,
    target_statuses=["available"],
)
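The role of target_statuses can be pictured as a repeated membership check. The sketch below is a simplified simulation, not the sensor's implementation: the real sensor queries Amazon RDS on each poke, whereas here the observed statuses are a made-up list and `first_matching_poke` is a hypothetical helper.

```python
def first_matching_poke(observed_statuses, target_statuses):
    """Return the 1-based poke at which a target status is seen, else None."""
    for attempt, status in enumerate(observed_statuses, start=1):
        if status in target_statuses:
            return attempt
    return None


# Simulated sequence of snapshot statuses seen on successive pokes.
simulated = ["creating", "creating", "available"]
pokes_needed = first_matching_poke(simulated, ["available"])
print(pokes_needed)
```

Listing several values in target_statuses makes the check succeed as soon as any one of them is observed.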
Wait on an Amazon RDS export task status¶
To wait for an Amazon RDS snapshot export task to reach one of a set of specific statuses you can use RdsExportTaskExistenceSensor.
By default, the sensor waits for the existence of an export task with status available.
tests/system/amazon/aws/example_rds_export.py
export_sensor = RdsExportTaskExistenceSensor(
    task_id="export_sensor",
    export_task_identifier=rds_export_task_id,
    target_statuses=["canceled"],
)