1 # Copyright (c) 2019 AT&T Intellectual Property.
2 # Copyright (c) 2018-2022 Nokia.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 # This source code is part of the near-RT RIC (RAN Intelligent Controller)
18 # platform project (RICP).
"""The module provides implementation of the synchronous Shared Data Layer (SDL) interface."""
24 from typing import (Any, Callable, Dict, Set, List, Optional, Tuple, Union)
25 from ricsdl.configuration import _Configuration
26 from ricsdl.syncstorage_abc import (SyncStorageAbc, SyncLockAbc)
28 from ricsdl.backend.dbbackend_abc import DbBackendAbc
29 from ricsdl.exceptions import (SdlException, SdlTypeError)
def func_arg_checker(exception, start_arg_idx, **types):
    """Decorator to validate function arguments.

    Args:
        exception: Exception class raised when an argument has a wrong type.
        start_arg_idx (int): Index of the first positional argument to validate. Positional
                             arguments before this index (e.g. 'self') are not checked.
        **types: Parameter name to accepted type (or tuple of types) mapping, used as the
                 second argument of 'isinstance'. Parameters not listed are not checked.

    Returns:
        A decorator which wraps a function with the argument type validation.

    Raises:
        exception: From the decorated function call, when a listed positional or keyword
                   argument is not an instance of its accepted type(s).
    """
    import functools  # Local import: the block stays usable without touching file imports.

    def _check(func):
        # Validation is a development-time aid; with 'python -O' the function is left as is.
        if not __debug__:
            return func

        # Positional arguments are mapped to parameter names by their index.
        arg_names = func.__code__.co_varnames

        # 'wraps' keeps __name__, __doc__, __qualname__, __module__ and sets __wrapped__,
        # so introspection of the decorated function still shows the original one.
        @functools.wraps(func)
        def _validate(*args, **kwds):
            for idx, arg in enumerate(args[start_arg_idx:], start_arg_idx):
                name = arg_names[idx]
                if name in types and not isinstance(arg, types[name]):
                    raise exception(r"Wrong argument type: '{}'={}. Must be: {}".
                                    format(name, type(arg), types[name]))
            for kwdname, kwdval in kwds.items():
                if kwdname in types and not isinstance(kwdval, types[kwdname]):
                    raise exception(r"Wrong argument type: '{}'={}. Must be: {}".
                                    format(kwdname, type(kwdval), types[kwdname]))
            return func(*args, **kwds)
        return _validate
    return _check
class SyncLock(SyncLockAbc):
    """
    Concrete implementation of the Shared Data Layer (SDL) abstract 'SyncLockAbc' class.

    Each lock belongs to one namespace and is identified inside that namespace by its `name`.
    All lock operations are delegated to a backend lock object obtained from 'ricsdl.backend'.

    Args:
        ns (str): Namespace the lock is targeted to.
        name (str): Name of the lock, identifies the lock key in SDL storage.
        expiration (int, float): Lock expiration time after which the lock is removed if it
                                 hasn't been released earlier by a 'release' method.
        storage (SyncStorage): SDL storage instance whose configuration and database backend
                               are used to create the backend lock.
    """
    @func_arg_checker(SdlTypeError, 1, ns=str, name=str, expiration=(int, float))
    def __init__(self, ns: str, name: str, expiration: Union[int, float],
                 storage: 'SyncStorage') -> None:
        super().__init__(ns, name, expiration)
        self.__configuration = storage.get_configuration()
        backend = storage.get_backend()
        self.__dbbackendlock = ricsdl.backend.get_backend_lock_instance(
            self.__configuration, self._ns, self._name, self._expiration, backend)

    def __str__(self):
        # Human readable summary of the lock identity and its backend lock.
        description = {
            "namespace": self._ns,
            "name": self._name,
            "expiration": self._expiration,
            "backend lock": str(self.__dbbackendlock)
        }
        return str(description)

    @func_arg_checker(SdlTypeError, 1, retry_interval=(int, float),
                      retry_timeout=(int, float))
    def acquire(self, retry_interval: Union[int, float] = 0.1,
                retry_timeout: Union[int, float] = 10) -> bool:
        """Ask the backend lock to acquire the lock and return the result it reports."""
        acquired = self.__dbbackendlock.acquire(retry_interval, retry_timeout)
        return acquired

    def release(self) -> None:
        """Release the lock through the backend lock."""
        self.__dbbackendlock.release()

    def refresh(self) -> None:
        """Refresh the lock through the backend lock."""
        self.__dbbackendlock.refresh()

    def get_validity_time(self) -> Union[int, float]:
        """Return the validity time reported by the backend lock."""
        validity_time = self.__dbbackendlock.get_validity_time()
        return validity_time
