# Source code for airflow.providers.google.leveldb.hooks.leveldb
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Hook for Level DB."""

from __future__ import annotations

from airflow.exceptions import AirflowException, AirflowOptionalProviderFeatureException
from airflow.hooks.base import BaseHook

# plyvel is an optional dependency: a failed import is re-raised as
# AirflowOptionalProviderFeatureException instead of a bare ImportError.
try:
    import plyvel
    from plyvel import DB
except ImportError as e:
    raise AirflowOptionalProviderFeatureException(e)
# Error message used when a database operation is attempted before ``get_conn`` has opened a DB.
DB_NOT_INITIALIZED_BEFORE = "The `get_conn` method should be called before!"
class LevelDBHookException(AirflowException):
    """Exception specific for LevelDB."""
class LevelDBHook(BaseHook):
    """
    Plyvel Wrapper to Interact With LevelDB Database.

    `LevelDB Connection Documentation <https://plyvel.readthedocs.io/en/latest/>`__
    """

    # Class-level default: no initializer visible in this class assigns ``db``,
    # yet ``get_conn`` reads it before the first assignment.
    db: DB | None = None

    def get_conn(self, name: str = "/tmp/testdb/", create_if_missing: bool = False, **kwargs) -> DB:
        """
        Create a `Plyvel DB <https://plyvel.readthedocs.io/en/latest/api.html#DB>`__ or reuse the open one.

        If a DB handle is already stored on the hook it is returned as-is and the
        arguments are ignored.

        :param name: path to create database, e.g. ``/tmp/testdb/``
        :param create_if_missing: whether a new database should be created if needed
        :param kwargs: other options of creation plyvel.DB. See more in the link above.
        :returns: DB
        """
        if self.db is not None:
            return self.db
        self.db = plyvel.DB(name=name, create_if_missing=create_if_missing, **kwargs)
        return self.db

    def run(
        self,
        command: str,
        key: bytes,
        value: bytes | None = None,
        keys: list[bytes] | None = None,
        values: list[bytes] | None = None,
    ) -> bytes | None:
        """
        Execute operation with leveldb.

        :param command: command of plyvel (python wrap for leveldb) for DB object, e.g.
            ``"put"``, ``"get"``, ``"delete"``, ``"write_batch"``.
        :param key: key for command (put, get, delete) execution, e.g. ``b'key'``, ``b'another-key'``
        :param value: value for command (put) execution, e.g. ``b'value'``, ``b'another-value'``
        :param keys: keys for command (write_batch) execution, e.g. ``[b'key', b'another-key']``
        :param values: values for command (write_batch) execution, e.g.
            ``[b'value', b'another-value']``
        :returns: value from get or None
        :raises LevelDBHookException: if the command is unknown or a required argument is missing
        """
        if command == "put":
            # Only a missing value is an error; an empty ``b""`` is passed through to ``put``.
            if value is None:
                raise LevelDBHookException("Please provide `value`!")
            return self.put(key, value)
        if command == "get":
            return self.get(key)
        if command == "delete":
            return self.delete(key)
        if command == "write_batch":
            if not keys:
                raise LevelDBHookException("Please provide `keys`!")
            if not values:
                raise LevelDBHookException("Please provide `values`!")
            return self.write_batch(keys, values)
        raise LevelDBHookException("Unknown command for LevelDB hook")

    def _require_db(self) -> DB:
        """Return the open DB handle, raising if ``get_conn`` has not been called yet."""
        if self.db is None:
            raise LevelDBHookException(DB_NOT_INITIALIZED_BEFORE)
        return self.db

    def put(self, key: bytes, value: bytes):
        """
        Put a single value into a leveldb db by key.

        :param key: key for put execution, e.g. ``b'key'``, ``b'another-key'``
        :param value: value for put execution, e.g. ``b'value'``, ``b'another-value'``
        :raises LevelDBHookException: if ``get_conn`` has not been called before
        """
        self._require_db().put(key, value)

    def get(self, key: bytes) -> bytes | None:
        """
        Get a single value from a leveldb db by key.

        :param key: key for get execution, e.g. ``b'key'``, ``b'another-key'``
        :returns: value of key from db.get
        :raises LevelDBHookException: if ``get_conn`` has not been called before
        """
        return self._require_db().get(key)

    def delete(self, key: bytes):
        """
        Delete a single value in a leveldb db by key.

        :param key: key for delete execution, e.g. ``b'key'``, ``b'another-key'``
        :raises LevelDBHookException: if ``get_conn`` has not been called before
        """
        self._require_db().delete(key)

    def write_batch(self, keys: list[bytes], values: list[bytes]):
        """
        Write batch of values in a leveldb db by keys.

        ``keys[i]`` is written with ``values[i]``, so both lists must have the same length.

        :param keys: keys for write_batch execution, e.g. ``[b'key', b'another-key']``
        :param values: values for write_batch execution, e.g. ``[b'value', b'another-value']``
        :raises LevelDBHookException: if ``get_conn`` has not been called before, or if
            ``keys`` and ``values`` differ in length
        """
        db = self._require_db()
        # Validate up front so a mismatch fails before any put is queued, instead of
        # raising IndexError midway through the batch or silently ignoring extra values.
        if len(keys) != len(values):
            raise LevelDBHookException(
                f"`keys` and `values` must have the same length, got {len(keys)} and {len(values)}."
            )
        with db.write_batch() as batch:
            for key, value in zip(keys, values):
                batch.put(key, value)