# Copyright (c) 2019 AT&T Intellectual Property. # Copyright (c) 2018-2019 Nokia. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # This source code is part of the near-RT RIC (RAN Intelligent Controller) # platform project (RICP). # """The module provides Shared Data Layer (SDL) database backend interface.""" from typing import (Callable, Dict, Set, List, Optional, Tuple, Union) from abc import ABC, abstractmethod class DbBackendAbc(ABC): """An abstract Shared Data Layer (SDL) class providing database backend interface.""" @abstractmethod def is_connected(self): """Test database backend connection.""" pass @abstractmethod def close(self): """Close database backend connection.""" pass @abstractmethod def set(self, ns: str, data_map: Dict[str, bytes]) -> None: """Write key value data mapping to database under a namespace.""" pass @abstractmethod def set_if(self, ns: str, key: str, old_data: bytes, new_data: bytes) -> bool: """"Write key value to database under a namespace if the old value is expected one.""" pass @abstractmethod def set_if_not_exists(self, ns: str, key: str, data: bytes) -> bool: """"Write key value to database under a namespace if key doesn't exist.""" pass @abstractmethod def get(self, ns: str, keys: List[str]) -> Dict[str, bytes]: """"Return values of the keys under a namespace.""" pass @abstractmethod def find_keys(self, ns: str, key_pattern: str) -> List[str]: """"Return all the keys matching search pattern under a namespace in database.""" pass @abstractmethod def find_and_get(self, ns: str, key_pattern: str) -> Dict[str, bytes]: """ Return all the keys with their values matching search pattern under a namespace in database. """ pass @abstractmethod def remove(self, ns: str, keys: List[str]) -> None: """Remove keys and their data from database.""" pass @abstractmethod def remove_if(self, ns: str, key: str, data: bytes) -> bool: """ Remove key and its data from database if if the current data value is expected one. """ pass @abstractmethod def add_member(self, ns: str, group: str, members: Set[bytes]) -> None: """Add new members to a group under a namespace in database.""" pass @abstractmethod def remove_member(self, ns: str, group: str, members: Set[bytes]) -> None: """Remove members from a group under a namespace in database.""" pass @abstractmethod def remove_group(self, ns: str, group: str) -> None: """Remove a group under a namespace in database along with it's members.""" pass @abstractmethod def get_members(self, ns: str, group: str) -> Set[bytes]: """Get all the members of a group under a namespace in database.""" pass @abstractmethod def is_member(self, ns: str, group: str, member: bytes) -> bool: """Validate if a given member is in the group under a namespace in database.""" pass @abstractmethod def group_size(self, ns: str, group: str) -> int: """Return the number of members in a group under a namespace in database.""" pass @abstractmethod def set_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]], data_map: Dict[str, bytes]) -> None: """Publish event to channel after writing data.""" pass @abstractmethod def set_if_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]], key: str, old_data: bytes, new_data: bytes) -> bool: """ Publish event to channel after writing key value to database under a namespace if the old value is expected one. """ pass @abstractmethod def set_if_not_exists_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]], key: str, data: bytes) -> bool: """" Publish event to channel after writing key value to database under a namespace if key doesn't exist. """ pass @abstractmethod def remove_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]], keys: List[str]) -> None: """Publish event to channel after removing data.""" pass @abstractmethod def remove_if_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]], key: str, data: bytes) -> bool: """ Publish event to channel after removing key and its data from database if the current data value is expected one. """ pass @abstractmethod def remove_all_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]]) -> None: """ Publish event to channel after removing all keys in namespace. """ pass @abstractmethod def subscribe_channel(self, ns: str, cb: Callable[[str, List[str]], None], channels: List[str]) -> None: """ This takes a callback function and one or many channels to be subscribed. When an event is received for the given channel, the given callback function shall be called with channel and notification(s) as parameter. """ pass @abstractmethod def unsubscribe_channel(self, ns: str, channels: List[str]) -> None: """Unsubscribes from channel and removes set callback function.""" pass @abstractmethod def start_event_listener(self) -> None: """ start_event_listener creates an event loop in a separate thread for handling notifications from subscriptions. """ pass @abstractmethod def handle_events(self) -> Optional[Tuple[str, List[str]]]: """ handle_events is a non-blocking function that returns a tuple containing channel name and message(s) received from notification. """ pass class DbBackendLockAbc(ABC): """ An abstract Shared Data Layer (SDL) class providing database backend lock interface. Args: ns (str): Namespace under which this lock is targeted. name (str): Lock name, identifies the lock key in a database backend. """ def __init__(self, ns: str, name: str) -> None: self._ns = ns self._lock_name = name super().__init__() @abstractmethod def acquire(self, retry_interval: Union[int, float] = 0.1, retry_timeout: Union[int, float] = 10) -> bool: """Acquire a database lock.""" pass @abstractmethod def release(self) -> None: """Release a database lock.""" pass @abstractmethod def refresh(self) -> None: """Refresh the remaining validity time of the database lock back to a initial value.""" pass @abstractmethod def get_validity_time(self) -> Union[int, float]: """Return remaining validity time of the lock in seconds.""" pass