Source code for airflow.providers.amazon.aws.operators.dms
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsfromcollections.abcimportSequencefromdatetimeimportdatetimefromtypingimportTYPE_CHECKING,Any,ClassVarfromairflow.configurationimportconffromairflow.exceptionsimportAirflowExceptionfromairflow.providers.amazon.aws.hooks.dmsimportDmsHookfromairflow.providers.amazon.aws.operators.base_awsimportAwsBaseOperatorfromairflow.providers.amazon.aws.triggers.dmsimport(DmsReplicationCompleteTrigger,DmsReplicationConfigDeletedTrigger,DmsReplicationDeprovisionedTrigger,DmsReplicationStoppedTrigger,DmsReplicationTerminalStatusTrigger,)fromairflow.providers.amazon.aws.utils.mixinsimportaws_template_fieldsfromairflow.utils.contextimportContextifTYPE_CHECKING:fromairflow.utils.contextimportContext
class DmsCreateTaskOperator(AwsBaseOperator[DmsHook]):
    """
    Create an AWS DMS replication task.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:DmsCreateTaskOperator`

    :param replication_task_id: Identifier of the replication task to create.
    :param source_endpoint_arn: ARN of the source endpoint.
    :param target_endpoint_arn: ARN of the target endpoint.
    :param replication_instance_arn: ARN of the replication instance.
    :param table_mappings: Table mappings.
    :param migration_type: Migration type ('full-load'|'cdc'|'full-load-and-cdc'), full-load by default.
    :param create_task_kwargs: Extra arguments for DMS replication task creation.
    :param aws_conn_id: The Airflow connection used for AWS credentials.
        If this is ``None`` or empty then the default boto3 behaviour is used. If
        running Airflow in a distributed manner and aws_conn_id is None or
        empty, then default boto3 configuration would be used (and must be
        maintained on each worker node).
    :param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
    :param verify: Whether or not to verify SSL certificates. See:
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
    :param botocore_config: Configuration dictionary (key-values) for botocore client. See:
        https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
    """

    def execute(self, context: Context):
        """
        Create the replication task through the DMS hook.

        The explicitly configured arguments are forwarded together with every entry of
        ``create_task_kwargs``.

        :return: replication task arn
        """
        required_args = {
            "replication_task_id": self.replication_task_id,
            "source_endpoint_arn": self.source_endpoint_arn,
            "target_endpoint_arn": self.target_endpoint_arn,
            "replication_instance_arn": self.replication_instance_arn,
            "migration_type": self.migration_type,
            "table_mappings": self.table_mappings,
        }
        created_arn = self.hook.create_replication_task(**required_args, **self.create_task_kwargs)
        self.log.info("DMS replication task(%s) is ready.", self.replication_task_id)
        return created_arn
class DmsDeleteTaskOperator(AwsBaseOperator[DmsHook]):
    """
    Deletes AWS DMS replication task.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:DmsDeleteTaskOperator`

    :param replication_task_arn: Replication task ARN
    :param aws_conn_id: The Airflow connection used for AWS credentials.
        If this is ``None`` or empty then the default boto3 behaviour is used. If
        running Airflow in a distributed manner and aws_conn_id is None or
        empty, then default boto3 configuration would be used (and must be
        maintained on each worker node).
    :param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
    :param verify: Whether or not to verify SSL certificates. See:
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
    :param botocore_config: Configuration dictionary (key-values) for botocore client. See:
        https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
    """

    def execute(self, context: Context) -> None:
        """
        Delete the AWS DMS replication task identified by ``replication_task_arn``.

        This method returns nothing, so no value is pushed to XCom.
        """
        self.hook.delete_replication_task(replication_task_arn=self.replication_task_arn)
        self.log.info("DMS replication task(%s) has been deleted.", self.replication_task_arn)
class DmsDescribeTasksOperator(AwsBaseOperator[DmsHook]):
    """
    Describe AWS DMS replication tasks.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:DmsDescribeTasksOperator`

    :param describe_tasks_kwargs: Keyword arguments forwarded to the describe-tasks call.
    :param aws_conn_id: The Airflow connection used for AWS credentials.
        If this is ``None`` or empty then the default boto3 behaviour is used. If
        running Airflow in a distributed manner and aws_conn_id is None or
        empty, then default boto3 configuration would be used (and must be
        maintained on each worker node).
    :param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
    :param verify: Whether or not to verify SSL certificates. See:
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
    :param botocore_config: Configuration dictionary (key-values) for botocore client. See:
        https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
    """

    def execute(self, context: Context) -> tuple[str | None, list]:
        """
        Run the hook's ``describe_replication_tasks`` with ``describe_tasks_kwargs``.

        :return: A ``(marker, tasks)`` pair: the marker (possibly ``None``) and the list of
            replication tasks.
        """
        marker_and_tasks = self.hook.describe_replication_tasks(**self.describe_tasks_kwargs)
        return marker_and_tasks