class SyncStorage(SyncStorageAbc):
    """
    This class implements Shared Data Layer (SDL) abstract 'SyncStorageAbc' class.

    This class provides synchronous access to all the namespaces in SDL storage.
    Data can be written, read and removed based on keys known to clients. Keys are unique within
    a namespace, namespace identifier is passed as a parameter to all the operations.

    Args:
        fake_db_backend (str): Optional parameter. Parameter enables fake DB backend usage for an
                               SDL instance. Fake DB backend is ONLY allowed to use for testing
                               purposes at development phase of SDL clients when more advanced
                               database services are not necessarily needed. Currently value 'dict'
                               is only allowed value for the parameter, which enables dictionary
                               type of fake DB backend.
    """
    def __init__(self, fake_db_backend=None) -> None:
        super().__init__()
        # Assigned first so that close() always finds the attribute, even when the
        # configuration or backend creation below raises.
        self.__dbbackend = None
        self.__configuration = _Configuration(fake_db_backend)
        self.event_separator = self.__configuration.get_event_separator()
        self.__dbbackend = ricsdl.backend.get_backend_instance(self.__configuration)

    def __del__(self):
        self.close()

    def __str__(self):
        return str(
            {
                "configuration": str(self.__configuration),
                "backend": str(self.__dbbackend)
            }
        )

    def is_active(self) -> bool:
        """Return the backend connection status, False if the backend raises SdlException."""
        try:
            return self.__dbbackend.is_connected()
        except SdlException:
            return False

    def close(self) -> None:
        """Close the database backend, if one was created."""
        if self.__dbbackend:
            self.__dbbackend.close()

    @func_arg_checker(SdlTypeError, 1, ns=str, data_map=dict)
    def set(self, ns: str, data_map: Dict[str, bytes]) -> None:
        """Validate 'data_map' (str keys, bytes values) and pass it to the backend 'set'."""
        self._validate_key_value_dict(data_map)
        self.__dbbackend.set(ns, data_map)

    @func_arg_checker(SdlTypeError, 1, ns=str, key=str, old_data=bytes, new_data=bytes)
    def set_if(self, ns: str, key: str, old_data: bytes, new_data: bytes) -> bool:
        """Delegate to the backend 'set_if' and return the result it reports."""
        return self.__dbbackend.set_if(ns, key, old_data, new_data)

    @func_arg_checker(SdlTypeError, 1, ns=str, key=str, data=bytes)
    def set_if_not_exists(self, ns: str, key: str, data: bytes) -> bool:
        """Delegate to the backend 'set_if_not_exists' and return the result it reports."""
        return self.__dbbackend.set_if_not_exists(ns, key, data)

    @func_arg_checker(SdlTypeError, 1, ns=str, keys=(str, builtins.set))
    def get(self, ns: str, keys: Union[str, Set[str]]) -> Dict[str, bytes]:
        """
        Read one key (str) or several keys (set of str) through the backend.

        Returns:
            (dict of str: bytes): The backend result re-built in sorted key order.
        """
        # A single str key is wrapped as such; list('abc') would split it to characters.
        disordered = self.__dbbackend.get(ns, self._as_list(keys))
        return {k: disordered[k] for k in sorted(disordered)}

    @func_arg_checker(SdlTypeError, 1, ns=str, key_pattern=str)
    def find_keys(self, ns: str, key_pattern: str) -> List[str]:
        """Delegate to the backend 'find_keys' and return the result it reports."""
        return self.__dbbackend.find_keys(ns, key_pattern)

    @func_arg_checker(SdlTypeError, 1, ns=str, key_pattern=str)
    def find_and_get(self, ns: str, key_pattern: str) -> Dict[str, bytes]:
        """Delegate to the backend 'find_and_get'; the result is returned in sorted key order."""
        disordered = self.__dbbackend.find_and_get(ns, key_pattern)
        return {k: disordered[k] for k in sorted(disordered)}

    @func_arg_checker(SdlTypeError, 1, ns=str, keys=(str, builtins.set))
    def remove(self, ns: str, keys: Union[str, Set[str]]) -> None:
        """Remove one key (str) or several keys (set of str) through the backend."""
        # A single str key is wrapped as such; list('abc') would split it to characters.
        self.__dbbackend.remove(ns, self._as_list(keys))

    @func_arg_checker(SdlTypeError, 1, ns=str, key=str, data=bytes)
    def remove_if(self, ns: str, key: str, data: bytes) -> bool:
        """Delegate to the backend 'remove_if' and return the result it reports."""
        return self.__dbbackend.remove_if(ns, key, data)

    @func_arg_checker(SdlTypeError, 1, ns=str)
    def remove_all(self, ns: str) -> None:
        """Find every key of the namespace with pattern '*' and remove the keys found."""
        keys = self.__dbbackend.find_keys(ns, '*')
        # The backend 'remove' is called only when there is something to remove.
        if keys:
            self.__dbbackend.remove(ns, keys)

    @func_arg_checker(SdlTypeError, 1, ns=str, group=str, members=(bytes, builtins.set))
    def add_member(self, ns: str, group: str, members: Union[bytes, Set[bytes]]) -> None:
        """Delegate to the backend 'add_member'; 'members' is passed on unchanged."""
        self.__dbbackend.add_member(ns, group, members)

    @func_arg_checker(SdlTypeError, 1, ns=str, group=str, members=(bytes, builtins.set))
    def remove_member(self, ns: str, group: str, members: Union[bytes, Set[bytes]]) -> None:
        """Delegate to the backend 'remove_member'; 'members' is passed on unchanged."""
        self.__dbbackend.remove_member(ns, group, members)

    @func_arg_checker(SdlTypeError, 1, ns=str, group=str)
    def remove_group(self, ns: str, group: str) -> None:
        """Delegate to the backend 'remove_group'."""
        self.__dbbackend.remove_group(ns, group)

    @func_arg_checker(SdlTypeError, 1, ns=str, group=str)
    def get_members(self, ns: str, group: str) -> Set[bytes]:
        """Delegate to the backend 'get_members' and return the result it reports."""
        return self.__dbbackend.get_members(ns, group)

    @func_arg_checker(SdlTypeError, 1, ns=str, group=str, member=bytes)
    def is_member(self, ns: str, group: str, member: bytes) -> bool:
        """Delegate to the backend 'is_member' and return the result it reports."""
        return self.__dbbackend.is_member(ns, group, member)

    @func_arg_checker(SdlTypeError, 1, ns=str, group=str)
    def group_size(self, ns: str, group: str) -> int:
        """Delegate to the backend 'group_size' and return the result it reports."""
        return self.__dbbackend.group_size(ns, group)

    @func_arg_checker(SdlTypeError, 1, ns=str, channels_and_events=dict, data_map=dict)
    def set_and_publish(self, ns: str, channels_and_events: Dict[str, Union[str, List[str]]],
                        data_map: Dict[str, bytes]) -> None:
        """Validate both dicts and pass them to the backend 'set_and_publish'."""
        self._validate_key_value_dict(data_map)
        events_by_channel = self._normalized_events(channels_and_events)
        self.__dbbackend.set_and_publish(ns, events_by_channel, data_map)

    @func_arg_checker(SdlTypeError,
                      1,
                      ns=str,
                      channels_and_events=dict,
                      key=str,
                      old_data=bytes,
                      new_data=bytes)
    def set_if_and_publish(self, ns: str, channels_and_events: Dict[str, Union[str, List[str]]],
                           key: str, old_data: bytes, new_data: bytes) -> bool:
        """Validate the events and return the result of the backend 'set_if_and_publish'."""
        events_by_channel = self._normalized_events(channels_and_events)
        return self.__dbbackend.set_if_and_publish(ns, events_by_channel, key, old_data, new_data)

    @func_arg_checker(SdlTypeError, 1, ns=str, channels_and_events=dict, key=str, data=bytes)
    def set_if_not_exists_and_publish(self, ns: str,
                                      channels_and_events: Dict[str, Union[str, List[str]]],
                                      key: str, data: bytes) -> bool:
        """Validate the events and return the result of the backend
        'set_if_not_exists_and_publish'."""
        events_by_channel = self._normalized_events(channels_and_events)
        return self.__dbbackend.set_if_not_exists_and_publish(ns, events_by_channel, key, data)

    @func_arg_checker(SdlTypeError, 1, ns=str, channels_and_events=dict, keys=(str, builtins.set))
    def remove_and_publish(self, ns: str, channels_and_events: Dict[str, Union[str, List[str]]],
                           keys: Union[str, Set[str]]) -> None:
        """Validate the events and pass them with the key list to the backend
        'remove_and_publish'."""
        events_by_channel = self._normalized_events(channels_and_events)
        self.__dbbackend.remove_and_publish(ns, events_by_channel, self._as_list(keys))

    @func_arg_checker(SdlTypeError, 1, ns=str, channels_and_events=dict, key=str, data=bytes)
    def remove_if_and_publish(self, ns: str, channels_and_events: Dict[str, Union[str, List[str]]],
                              key: str, data: bytes) -> bool:
        """Validate the events and return the result of the backend 'remove_if_and_publish'."""
        events_by_channel = self._normalized_events(channels_and_events)
        return self.__dbbackend.remove_if_and_publish(ns, events_by_channel, key, data)

    @func_arg_checker(SdlTypeError, 1, ns=str, channels_and_events=dict)
    def remove_all_and_publish(self, ns: str,
                               channels_and_events: Dict[str, Union[str, List[str]]]) -> None:
        """Validate the events and pass them to the backend 'remove_all_and_publish'."""
        events_by_channel = self._normalized_events(channels_and_events)
        self.__dbbackend.remove_all_and_publish(ns, events_by_channel)

    @func_arg_checker(SdlTypeError, 1, ns=str, cb=Callable, channels=(str, builtins.set))
    def subscribe_channel(self, ns: str, cb: Callable[[str, List[str]], None],
                          channels: Union[str, Set[str]]) -> None:
        """Validate the callback and pass it with the channel list to the backend
        'subscribe_channel'."""
        self._validate_callback(cb)
        self.__dbbackend.subscribe_channel(ns, cb, self._as_list(channels))

    @func_arg_checker(SdlTypeError, 1, ns=str, channels=(str, builtins.set))
    def unsubscribe_channel(self, ns: str, channels: Union[str, Set[str]]) -> None:
        """Pass the channel list to the backend 'unsubscribe_channel'."""
        self.__dbbackend.unsubscribe_channel(ns, self._as_list(channels))

    def start_event_listener(self) -> None:
        """Delegate to the backend 'start_event_listener'."""
        self.__dbbackend.start_event_listener()

    def handle_events(self) -> Optional[Tuple[str, List[str]]]:
        """Delegate to the backend 'handle_events' and return the result it reports."""
        return self.__dbbackend.handle_events()

    @func_arg_checker(SdlTypeError, 1, ns=str, resource=str, expiration=(int, float))
    def get_lock_resource(self, ns: str, resource: str, expiration: Union[int, float]) -> SyncLock:
        """Return a new 'SyncLock' named 'resource' in namespace 'ns', bound to this storage."""
        return SyncLock(ns, resource, expiration, self)

    def get_backend(self) -> DbBackendAbc:
        """Return backend instance."""
        return self.__dbbackend

    def get_configuration(self) -> _Configuration:
        """Return configuration what was valid when the SDL instance was initiated."""
        return self.__configuration

    @staticmethod
    def _as_list(items):
        """Return a str as a one-element list and any other iterable as a list of its items."""
        return [items] if isinstance(items, str) else list(items)

    def _normalized_events(self, channels_and_events):
        """
        Validate 'channels_and_events' and return a new dict of channel to list of events.

        A single str event is wrapped to a one-element list. The dict given by the caller is
        left unmodified; event lists are passed on as the same list objects.

        Raises:
            SdlTypeError: If the validation of channels or events fails.
        """
        self._validate_channels_events(channels_and_events)
        return {channel: [events] if isinstance(events, str) else events
                for channel, events in channels_and_events.items()}

    @classmethod
    def _validate_key_value_dict(cls, kv):
        """Raise SdlTypeError unless every key of 'kv' is str and every value is bytes."""
        for k, v in kv.items():
            if not isinstance(k, str):
                raise SdlTypeError(r"Wrong dict key type: {}={}. Must be: str".format(k, type(k)))
            if not isinstance(v, bytes):
                raise SdlTypeError(r"Wrong dict value type: {}={}. Must be: bytes".format(v, type(v)))

    def _validate_channels_events(self, channels_and_events: Dict[Any, Any]):
        """
        Check the types of channels and events and that no event contains the event separator.

        Raises:
            SdlTypeError: If a channel is not str, if events is neither str nor list, if a list
                          item is not str, or if an event contains 'self.event_separator'.
        """
        for channel, events in channels_and_events.items():
            if not isinstance(channel, str):
                raise SdlTypeError(r"Wrong channel type: {}={}. Must be: str".format(
                    channel, type(channel)))
            if not isinstance(events, (list, str)):
                raise SdlTypeError(r"Wrong event type: {}={}. Must be: str".format(
                    events, type(events)))
            # A single str event goes through the same checks as the items of a list.
            for event in [events] if isinstance(events, str) else events:
                if not isinstance(event, str):
                    # Report the offending item, not the whole list.
                    raise SdlTypeError(r"Wrong event type: {}={}. Must be: str".format(
                        event, type(event)))
                if self.event_separator in event:
                    # Single quoted literal: a raw string with \" would emit the backslashes.
                    raise SdlTypeError('Events {} contains illegal substring ("{}")'.format(
                        events, self.event_separator))

    @classmethod
    def _validate_callback(cls, cb):
        """Raise SdlTypeError unless the signature of 'cb' has exactly two parameters."""
        param_len = len(inspect.signature(cb).parameters)
        if param_len != 2:
            raise SdlTypeError(
                f"Callback function should take 2 positional argument but {param_len} were given")