# Source code for airflow.providers.amazon.aws.hooks.neptune
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
class NeptuneHook(AwsBaseHook):
    """
    Interact with Amazon Neptune.

    Additional arguments (such as ``aws_conn_id``) may be specified and
    are passed down to the underlying AwsBaseHook.

    .. seealso::
        - :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
    """

    # Status strings grouped by category. They are not referenced inside this
    # class; presumably they are compared against ``get_cluster_status`` results
    # by callers (operators/triggers) -- confirm against the call sites.
    AVAILABLE_STATES = ["available"]
    STOPPED_STATES = ["stopped"]
    ERROR_STATES = [
        "cloning-failed",
        "inaccessible-encryption-credentials",
        "inaccessible-encryption-credentials-recoverable",
        "migration-failed",
    ]

    def __init__(self, *args, **kwargs):
        # Force the client type: any ``client_type`` supplied by the caller is overwritten.
        kwargs["client_type"] = "neptune"
        super().__init__(*args, **kwargs)

    def _wait_for_cluster(self, waiter_name: str, cluster_id: str, delay: int, max_attempts: int) -> str:
        """
        Block on a named cluster waiter, then report the cluster's status.

        Shared implementation of :meth:`wait_for_cluster_availability` and
        :meth:`wait_for_cluster_stopped`, which differ only in the waiter they use.
        Any exception raised by the waiter or by the status lookup propagates unchanged.

        :param waiter_name: Name handed to ``self.get_waiter``.
        :param cluster_id: The ID of the cluster to wait for.
        :param delay: Time in seconds to delay between polls.
        :param max_attempts: Maximum number of attempts to poll for completion.
        :return: The status of the cluster, read after the waiter has returned.
        """
        waiter = self.get_waiter(waiter_name)
        waiter.wait(
            DBClusterIdentifier=cluster_id,
            WaiterConfig={"Delay": delay, "MaxAttempts": max_attempts},
        )
        # The waiter's return value is not used; the status is read back with a
        # separate describe call so it can be logged and returned.
        status = self.get_cluster_status(cluster_id)
        self.log.info("Finished waiting for cluster %s. Status is now %s", cluster_id, status)
        return status

    def wait_for_cluster_availability(self, cluster_id: str, delay: int = 30, max_attempts: int = 60) -> str:
        """
        Wait for Neptune cluster to start.

        :param cluster_id: The ID of the cluster to wait for.
        :param delay: Time in seconds to delay between polls.
        :param max_attempts: Maximum number of attempts to poll for completion.
        :return: The status of the cluster.
        """
        return self._wait_for_cluster("cluster_available", cluster_id, delay, max_attempts)

    def wait_for_cluster_stopped(self, cluster_id: str, delay: int = 30, max_attempts: int = 60) -> str:
        """
        Wait for Neptune cluster to stop.

        :param cluster_id: The ID of the cluster to wait for.
        :param delay: Time in seconds to delay between polls.
        :param max_attempts: Maximum number of attempts to poll for completion.
        :return: The status of the cluster.
        """
        return self._wait_for_cluster("cluster_stopped", cluster_id, delay, max_attempts)

    def get_cluster_status(self, cluster_id: str) -> str:
        """
        Get the status of a Neptune cluster.

        :param cluster_id: The ID of the cluster to get the status of.
        :return: The status of the cluster.
        """
        response = self.conn.describe_db_clusters(DBClusterIdentifier=cluster_id)
        # Only the first entry of the response is consulted.
        return response["DBClusters"][0]["Status"]

    def get_db_instance_status(self, instance_id: str) -> str:
        """
        Get the status of a Neptune instance.

        :param instance_id: The ID of the instance to get the status of.
        :return: The status of the instance.
        """
        response = self.conn.describe_db_instances(DBInstanceIdentifier=instance_id)
        # Only the first entry of the response is consulted.
        return response["DBInstances"][0]["DBInstanceStatus"]

    def wait_for_cluster_instance_availability(
        self, cluster_id: str, delay: int = 30, max_attempts: int = 60
    ) -> None:
        """
        Wait for Neptune instances in a cluster to be available.

        Unlike the cluster-level wait methods, this one does not look up or
        return a status afterwards.

        :param cluster_id: The cluster ID of the instances to wait for.
        :param delay: Time in seconds to delay between polls.
        :param max_attempts: Maximum number of attempts to poll for completion.
        :return: None
        """
        # Select instances by the cluster they belong to rather than by instance ID.
        filters = [{"Name": "db-cluster-id", "Values": [cluster_id]}]
        self.log.info("Waiting for instances in cluster %s.", cluster_id)
        self.get_waiter("db_instance_available").wait(
            Filters=filters, WaiterConfig={"Delay": delay, "MaxAttempts": max_attempts}
        )
        self.log.info("Finished waiting for instances in cluster %s.", cluster_id)