class DmsStartTaskOperator(AwsBaseOperator[DmsHook]):
    """
    Start an AWS DMS replication task.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:DmsStartTaskOperator`

    :param replication_task_arn: ARN of the replication task to start.
    :param start_replication_task_type: Replication task start type (default='start-replication')
        ('start-replication'|'resume-processing'|'reload-target')
    :param start_task_kwargs: Extra start replication task arguments.
    :param aws_conn_id: The Airflow connection used for AWS credentials.
        If this is ``None`` or empty then the default boto3 behaviour is used. If
        running Airflow in a distributed manner and aws_conn_id is None or
        empty, then default boto3 configuration would be used (and must be
        maintained on each worker node).
    :param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
    :param verify: Whether or not to verify SSL certificates. See:
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
    :param botocore_config: Configuration dictionary (key-values) for botocore client. See:
        https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
    """

    def execute(self, context: Context):
        """Ask DMS to start the replication task, then log that it is starting."""
        task_arn = self.replication_task_arn
        self.hook.start_replication_task(
            replication_task_arn=task_arn,
            start_replication_task_type=self.start_replication_task_type,
            **self.start_task_kwargs,
        )
        self.log.info("DMS replication task(%s) is starting.", task_arn)
class DmsStopTaskOperator(AwsBaseOperator[DmsHook]):
    """
    Stop an AWS DMS replication task.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:DmsStopTaskOperator`

    :param replication_task_arn: ARN of the replication task to stop.
    :param aws_conn_id: The Airflow connection used for AWS credentials.
        If this is ``None`` or empty then the default boto3 behaviour is used. If
        running Airflow in a distributed manner and aws_conn_id is None or
        empty, then default boto3 configuration would be used (and must be
        maintained on each worker node).
    :param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
    :param verify: Whether or not to verify SSL certificates. See:
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
    :param botocore_config: Configuration dictionary (key-values) for botocore client. See:
        https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
    """

    def execute(self, context: Context):
        """Ask DMS to stop the replication task, then log that it is stopping."""
        task_arn = self.replication_task_arn
        self.hook.stop_replication_task(replication_task_arn=task_arn)
        self.log.info("DMS replication task(%s) is stopping.", task_arn)
class DmsDescribeReplicationConfigsOperator(AwsBaseOperator[DmsHook]):
    """
    Describes AWS DMS Serverless replication configurations.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:DmsDescribeReplicationConfigsOperator`

    :param filter: Filters block for filtering results.
    :param aws_conn_id: The Airflow connection used for AWS credentials.
        If this is ``None`` or empty then the default boto3 behaviour is used. If
        running Airflow in a distributed manner and aws_conn_id is None or
        empty, then default boto3 configuration would be used (and must be
        maintained on each worker node).
    :param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
    :param verify: Whether or not to verify SSL certificates. See:
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
    :param botocore_config: Configuration dictionary (key-values) for botocore client. See:
        https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
    """

    def execute(self, context: Context) -> list:
        """
        Describe AWS DMS replication configurations matching ``filter``.

        :return: List of replication configurations
        """
        return self.hook.describe_replication_configs(filters=self.filter)
