X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=ricsdl-package%2Fricsdl%2Fsyncstorage.py;h=b15365a168c93a7de203643ec25542b09905d24d;hb=77c5b120496bdcb798d0e02b719c177e8f48d4e9;hp=92bb88a6f93bdcb3aab48d86c3fb1c256d503751;hpb=dada8463c0fd4c3b90eedc54b6c913f0fa0e7272;p=ric-plt%2Fsdlpy.git diff --git a/ricsdl-package/ricsdl/syncstorage.py b/ricsdl-package/ricsdl/syncstorage.py old mode 100644 new mode 100755 index 92bb88a..b15365a --- a/ricsdl-package/ricsdl/syncstorage.py +++ b/ricsdl-package/ricsdl/syncstorage.py @@ -20,12 +20,13 @@ """The module provides implementation of the syncronous Shared Data Layer (SDL) interface.""" import builtins -from typing import (Dict, Set, List, Union) +import inspect +from typing import (Any, Callable, Dict, Set, List, Optional, Tuple, Union) from ricsdl.configuration import _Configuration from ricsdl.syncstorage_abc import (SyncStorageAbc, SyncLockAbc) import ricsdl.backend from ricsdl.backend.dbbackend_abc import DbBackendAbc -from ricsdl.exceptions import SdlTypeError +from ricsdl.exceptions import (SdlException, SdlTypeError) def func_arg_checker(exception, start_arg_idx, **types): @@ -69,7 +70,9 @@ class SyncLock(SyncLockAbc): storage: 'SyncStorage') -> None: super().__init__(ns, name, expiration) - self.__dbbackendlock = ricsdl.backend.get_backend_lock_instance(ns, name, expiration, + self.__configuration = storage.get_configuration() + self.__dbbackendlock = ricsdl.backend.get_backend_lock_instance(self.__configuration, + ns, name, expiration, storage.get_backend()) def __str__(self): @@ -107,11 +110,17 @@ class SyncStorage(SyncStorageAbc): a namespace, namespace identifier is passed as a parameter to all the operations. Args: - None + fake_db_backend (str): Optional parameter. Parameter enables fake DB backend usage for an + SDL instance. Fake DB backend is ONLY allowed to use for testing + purposes at development phase of SDL clients when more advanced + database services are not necessarily needed. Currently value 'dict' + is only allowed value for the parameter, which enables dictionary + type of fake DB backend. """ - def __init__(self) -> None: + def __init__(self, fake_db_backend=None) -> None: super().__init__() - self.__configuration = _Configuration() + self.__configuration = _Configuration(fake_db_backend) + self.event_separator = self.__configuration.get_event_separator() self.__dbbackend = ricsdl.backend.get_backend_instance(self.__configuration) def __del__(self): @@ -125,11 +134,18 @@ class SyncStorage(SyncStorageAbc): } ) + def is_active(self): + try: + return self.__dbbackend.is_connected() + except SdlException: + return False + def close(self): self.__dbbackend.close() @func_arg_checker(SdlTypeError, 1, ns=str, data_map=dict) def set(self, ns: str, data_map: Dict[str, bytes]) -> None: + self._validate_key_value_dict(data_map) self.__dbbackend.set(ns, data_map) @func_arg_checker(SdlTypeError, 1, ns=str, key=str, old_data=bytes, new_data=bytes) @@ -142,15 +158,17 @@ class SyncStorage(SyncStorageAbc): @func_arg_checker(SdlTypeError, 1, ns=str, keys=(str, builtins.set)) def get(self, ns: str, keys: Union[str, Set[str]]) -> Dict[str, bytes]: - return self.__dbbackend.get(ns, list(keys)) + disordered = self.__dbbackend.get(ns, list(keys)) + return {k: disordered[k] for k in sorted(disordered)} - @func_arg_checker(SdlTypeError, 1, ns=str, key_prefix=str) - def find_keys(self, ns: str, key_prefix: str) -> List[str]: - return self.__dbbackend.find_keys(ns, key_prefix) + @func_arg_checker(SdlTypeError, 1, ns=str, key_pattern=str) + def find_keys(self, ns: str, key_pattern: str) -> List[str]: + return self.__dbbackend.find_keys(ns, key_pattern) - @func_arg_checker(SdlTypeError, 1, ns=str, key_prefix=str, atomic=bool) - def find_and_get(self, ns: str, key_prefix: str, atomic: bool) -> Dict[str, bytes]: - return self.__dbbackend.find_and_get(ns, key_prefix, atomic) + @func_arg_checker(SdlTypeError, 1, ns=str, key_pattern=str) + def find_and_get(self, ns: str, key_pattern: str) -> Dict[str, bytes]: + disordered = self.__dbbackend.find_and_get(ns, key_pattern) + return {k: disordered[k] for k in sorted(disordered)} @func_arg_checker(SdlTypeError, 1, ns=str, keys=(str, builtins.set)) def remove(self, ns: str, keys: Union[str, Set[str]]) -> None: @@ -162,7 +180,7 @@ class SyncStorage(SyncStorageAbc): @func_arg_checker(SdlTypeError, 1, ns=str) def remove_all(self, ns: str) -> None: - keys = self.__dbbackend.find_keys(ns, '') + keys = self.__dbbackend.find_keys(ns, '*') if keys: self.__dbbackend.remove(ns, keys) @@ -190,6 +208,81 @@ class SyncStorage(SyncStorageAbc): def group_size(self, ns: str, group: str) -> int: return self.__dbbackend.group_size(ns, group) + @func_arg_checker(SdlTypeError, 1, ns=str, channels_and_events=dict, data_map=dict) + def set_and_publish(self, ns: str, channels_and_events: Dict[str, Union[str, List[str]]], + data_map: Dict[str, bytes]) -> None: + self._validate_key_value_dict(data_map) + self._validate_channels_events(channels_and_events) + for channel, events in channels_and_events.items(): + channels_and_events[channel] = [events] if isinstance(events, str) else events + self.__dbbackend.set_and_publish(ns, channels_and_events, data_map) + + @func_arg_checker(SdlTypeError, + 1, + ns=str, + channels_and_events=dict, + key=str, + old_data=bytes, + new_data=bytes) + def set_if_and_publish(self, ns: str, channels_and_events: Dict[str, Union[str, List[str]]], + key: str, old_data: bytes, new_data: bytes) -> bool: + self._validate_channels_events(channels_and_events) + for channel, events in channels_and_events.items(): + channels_and_events[channel] = [events] if isinstance(events, str) else events + return self.__dbbackend.set_if_and_publish(ns, channels_and_events, key, old_data, new_data) + + @func_arg_checker(SdlTypeError, 1, ns=str, channels_and_events=dict, key=str, data=bytes) + def set_if_not_exists_and_publish(self, ns: str, + channels_and_events: Dict[str, Union[str, List[str]]], + key: str, data: bytes) -> bool: + self._validate_channels_events(channels_and_events) + for channel, events in channels_and_events.items(): + channels_and_events[channel] = [events] if isinstance(events, str) else events + return self.__dbbackend.set_if_not_exists_and_publish(ns, channels_and_events, key, data) + + @func_arg_checker(SdlTypeError, 1, ns=str, channels_and_events=dict, keys=(str, builtins.set)) + def remove_and_publish(self, ns: str, channels_and_events: Dict[str, Union[str, List[str]]], + keys: Union[str, Set[str]]) -> None: + self._validate_channels_events(channels_and_events) + for channel, events in channels_and_events.items(): + channels_and_events[channel] = [events] if isinstance(events, str) else events + keys = [keys] if isinstance(keys, str) else list(keys) + self.__dbbackend.remove_and_publish(ns, channels_and_events, keys) + + @func_arg_checker(SdlTypeError, 1, ns=str, channels_and_events=dict, key=str, data=bytes) + def remove_if_and_publish(self, ns: str, channels_and_events: Dict[str, Union[str, List[str]]], + key: str, data: bytes) -> bool: + self._validate_channels_events(channels_and_events) + for channel, events in channels_and_events.items(): + channels_and_events[channel] = [events] if isinstance(events, str) else events + return self.__dbbackend.remove_if_and_publish(ns, channels_and_events, key, data) + + @func_arg_checker(SdlTypeError, 1, ns=str, channels_and_events=dict) + def remove_all_and_publish(self, ns: str, + channels_and_events: Dict[str, Union[str, List[str]]]) -> None: + self._validate_channels_events(channels_and_events) + for channel, events in channels_and_events.items(): + channels_and_events[channel] = [events] if isinstance(events, str) else events + self.__dbbackend.remove_all_and_publish(ns, channels_and_events) + + @func_arg_checker(SdlTypeError, 1, ns=str, cb=Callable, channels=(str, builtins.set)) + def subscribe_channel(self, ns: str, cb: Callable[[str, List[str]], None], + channels: Union[str, Set[str]]) -> None: + self._validate_callback(cb) + channels = [channels] if isinstance(channels, str) else list(channels) + self.__dbbackend.subscribe_channel(ns, cb, channels) + + @func_arg_checker(SdlTypeError, 1, ns=str, channels=(str, builtins.set)) + def unsubscribe_channel(self, ns: str, channels: Union[str, Set[str]]) -> None: + channels = [channels] if isinstance(channels, str) else list(channels) + self.__dbbackend.unsubscribe_channel(ns, channels) + + def start_event_listener(self) -> None: + self.__dbbackend.start_event_listener() + + def handle_events(self) -> Optional[Tuple[str, List[str]]]: + return self.__dbbackend.handle_events() + @func_arg_checker(SdlTypeError, 1, ns=str, resource=str, expiration=(int, float)) def get_lock_resource(self, ns: str, resource: str, expiration: Union[int, float]) -> SyncLock: return SyncLock(ns, resource, expiration, self) @@ -197,3 +290,43 @@ class SyncStorage(SyncStorageAbc): def get_backend(self) -> DbBackendAbc: """Return backend instance.""" return self.__dbbackend + + def get_configuration(self) -> _Configuration: + """Return configuration what was valid when the SDL instance was initiated.""" + return self.__configuration + + @classmethod + def _validate_key_value_dict(cls, kv): + for k, v in kv.items(): + if not isinstance(k, str): + raise SdlTypeError(r"Wrong dict key type: {}={}. Must be: str".format(k, type(k))) + if not isinstance(v, bytes): + raise SdlTypeError(r"Wrong dict value type: {}={}. Must be: bytes".format(v, type(v))) + + def _validate_channels_events(self, channels_and_events: Dict[Any, Any]): + for channel, events in channels_and_events.items(): + if not isinstance(channel, str): + raise SdlTypeError(r"Wrong channel type: {}={}. Must be: str".format( + channel, type(channel))) + if not isinstance(events, (list, str)): + raise SdlTypeError(r"Wrong event type: {}={}. Must be: str".format( + events, type(events))) + if isinstance(events, list): + for event in events: + if not isinstance(event, str): + raise SdlTypeError(r"Wrong event type: {}={}. Must be: str".format( + events, type(events))) + if self.event_separator in event: + raise SdlTypeError(r"Events {} contains illegal substring (\"{}\")".format( + events, self.event_separator)) + else: + if self.event_separator in events: + raise SdlTypeError(r"Events {} contains illegal substring (\"{}\")".format( + events, self.event_separator)) + + @classmethod + def _validate_callback(cls, cb): + param_len = len(inspect.signature(cb).parameters) + if param_len != 2: + raise SdlTypeError( + f"Callback function should take 2 positional argument but {param_len} were given")