#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""This module allows connecting to ArangoDB."""
from __future__ import annotations

from functools import cached_property
from typing import TYPE_CHECKING, Any

from arango import AQLQueryExecuteError, ArangoClient as ArangoDBClient
from arango.cursor import Cursor
from arango.exceptions import (
    DocumentDeleteError,
    DocumentInsertError,
    DocumentReplaceError,
    DocumentUpdateError,
)

from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook

if TYPE_CHECKING:
    from arango.database import StandardDatabase

    from airflow.models import Connection

class ArangoDBHook(BaseHook):
    """
    Interact with ArangoDB.

    Performs a connection to ArangoDB and retrieves client.

    :param arangodb_conn_id: Reference to :ref:`ArangoDB connection id <howto/connection:arangodb>`.
    """

    conn_name_attr = "arangodb_conn_id"
    default_conn_name = "arangodb_default"

    def __init__(self, arangodb_conn_id: str = default_conn_name, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.arangodb_conn_id = arangodb_conn_id

    @cached_property
    def client(self) -> ArangoDBClient:
        """Return the ArangoDB client built from the connection's hosts (cached per hook instance)."""
        return ArangoDBClient(hosts=self.hosts)

    @cached_property
    def db_conn(self) -> StandardDatabase:
        """Connect to an ArangoDB database and return the database API wrapper (cached per hook instance)."""
        return self.client.db(name=self.database, username=self.username, password=self.password)

    @cached_property
    def _conn(self) -> Connection:
        """Return the Airflow connection referenced by ``arangodb_conn_id`` (looked up once, then cached)."""
        return self.get_connection(self.arangodb_conn_id)

    @property
    def hosts(self) -> list[str]:
        """
        Return the ArangoDB host URLs configured in the connection's ``host`` field.

        The field holds a single URL or a comma separated list of URLs. Whitespace around
        each entry is removed and empty entries (e.g. from a trailing comma) are dropped.

        :raises AirflowException: If the connection defines no usable host.
        """
        raw_hosts = self._conn.host or ""
        # Strip each entry so that "http://a:8529, http://b:8530" does not yield " http://b:8530".
        hosts = [host.strip() for host in raw_hosts.split(",") if host.strip()]
        if not hosts:
            raise AirflowException(f"No ArangoDB Host(s) provided in connection: {self.arangodb_conn_id!r}.")
        return hosts

    @property
    def database(self) -> str:
        """
        Return the database name stored in the connection's ``schema`` field.

        :raises AirflowException: If the connection defines no schema.
        """
        if not self._conn.schema:
            raise AirflowException(f"No ArangoDB Database provided in connection: {self.arangodb_conn_id!r}.")
        return self._conn.schema

    @property
    def username(self) -> str:
        """
        Return the username stored in the connection's ``login`` field.

        :raises AirflowException: If the connection defines no login.
        """
        if not self._conn.login:
            raise AirflowException(f"No ArangoDB Username provided in connection: {self.arangodb_conn_id!r}.")
        return self._conn.login

    @property
    def password(self) -> str:
        """Return the connection's password, or an empty string when none is set."""
        return self._conn.password or ""

    def get_conn(self) -> ArangoDBClient:
        """Return the (cached) ArangoDB client; see :attr:`client`."""
        return self.client

    def query(self, query, **kwargs) -> Cursor:
        """
        Create an ArangoDB session and execute the AQL query in the session.

        :param query: AQL query
        :param kwargs: Additional keyword arguments forwarded to ``db_conn.aql.execute``.
        :return: The cursor returned by the query execution.
        :raises AirflowException: If the database wrapper is unavailable, the query fails with
            ``AQLQueryExecuteError``, or the result is not a ``Cursor``.
        """
        try:
            if not self.db_conn:
                raise AirflowException(
                    f"Failed to execute AQLQuery, error connecting to database: {self.database}"
                )
            result = self.db_conn.aql.execute(query, **kwargs)
        except AQLQueryExecuteError as error:
            # Chain explicitly so the driver error stays attached as the cause.
            raise AirflowException(f"Failed to execute AQLQuery, error: {error}") from error
        if not isinstance(result, Cursor):
            raise AirflowException("Failed to execute AQLQuery, expected result to be of type Cursor")
        return result

    def create_collection(self, name: str) -> bool:
        """
        Create a collection in the connected database unless it already exists.

        :param name: Name of the collection.
        :return: ``True`` if the collection was created, ``False`` if it already existed.
        """
        if self.db_conn.has_collection(name):
            self.log.info("Collection already exists: %s", name)
            return False
        self.log.info("Collection '%s' does not exist. Creating a new collection.", name)
        self.db_conn.create_collection(name)
        return True

    def delete_collection(self, name: str) -> bool:
        """
        Delete a collection from the connected database if it exists.

        :param name: Name of the collection.
        :return: ``True`` if the collection was deleted, ``False`` if it did not exist.
        """
        if not self.db_conn.has_collection(name):
            self.log.info("Collection does not exist: %s", name)
            return False
        self.db_conn.delete_collection(name)
        return True

    def create_database(self, name: str) -> bool:
        """
        Create a database unless one with the given name already exists.

        :param name: Name of the database.
        :return: ``True`` if the database was created, ``False`` if it already existed.
        """
        if self.db_conn.has_database(name):
            self.log.info("Database already exists: %s", name)
            return False
        self.db_conn.create_database(name)
        return True

    def create_graph(self, name: str) -> bool:
        """
        Create a graph in the connected database unless it already exists.

        :param name: Name of the graph.
        :return: ``True`` if the graph was created, ``False`` if it already existed.
        """
        if self.db_conn.has_graph(name):
            self.log.info("Graph already exists: %s", name)
            return False
        self.db_conn.create_graph(name)
        return True

    def insert_documents(self, collection_name: str, documents) -> None:
        """
        Insert documents into a collection, creating the collection first if it is missing.

        :param collection_name: Name of the target collection.
        :param documents: Documents passed to the collection's ``insert_many`` (with ``silent=True``).
        :raises DocumentInsertError: Logged and re-raised when the insert fails.
        """
        if not self.db_conn.has_collection(collection_name):
            self.create_collection(collection_name)
        collection = self.db_conn.collection(collection_name)
        try:
            collection.insert_many(documents, silent=True)
        except DocumentInsertError as e:
            self.log.error("Failed to insert documents: %s", e)
            raise

    def update_documents(self, collection_name: str, documents) -> None:
        """
        Update documents in an existing collection.

        :param collection_name: Name of the target collection.
        :param documents: Documents passed to the collection's ``update_many`` (with ``silent=True``).
        :raises AirflowException: If the collection does not exist.
        :raises DocumentUpdateError: Logged and re-raised when the update fails.
        """
        if not self.db_conn.has_collection(collection_name):
            raise AirflowException(f"Collection does not exist: {collection_name}")
        collection = self.db_conn.collection(collection_name)
        try:
            collection.update_many(documents, silent=True)
        except DocumentUpdateError as e:
            self.log.error("Failed to update documents: %s", e)
            raise

    def replace_documents(self, collection_name: str, documents) -> None:
        """
        Replace documents in an existing collection.

        :param collection_name: Name of the target collection.
        :param documents: Documents passed to the collection's ``replace_many`` (with ``silent=True``).
        :raises AirflowException: If the collection does not exist.
        :raises DocumentReplaceError: Logged and re-raised when the replace fails.
        """
        if not self.db_conn.has_collection(collection_name):
            raise AirflowException(f"Collection does not exist: {collection_name}")
        collection = self.db_conn.collection(collection_name)
        try:
            collection.replace_many(documents, silent=True)
        except DocumentReplaceError as e:
            self.log.error("Failed to replace documents: %s", e)
            raise

    def delete_documents(self, collection_name: str, documents) -> None:
        """
        Delete documents from an existing collection.

        :param collection_name: Name of the target collection.
        :param documents: Documents passed to the collection's ``delete_many`` (with ``silent=True``).
        :raises AirflowException: If the collection does not exist.
        :raises DocumentDeleteError: Logged and re-raised when the delete fails.
        """
        if not self.db_conn.has_collection(collection_name):
            raise AirflowException(f"Collection does not exist: {collection_name}")
        collection = self.db_conn.collection(collection_name)
        try:
            collection.delete_many(documents, silent=True)
        except DocumentDeleteError as e:
            self.log.error("Failed to delete documents: %s", e)
            raise

    @classmethod
    def get_ui_field_behaviour(cls) -> dict[str, Any]:
        """Return the connection-form customisation: hidden fields, field labels and placeholders."""
        return {
            "hidden_fields": ["port", "extra"],
            "relabeling": {
                "host": "ArangoDB Host URL or comma separated list of URLs (coordinators in a cluster)",
                "schema": "ArangoDB Database",
                "login": "ArangoDB Username",
                "password": "ArangoDB Password",
            },
            "placeholders": {
                "host": 'eg."http://127.0.0.1:8529" or "http://127.0.0.1:8529,http://127.0.0.1:8530"'
                " (coordinators in a cluster)",
                "schema": "_system",
                "login": "root",
                "password": "password",
            },
        }