class DmsCreateReplicationConfigOperator(AwsBaseOperator[DmsHook]):
    """
    Creates an AWS DMS Serverless replication configuration.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:DmsCreateReplicationConfigOperator`

    :param replication_config_id: Unique identifier used to create a ReplicationConfigArn.
    :param source_endpoint_arn: ARN of the source endpoint
    :param target_endpoint_arn: ARN of the target endpoint
    :param compute_config: Parameters for provisioning a DMS Serverless replication.
    :param replication_type: type of DMS Serverless replication
    :param table_mappings: JSON table mappings
    :param tags: Key-value tag pairs. Either a list of ``{"Key": ..., "Value": ...}`` dicts or a
        plain ``dict`` (converted to that list). They are sent as ``Tags`` unless
        ``additional_config_kwargs`` already contains ``Tags``.
    :param additional_config_kwargs: Additional configuration parameters for DMS Serverless replication.
        Passed directly to the API
    :param aws_conn_id: The Airflow connection used for AWS credentials.
        If this is ``None`` or empty then the default boto3 behaviour is used. If
        running Airflow in a distributed manner and aws_conn_id is None or
        empty, then default boto3 configuration would be used (and must be
        maintained on each worker node).
    :param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
    :param verify: Whether or not to verify SSL certificates. See:
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
    :param botocore_config: Configuration dictionary (key-values) for botocore client. See:
        https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
    """

    def execute(self, context: Context) -> str:
        """
        Create the DMS Serverless replication configuration.

        :return: Whatever ``DmsHook.create_replication_config`` returns (annotated here as ``str``).
        """
        resp = self.hook.create_replication_config(
            replication_config_id=self.replication_config_id,
            source_endpoint_arn=self.source_endpoint_arn,
            target_endpoint_arn=self.target_endpoint_arn,
            compute_config=self.compute_config,
            replication_type=self.replication_type,
            table_mappings=self.table_mappings,
            additional_config_kwargs=self._additional_config_kwargs_with_tags(),
        )
        self.log.info("DMS replication config(%s) has been created.", self.replication_config_id)
        return resp

    def _additional_config_kwargs_with_tags(self):
        """Return ``additional_config_kwargs`` with the documented ``tags`` merged in as ``Tags``."""
        # NOTE(review): ``tags`` is documented, but how ``__init__`` stores it is not visible here.
        # getattr keeps this safe if the attribute is absent -- confirm against __init__.
        tags = getattr(self, "tags", None)
        if not tags:
            # Nothing to merge: pass the caller's value through untouched.
            return self.additional_config_kwargs
        merged = dict(self.additional_config_kwargs or {})
        if "Tags" not in merged:
            # An explicit ``Tags`` entry in additional_config_kwargs wins.
            if isinstance(tags, dict):
                # The DMS API expects a list of {"Key": ..., "Value": ...} entries.
                tags = [{"Key": key, "Value": value} for key, value in tags.items()]
            merged["Tags"] = tags
        return merged
