Amazon Redshift (Cluster)¶
Amazon Redshift manages all the work of setting up, operating, and scaling a data warehouse: provisioning capacity, monitoring and backing up the cluster, and applying patches and upgrades to the Amazon Redshift engine. You can focus on using your data to acquire new insights for your business and customers.
Prerequisite Tasks¶
To use these operators, you must do a few things:
Create necessary resources using AWS Console or AWS CLI.
Install API libraries via pip.
pip install 'apache-airflow[amazon]'Detailed information is available Installation of Airflow®
Operators¶
Create an Amazon Redshift cluster¶
To create an Amazon Redshift Cluster with the specified parameters you can use
RedshiftCreateClusterOperator
.
tests/system/amazon/aws/example_redshift.py
create_cluster = RedshiftCreateClusterOperator(
task_id="create_cluster",
cluster_identifier=redshift_cluster_identifier,
vpc_security_group_ids=[security_group_id],
cluster_subnet_group_name=cluster_subnet_group_name,
publicly_accessible=False,
cluster_type="single-node",
node_type="dc2.large",
master_username=DB_LOGIN,
master_user_password=DB_PASS,
)
Resume an Amazon Redshift cluster¶
To resume a ‘paused’ Amazon Redshift cluster you can use
RedshiftResumeClusterOperator
You can also run this operator in deferrable mode by setting deferrable
param to True
.
This will ensure that the task is deferred from the Airflow worker slot and polling for the task status happens on the trigger.
tests/system/amazon/aws/example_redshift.py
resume_cluster = RedshiftResumeClusterOperator(
task_id="resume_cluster",
cluster_identifier=redshift_cluster_identifier,
)
Pause an Amazon Redshift cluster¶
To pause an available
Amazon Redshift cluster you can use
RedshiftPauseClusterOperator
.
You can also run this operator in deferrable mode by setting deferrable
param to True
tests/system/amazon/aws/example_redshift.py
pause_cluster = RedshiftPauseClusterOperator(
task_id="pause_cluster",
cluster_identifier=redshift_cluster_identifier,
)
Create an Amazon Redshift cluster snapshot¶
To create Amazon Redshift cluster snapshot you can use
RedshiftCreateClusterSnapshotOperator
tests/system/amazon/aws/example_redshift.py
create_cluster_snapshot = RedshiftCreateClusterSnapshotOperator(
task_id="create_cluster_snapshot",
cluster_identifier=redshift_cluster_identifier,
snapshot_identifier=redshift_cluster_snapshot_identifier,
poll_interval=30,
max_attempt=100,
retention_period=1,
wait_for_completion=True,
)
Delete an Amazon Redshift cluster snapshot¶
To delete Amazon Redshift cluster snapshot you can use
RedshiftDeleteClusterSnapshotOperator
tests/system/amazon/aws/example_redshift.py
delete_cluster_snapshot = RedshiftDeleteClusterSnapshotOperator(
task_id="delete_cluster_snapshot",
cluster_identifier=redshift_cluster_identifier,
snapshot_identifier=redshift_cluster_snapshot_identifier,
)
Delete an Amazon Redshift cluster¶
To delete an Amazon Redshift cluster you can use
RedshiftDeleteClusterOperator
.
You can also run this operator in deferrable mode by setting deferrable
param to True
tests/system/amazon/aws/example_redshift.py
delete_cluster = RedshiftDeleteClusterOperator(
task_id="delete_cluster",
cluster_identifier=redshift_cluster_identifier,
)
Sensors¶
Wait on an Amazon Redshift cluster state¶
To check the state of an Amazon Redshift Cluster until it reaches the target state or another terminal
state you can use RedshiftClusterSensor
.
tests/system/amazon/aws/example_redshift.py
wait_cluster_available = RedshiftClusterSensor(
task_id="wait_cluster_available",
cluster_identifier=redshift_cluster_identifier,
target_status="available",
poke_interval=15,
timeout=60 * 30,
)