1 # Copyright (c) 2019 AT&T Intellectual Property.
2 # Copyright (c) 2018-2019 Nokia.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 # This source code is part of the near-RT RIC (RAN Intelligent Controller)
18 # platform project (RICP).
22 """The module provides Shared Data Layer (SDL) database backend interface."""
24 from typing import (Callable, Dict, Set, List, Optional, Tuple, Union)
25 from abc import ABC, abstractmethod
28 class DbBackendAbc(ABC):
29 """An abstract Shared Data Layer (SDL) class providing database backend interface."""
32 def is_connected(self):
33 """Test database backend connection."""
38 """Close database backend connection."""
42 def set(self, ns: str, data_map: Dict[str, bytes]) -> None:
43 """Write key value data mapping to database under a namespace."""
47 def set_if(self, ns: str, key: str, old_data: bytes, new_data: bytes) -> bool:
48 """"Write key value to database under a namespace if the old value is expected one."""
52 def set_if_not_exists(self, ns: str, key: str, data: bytes) -> bool:
53 """"Write key value to database under a namespace if key doesn't exist."""
57 def get(self, ns: str, keys: List[str]) -> Dict[str, bytes]:
58 """"Return values of the keys under a namespace."""
62 def find_keys(self, ns: str, key_pattern: str) -> List[str]:
63 """"Return all the keys matching search pattern under a namespace in database."""
67 def find_and_get(self, ns: str, key_pattern: str) -> Dict[str, bytes]:
69 Return all the keys with their values matching search pattern under a namespace in
75 def remove(self, ns: str, keys: List[str]) -> None:
76 """Remove keys and their data from database."""
80 def remove_if(self, ns: str, key: str, data: bytes) -> bool:
82 Remove key and its data from database if if the current data value is expected
88 def add_member(self, ns: str, group: str, members: Set[bytes]) -> None:
89 """Add new members to a group under a namespace in database."""
93 def remove_member(self, ns: str, group: str, members: Set[bytes]) -> None:
94 """Remove members from a group under a namespace in database."""
98 def remove_group(self, ns: str, group: str) -> None:
99 """Remove a group under a namespace in database along with it's members."""
103 def get_members(self, ns: str, group: str) -> Set[bytes]:
104 """Get all the members of a group under a namespace in database."""
108 def is_member(self, ns: str, group: str, member: bytes) -> bool:
109 """Validate if a given member is in the group under a namespace in database."""
113 def group_size(self, ns: str, group: str) -> int:
114 """Return the number of members in a group under a namespace in database."""
118 def set_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]],
119 data_map: Dict[str, bytes]) -> None:
120 """Publish event to channel after writing data."""
124 def set_if_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]], key: str,
125 old_data: bytes, new_data: bytes) -> bool:
127 Publish event to channel after writing key value to database under a namespace
128 if the old value is expected one.
133 def set_if_not_exists_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]],
134 key: str, data: bytes) -> bool:
136 Publish event to channel after writing key value to database under a namespace if
142 def remove_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]],
143 keys: List[str]) -> None:
144 """Publish event to channel after removing data."""
148 def remove_if_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]], key: str,
149 data: bytes) -> bool:
151 Publish event to channel after removing key and its data from database if the
152 current data value is expected one.
157 def remove_all_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]]) -> None:
159 Publish event to channel after removing all keys in namespace.
164 def subscribe_channel(self, ns: str, cb: Callable[[str, str], None],
165 channels: List[str]) -> None:
167 This takes a callback function and one or many channels to be subscribed.
168 When an event is received for the given channel, the given callback function
169 shall be called with channel and notifications as parameter.
174 def unsubscribe_channel(self, ns: str, channels: List[str]) -> None:
175 """Unsubscribes from channel and removes set callback function."""
179 def start_event_listener(self) -> None:
181 start_event_listener creates an event loop in a separate thread for handling
182 notifications from subscriptions.
187 def handle_events(self) -> Optional[Tuple[str, str]]:
189 handle_events is a non-blocking function that returns a tuple containing channel
190 name and message received from notification.
195 class DbBackendLockAbc(ABC):
197 An abstract Shared Data Layer (SDL) class providing database backend lock interface.
199 ns (str): Namespace under which this lock is targeted.
200 name (str): Lock name, identifies the lock key in a database backend.
202 def __init__(self, ns: str, name: str) -> None:
204 self._lock_name = name
208 def acquire(self, retry_interval: Union[int, float] = 0.1,
209 retry_timeout: Union[int, float] = 10) -> bool:
210 """Acquire a database lock."""
214 def release(self) -> None:
215 """Release a database lock."""
219 def refresh(self) -> None:
220 """Refresh the remaining validity time of the database lock back to a initial value."""
224 def get_validity_time(self) -> Union[int, float]:
225 """Return remaining validity time of the lock in seconds."""