class DmsDeleteReplicationConfigOperator(AwsBaseOperator[DmsHook]):
    """
    Deletes an AWS DMS Serverless replication configuration.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:DmsDeleteReplicationConfigOperator`

    :param replication_config_arn: ARN of the replication config
    :param wait_for_completion: If True, waits for the replication config to be deleted before
        returning. If False, the operator will return immediately after the request is made.
    :param deferrable: Run the operator in deferrable mode.
    :param waiter_delay: The number of seconds to wait between retries (default: 60).
    :param waiter_max_attempts: The maximum number of attempts to be made (default: 60).
    :param aws_conn_id: The Airflow connection used for AWS credentials.
        If this is ``None`` or empty then the default boto3 behaviour is used. If
        running Airflow in a distributed manner and aws_conn_id is None or
        empty, then default boto3 configuration would be used (and must be
        maintained on each worker node).
    :param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
    :param verify: Whether or not to verify SSL certificates. See:
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
    :param botocore_config: Configuration dictionary (key-values) for botocore client. See:
        https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
    """

    def execute(self, context: Context) -> None:
        """
        Wait until the replication allows deletion, then delete the replication config.

        If a replication exists for the config, wait (or, in deferrable mode, defer) until:

        * it reaches a terminal status, and
        * it is deprovisioned.

        If no replication exists for the config, delete the config immediately.
        """
        arn_filter = [{"Name": "replication-config-arn", "Values": [self.replication_config_arn]}]
        results = self.hook.describe_replications(filters=arn_filter)
        if results:
            self._wait_until_deletable(results[0].get("Status", ""), arn_filter)
        else:
            # No replication for this config, so there is nothing to wait for
            # (indexing results[0] here would raise IndexError).
            self.log.info(
                "No replication found for replication config(%s); deleting it directly.",
                self.replication_config_arn,
            )
        self.hook.delete_replication_config(self.replication_config_arn)
        self.handle_delete_wait()

    def _wait_until_deletable(self, current_state: str, arn_filter: list) -> None:
        """Block or defer until the replication is terminal and deprovisioned."""
        self.log.info(
            "Current state of replication config(%s) is %s.", self.replication_config_arn, current_state
        )
        # replication must be deprovisioned before deleting
        provision_status = self.hook.get_provision_status(replication_config_arn=self.replication_config_arn)

        if self.deferrable:
            if current_state.lower() not in self.VALID_STATES:
                self.log.info("Deferring until terminal status reached.")
                self.defer(
                    trigger=DmsReplicationTerminalStatusTrigger(
                        replication_config_arn=self.replication_config_arn,
                        waiter_delay=self.waiter_delay,
                        waiter_max_attempts=self.waiter_max_attempts,
                        aws_conn_id=self.aws_conn_id,
                    ),
                    method_name="retry_execution",
                )
            if provision_status not in self.TERMINAL_PROVISION_STATES:  # not deprovisioned:
                self.log.info("Deferring until deprovisioning completes.")
                self.defer(
                    trigger=DmsReplicationDeprovisionedTrigger(
                        replication_config_arn=self.replication_config_arn,
                        waiter_delay=self.waiter_delay,
                        waiter_max_attempts=self.waiter_max_attempts,
                        aws_conn_id=self.aws_conn_id,
                    ),
                    method_name="retry_execution",
                )

        waiter_config = {"Delay": self.waiter_delay, "MaxAttempts": self.waiter_max_attempts}
        self.hook.get_waiter("replication_terminal_status").wait(
            Filters=arn_filter, WaiterConfig=waiter_config
        )
        if provision_status not in self.TERMINAL_PROVISION_STATES:
            # Only reachable in non-deferrable mode; the deferrable branch above defers instead.
            # Wait for deprovisioning here too, matching the deferrable path.
            self.hook.get_waiter("replication_deprovisioned").wait(
                Filters=arn_filter, WaiterConfig=waiter_config
            )

    def handle_delete_wait(self):
        """If ``wait_for_completion`` is set, wait (or defer) until the config is deleted."""
        if not self.wait_for_completion:
            return
        if self.deferrable:
            self.log.info("Deferring until replication config is deleted.")
            self.defer(
                trigger=DmsReplicationConfigDeletedTrigger(
                    replication_config_arn=self.replication_config_arn,
                    waiter_delay=self.waiter_delay,
                    waiter_max_attempts=self.waiter_max_attempts,
                    aws_conn_id=self.aws_conn_id,
                ),
                method_name="execute_complete",
            )
        else:
            self.hook.get_waiter("replication_config_deleted").wait(
                Filters=[{"Name": "replication-config-arn", "Values": [self.replication_config_arn]}],
                WaiterConfig={"Delay": self.waiter_delay, "MaxAttempts": self.waiter_max_attempts},
            )
            self.log.info("DMS replication config(%s) deleted.", self.replication_config_arn)
class DmsDescribeReplicationsOperator(AwsBaseOperator[DmsHook]):
    """
    Describes AWS DMS Serverless replications.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:DmsDescribeReplicationsOperator`

    :param filter: Filters block for filtering results.
    :param aws_conn_id: The Airflow connection used for AWS credentials.
        If this is ``None`` or empty then the default boto3 behaviour is used. If
        running Airflow in a distributed manner and aws_conn_id is None or
        empty, then default boto3 configuration would be used (and must be
        maintained on each worker node).
    :param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
    :param verify: Whether or not to verify SSL certificates. See:
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
    :param botocore_config: Configuration dictionary (key-values) for botocore client. See:
        https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
    """
class DmsStartReplicationOperator(AwsBaseOperator[DmsHook]):
    """
    Starts an AWS DMS Serverless replication.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:DmsStartReplicationOperator`

    :param replication_config_arn: ARN of the replication config
    :param replication_start_type: Type of replication.
    :param cdc_start_time: Start time of CDC. Mutually exclusive with ``cdc_start_pos``.
    :param cdc_start_pos: Indicates when to start CDC. Mutually exclusive with ``cdc_start_time``.
    :param cdc_stop_pos: Indicates when to stop CDC.
    :param wait_for_completion: If True, wait for the replication to complete after starting it.
    :param deferrable: If True, the waits use triggers instead of blocking the worker. This applies
        to the deprovisioning wait and, when ``wait_for_completion`` is set, the completion wait.
    :param waiter_delay: Number of seconds between status checks while waiting.
    :param waiter_max_attempts: Maximum number of status checks while waiting.
    :param aws_conn_id: The Airflow connection used for AWS credentials.
        If this is ``None`` or empty then the default boto3 behaviour is used. If
        running Airflow in a distributed manner and aws_conn_id is None or
        empty, then default boto3 configuration would be used (and must be
        maintained on each worker node).
    :param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
    :param verify: Whether or not to verify SSL certificates. See:
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
    :param botocore_config: Configuration dictionary (key-values) for botocore client. See:
        https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
    """

    def _validate_cdc_start(self) -> None:
        """Reject configurations that set both ``cdc_start_time`` and ``cdc_start_pos``."""
        if self.cdc_start_time and self.cdc_start_pos:
            raise AirflowException("Only one of cdc_start_time or cdc_start_pos should be provided.")

    def execute(self, context: Context):
        """
        Start the replication if it is startable, optionally waiting for it to complete.

        If the replication is deprovisioning, first wait (or defer) for deprovisioning to
        finish before starting or restarting it.

        :raises AirflowException: If both ``cdc_start_time`` and ``cdc_start_pos`` are set, or
            if no replication exists for ``replication_config_arn``.
        """
        self._validate_cdc_start()
        arn_filter = [{"Name": "replication-config-arn", "Values": [self.replication_config_arn]}]
        result = self.hook.describe_replications(filters=arn_filter)
        if not result:
            # Fail with a clear message instead of an IndexError on result[0].
            raise AirflowException(
                f"No DMS replication found for replication config {self.replication_config_arn}."
            )
        current_status = result[0].get("Status", "")

        provision_status = self.hook.get_provision_status(replication_config_arn=self.replication_config_arn)
        if provision_status == "deprovisioning":
            provision_status = self._wait_for_deprovisioning(arn_filter)

        if (
            current_status.lower() in self.STARTABLE_STATES
            and provision_status in self.TERMINAL_PROVISION_STATES
        ):
            self._start_and_maybe_wait(arn_filter)
        else:
            self.log.info("Replication(%s) is not in startable state.", self.replication_config_arn)
            self.log.info("Status: %s Provision status: %s", current_status, provision_status)

    def _wait_for_deprovisioning(self, arn_filter: list) -> str:
        """Wait (or defer) until deprovisioning completes; return the refreshed provision status."""
        # wait for deprovisioning to complete before start/restart
        self.log.info("Replication is deprovisioning. Must wait for deprovisioning before running replication")
        if self.deferrable:
            self.log.info("Deferring until deprovisioning completes.")
            self.defer(
                trigger=DmsReplicationDeprovisionedTrigger(
                    replication_config_arn=self.replication_config_arn,
                    waiter_delay=self.waiter_delay,
                    waiter_max_attempts=self.waiter_max_attempts,
                    aws_conn_id=self.aws_conn_id,
                ),
                method_name="retry_execution",
            )
        else:
            self.hook.get_waiter("replication_deprovisioned").wait(
                Filters=arn_filter,
                WaiterConfig={"Delay": self.waiter_delay, "MaxAttempts": self.waiter_max_attempts},
            )
        provision_status = self.hook.get_provision_status(replication_config_arn=self.replication_config_arn)
        self.log.info("Replication deprovisioning complete. Provision status: %s", provision_status)
        return provision_status

    def _start_and_maybe_wait(self, arn_filter: list) -> None:
        """Start the replication and, if ``wait_for_completion`` is set, wait (or defer) for it."""
        resp = self.hook.start_replication(
            replication_config_arn=self.replication_config_arn,
            start_replication_type=self.replication_start_type,
            cdc_start_time=self.cdc_start_time,
            cdc_start_pos=self.cdc_start_pos,
            cdc_stop_pos=self.cdc_stop_pos,
        )
        current_status = resp.get("Replication", {}).get("Status", "Unknown")
        self.log.info(
            "Replication(%s) started with status %s.",
            self.replication_config_arn,
            current_status,
        )

        if not self.wait_for_completion:
            return
        self.log.info("Waiting for %s replication to complete.", self.replication_config_arn)
        if self.deferrable:
            self.log.info("Deferring until %s replication completes.", self.replication_config_arn)
            self.defer(
                trigger=DmsReplicationCompleteTrigger(
                    replication_config_arn=self.replication_config_arn,
                    waiter_delay=self.waiter_delay,
                    waiter_max_attempts=self.waiter_max_attempts,
                    aws_conn_id=self.aws_conn_id,
                ),
                method_name="execute_complete",
            )
        self.hook.get_waiter("replication_complete").wait(
            Filters=arn_filter,
            WaiterConfig={"Delay": self.waiter_delay, "MaxAttempts": self.waiter_max_attempts},
        )
        self.log.info("Replication(%s) has completed.", self.replication_config_arn)

    def execute_complete(self, context, event=None):
        """
        Resume after the completion trigger fires.

        :raises AirflowException: If no event was delivered or its status is not ``"success"``.
        """
        if event is None:
            raise AirflowException("Trigger error: event is None")
        if event.get("status", "success") != "success":
            raise AirflowException(f"Error while waiting for DMS replication to complete: {event}")
        self.replication_config_arn = event.get("replication_config_arn", self.replication_config_arn)
        self.log.info("Replication(%s) has completed.", self.replication_config_arn)
class DmsStopReplicationOperator(AwsBaseOperator[DmsHook]):
    """
    Stops an AWS DMS Serverless replication.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:DmsStopReplicationOperator`

    :param replication_config_arn: ARN of the replication config
    :param wait_for_completion: If True, wait for the replication to stop after requesting it.
    :param deferrable: If True (and ``wait_for_completion`` is set), wait using a trigger instead of
        blocking the worker.
    :param waiter_delay: Number of seconds between status checks while waiting.
    :param waiter_max_attempts: Maximum number of status checks while waiting.
    :param aws_conn_id: The Airflow connection used for AWS credentials.
        If this is ``None`` or empty then the default boto3 behaviour is used. If
        running Airflow in a distributed manner and aws_conn_id is None or
        empty, then default boto3 configuration would be used (and must be
        maintained on each worker node).
    :param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
    :param verify: Whether or not to verify SSL certificates. See:
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
    :param botocore_config: Configuration dictionary (key-values) for botocore client. See:
        https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
    """

    def execute(self, context: Context) -> None:
        """
        Stop the replication unless it is already stopped, optionally waiting for the stop.

        :raises AirflowException: If no replication exists for ``replication_config_arn``.
        """
        arn_filter = [{"Name": "replication-config-arn", "Values": [self.replication_config_arn]}]
        results = self.hook.describe_replications(filters=arn_filter)
        if not results:
            # Fail with a clear message instead of an IndexError on results[0].
            raise AirflowException(
                f"No DMS replication found for replication config {self.replication_config_arn}."
            )
        current_state = results[0].get("Status", "")
        self.log.info(
            "Current state of replication config(%s) is %s.", self.replication_config_arn, current_state
        )

        if current_state.lower() in self.STOPPED_STATES:
            self.log.info("DMS replication config(%s) is already stopped.", self.replication_config_arn)
            return

        resp = self.hook.stop_replication(self.replication_config_arn)
        status = resp.get("Replication", {}).get("Status", "Unknown")
        self.log.info(
            "Stopping DMS replication config(%s). Current status: %s", self.replication_config_arn, status
        )

        if not self.wait_for_completion:
            return
        self.log.info("Waiting for %s replication to stop.", self.replication_config_arn)
        if self.deferrable:
            self.log.info("Deferring until %s replication stops.", self.replication_config_arn)
            self.defer(
                trigger=DmsReplicationStoppedTrigger(
                    replication_config_arn=self.replication_config_arn,
                    waiter_delay=self.waiter_delay,
                    waiter_max_attempts=self.waiter_max_attempts,
                    aws_conn_id=self.aws_conn_id,
                ),
                method_name="execute_complete",
            )
        self.hook.get_waiter("replication_stopped").wait(
            Filters=arn_filter,
            WaiterConfig={"Delay": self.waiter_delay, "MaxAttempts": self.waiter_max_attempts},
        )

    def execute_complete(self, context, event=None):
        """
        Resume after the stop trigger fires.

        :raises AirflowException: If no event was delivered or its status is not ``"success"``.
        """
        if event is None:
            raise AirflowException("Trigger error: event is None")
        if event.get("status", "success") != "success":
            raise AirflowException(f"Error while waiting for DMS replication to stop: {event}")
        self.replication_config_arn = event.get("replication_config_arn", self.replication_config_arn)
        self.log.info("Replication(%s) has stopped.", self.replication_config_arn)