1 # Copyright (c) 2019 AT&T Intellectual Property.
2 # Copyright (c) 2018-2019 Nokia.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 # This source code is part of the near-RT RIC (RAN Intelligent Controller)
18 # platform project (RICP).
"""This module provides an implementation of the synchronous Shared Data Layer (SDL) interface."""
24 from typing import (Any, Callable, Dict, Set, List, Optional, Tuple, Union)
25 from ricsdl.configuration import _Configuration
26 from ricsdl.syncstorage_abc import (SyncStorageAbc, SyncLockAbc)
28 from ricsdl.backend.dbbackend_abc import DbBackendAbc
29 from ricsdl.exceptions import (SdlException, SdlTypeError)
32 def func_arg_checker(exception, start_arg_idx, **types):
33 """Decorator to validate function arguments."""
38 def _validate(*args, **kwds):
39 for idx, arg in enumerate(args[start_arg_idx:], start_arg_idx):
40 if func.__code__.co_varnames[idx] in types and \
41 not isinstance(arg, types[func.__code__.co_varnames[idx]]):
42 raise exception(r"Wrong argument type: '{}'={}. Must be: {}".
43 format(func.__code__.co_varnames[idx], type(arg),
44 types[func.__code__.co_varnames[idx]]))
45 for kwdname, kwdval in kwds.items():
46 if kwdname in types and not isinstance(kwdval, types[kwdname]):
47 raise exception(r"Wrong argument type: '{}'={}. Must be: {}".
48 format(kwdname, type(kwdval), types[kwdname]))
49 return func(*args, **kwds)
50 _validate.__name__ = func.__name__
55 class SyncLock(SyncLockAbc):
57 This class implements Shared Data Layer (SDL) abstract 'SyncLockAbc' class.
59 A lock instance is created per namespace and it is identified by its `name` within a namespace.
62 ns (str): Namespace under which this lock is targeted.
63 name (str): Lock name, identifies the lock key in SDL storage.
64 expiration (int, float): Lock expiration time after which the lock is removed if it hasn't
65 been released earlier by a 'release' method.
66 storage (SyncStorage): Database backend object containing connection to a database.
68 @func_arg_checker(SdlTypeError, 1, ns=str, name=str, expiration=(int, float))
69 def __init__(self, ns: str, name: str, expiration: Union[int, float],
70 storage: 'SyncStorage') -> None:
72 super().__init__(ns, name, expiration)
73 self.__configuration = storage.get_configuration()
74 self.__dbbackendlock = ricsdl.backend.get_backend_lock_instance(self.__configuration,
76 storage.get_backend())
81 "namespace": self._ns,
83 "expiration": self._expiration,
84 "backend lock": str(self.__dbbackendlock)
88 @func_arg_checker(SdlTypeError, 1, retry_interval=(int, float),
89 retry_timeout=(int, float))
def acquire(self, retry_interval: Union[int, float] = 0.1,
            retry_timeout: Union[int, float] = 10) -> bool:
    """Try to acquire the lock through the backend lock object.

    Args:
        retry_interval: Passed unchanged as the first positional argument of the backend
            lock's ``acquire``. Defaults to 0.1.
        retry_timeout: Passed unchanged as the second positional argument of the backend
            lock's ``acquire``. Defaults to 10.

    Returns:
        The value returned by the backend lock's ``acquire``.
    """
    backend_lock = self.__dbbackendlock
    acquired = backend_lock.acquire(retry_interval, retry_timeout)
    return acquired
def release(self) -> None:
    """Release the lock by delegating to the backend lock object's ``release``."""
    backend_lock = self.__dbbackendlock
    backend_lock.release()
def refresh(self) -> None:
    """Refresh the lock by delegating to the backend lock object's ``refresh``."""
    backend_lock = self.__dbbackendlock
    backend_lock.refresh()
def get_validity_time(self) -> Union[int, float]:
    """Return the validity time reported by the backend lock object."""
    backend_lock = self.__dbbackendlock
    validity_time = backend_lock.get_validity_time()
    return validity_time
104 class SyncStorage(SyncStorageAbc):
106 This class implements Shared Data Layer (SDL) abstract 'SyncStorageAbc' class.
108 This class provides synchronous access to all the namespaces in SDL storage.
109 Data can be written, read and removed based on keys known to clients. Keys are unique within
110 a namespace, namespace identifier is passed as a parameter to all the operations.
fake_db_backend (str): Optional parameter that enables the use of a fake DB backend for an
                       SDL instance. A fake DB backend may ONLY be used for testing
                       purposes during the development phase of SDL clients, when more
                       advanced database services are not necessarily needed. Currently
                       'dict' is the only allowed value for the parameter; it enables the
                       dictionary type of fake DB backend.
120 def __init__(self, fake_db_backend=None) -> None:
122 self.__configuration = _Configuration(fake_db_backend)
123 self.event_separator = self.__configuration.get_event_separator()
124 self.__dbbackend = ricsdl.backend.get_backend_instance(self.__configuration)
132 "configuration": str(self.__configuration),
133 "backend": str(self.__dbbackend)
139 return self.__dbbackend.is_connected()
144 self.__dbbackend.close()
146 @func_arg_checker(SdlTypeError, 1, ns=str, data_map=dict)
def set(self, ns: str, data_map: Dict[str, bytes]) -> None:
    """Validate `data_map` and hand it to the backend's ``set``.

    Args:
        ns: Namespace, forwarded to the backend.
        data_map: Key/value pairs; checked with ``_validate_key_value_dict`` before the
            backend is called.
    """
    self._validate_key_value_dict(data_map)
    backend = self.__dbbackend
    backend.set(ns, data_map)
151 @func_arg_checker(SdlTypeError, 1, ns=str, key=str, old_data=bytes, new_data=bytes)
def set_if(self, ns: str, key: str, old_data: bytes, new_data: bytes) -> bool:
    """Delegate a conditional write to the backend's ``set_if``.

    All arguments are forwarded positionally and unchanged.

    Returns:
        The value returned by the backend's ``set_if``.
    """
    backend = self.__dbbackend
    outcome = backend.set_if(ns, key, old_data, new_data)
    return outcome
155 @func_arg_checker(SdlTypeError, 1, ns=str, key=str, data=bytes)
def set_if_not_exists(self, ns: str, key: str, data: bytes) -> bool:
    """Delegate to the backend's ``set_if_not_exists``.

    All arguments are forwarded positionally and unchanged.

    Returns:
        The value returned by the backend's ``set_if_not_exists``.
    """
    backend = self.__dbbackend
    outcome = backend.set_if_not_exists(ns, key, data)
    return outcome
159 @func_arg_checker(SdlTypeError, 1, ns=str, keys=(str, builtins.set))
def get(self, ns: str, keys: Union[str, Set[str]]) -> Dict[str, bytes]:
    """Read the values of one or more keys and return them ordered by key.

    Args:
        ns: Namespace, forwarded to the backend.
        keys: Either a single key given as a string, or a set of keys.

    Returns:
        A new dict built from the backend's result, with entries inserted in sorted
        key order.
    """
    # A bare string must be wrapped rather than passed to list(): list("abc") would split
    # the key into the single-character keys ['a', 'b', 'c'].
    key_list = [keys] if isinstance(keys, str) else list(keys)
    disordered = self.__dbbackend.get(ns, key_list)
    return {k: disordered[k] for k in sorted(disordered)}
164 @func_arg_checker(SdlTypeError, 1, ns=str, key_pattern=str)
def find_keys(self, ns: str, key_pattern: str) -> List[str]:
    """Delegate a key search to the backend's ``find_keys``.

    Args:
        ns: Namespace, forwarded to the backend.
        key_pattern: Pattern, forwarded to the backend unchanged.

    Returns:
        The value returned by the backend's ``find_keys``.
    """
    backend = self.__dbbackend
    found = backend.find_keys(ns, key_pattern)
    return found
168 @func_arg_checker(SdlTypeError, 1, ns=str, key_pattern=str)
def find_and_get(self, ns: str, key_pattern: str) -> Dict[str, bytes]:
    """Fetch matching key/value pairs from the backend and return them ordered by key.

    Args:
        ns: Namespace, forwarded to the backend.
        key_pattern: Pattern, forwarded to the backend unchanged.

    Returns:
        A new dict holding the backend's result, with entries inserted in sorted key order.
    """
    unordered = self.__dbbackend.find_and_get(ns, key_pattern)
    ordered = {}
    for name in sorted(unordered):
        ordered[name] = unordered[name]
    return ordered
173 @func_arg_checker(SdlTypeError, 1, ns=str, keys=(str, builtins.set))
def remove(self, ns: str, keys: Union[str, Set[str]]) -> None:
    """Remove one or more keys through the backend's ``remove``.

    Args:
        ns: Namespace, forwarded to the backend.
        keys: Either a single key given as a string, or a set of keys.
    """
    # A bare string must be wrapped rather than passed to list(): list("abc") would ask the
    # backend to remove the single-character keys 'a', 'b' and 'c' instead of "abc".
    key_list = [keys] if isinstance(keys, str) else list(keys)
    self.__dbbackend.remove(ns, key_list)
177 @func_arg_checker(SdlTypeError, 1, ns=str, key=str, data=bytes)
def remove_if(self, ns: str, key: str, data: bytes) -> bool:
    """Delegate a conditional removal to the backend's ``remove_if``.

    All arguments are forwarded positionally and unchanged.

    Returns:
        The value returned by the backend's ``remove_if``.
    """
    backend = self.__dbbackend
    outcome = backend.remove_if(ns, key, data)
    return outcome
181 @func_arg_checker(SdlTypeError, 1, ns=str)
182 def remove_all(self, ns: str) -> None:
183 keys = self.__dbbackend.find_keys(ns, '*')
185 self.__dbbackend.remove(ns, keys)
187 @func_arg_checker(SdlTypeError, 1, ns=str, group=str, members=(bytes, builtins.set))
def add_member(self, ns: str, group: str, members: Union[bytes, Set[bytes]]) -> None:
    """Delegate to the backend's ``add_member``.

    Args:
        ns: Namespace, forwarded to the backend.
        group: Group name, forwarded to the backend.
        members: A single bytes value or a set of bytes values; forwarded as given,
            without conversion.
    """
    backend = self.__dbbackend
    backend.add_member(ns, group, members)
191 @func_arg_checker(SdlTypeError, 1, ns=str, group=str, members=(bytes, builtins.set))
def remove_member(self, ns: str, group: str, members: Union[bytes, Set[bytes]]) -> None:
    """Delegate to the backend's ``remove_member``.

    Args:
        ns: Namespace, forwarded to the backend.
        group: Group name, forwarded to the backend.
        members: A single bytes value or a set of bytes values; forwarded as given,
            without conversion.
    """
    backend = self.__dbbackend
    backend.remove_member(ns, group, members)
195 @func_arg_checker(SdlTypeError, 1, ns=str, group=str)
def remove_group(self, ns: str, group: str) -> None:
    """Delegate to the backend's ``remove_group`` with `ns` and `group` unchanged."""
    backend = self.__dbbackend
    backend.remove_group(ns, group)
199 @func_arg_checker(SdlTypeError, 1, ns=str, group=str)
def get_members(self, ns: str, group: str) -> Set[bytes]:
    """Return whatever the backend's ``get_members`` reports for `group` in `ns`."""
    backend = self.__dbbackend
    members = backend.get_members(ns, group)
    return members
203 @func_arg_checker(SdlTypeError, 1, ns=str, group=str, member=bytes)
def is_member(self, ns: str, group: str, member: bytes) -> bool:
    """Return the backend's ``is_member`` answer for `member` in `group` of `ns`."""
    backend = self.__dbbackend
    answer = backend.is_member(ns, group, member)
    return answer
207 @func_arg_checker(SdlTypeError, 1, ns=str, group=str)
def group_size(self, ns: str, group: str) -> int:
    """Return the size the backend's ``group_size`` reports for `group` in `ns`."""
    backend = self.__dbbackend
    size = backend.group_size(ns, group)
    return size
211 @func_arg_checker(SdlTypeError, 1, ns=str, channels_and_events=dict, data_map=dict)
def set_and_publish(self, ns: str, channels_and_events: Dict[str, Union[str, List[str]]],
                    data_map: Dict[str, bytes]) -> None:
    """Validate the inputs, then hand the data and events to the backend's ``set_and_publish``.

    Args:
        ns: Namespace, forwarded to the backend.
        channels_and_events: Maps each channel to a single event string or a list of event
            strings. The caller's dict is left unmodified.
        data_map: Key/value pairs; checked with ``_validate_key_value_dict``.
    """
    self._validate_key_value_dict(data_map)
    self._validate_channels_events(channels_and_events)
    # Build a fresh mapping in which every channel maps to a list of events, instead of
    # rewriting the caller's dict in place.
    normalized = {
        channel: [events] if isinstance(events, str) else events
        for channel, events in channels_and_events.items()
    }
    self.__dbbackend.set_and_publish(ns, normalized, data_map)
220 @func_arg_checker(SdlTypeError,
223 channels_and_events=dict,
def set_if_and_publish(self, ns: str, channels_and_events: Dict[str, Union[str, List[str]]],
                       key: str, old_data: bytes, new_data: bytes) -> bool:
    """Validate the events, then delegate to the backend's ``set_if_and_publish``.

    Args:
        ns: Namespace, forwarded to the backend.
        channels_and_events: Maps each channel to a single event string or a list of event
            strings. The caller's dict is left unmodified.
        key: Forwarded to the backend unchanged.
        old_data: Forwarded to the backend unchanged.
        new_data: Forwarded to the backend unchanged.

    Returns:
        The value returned by the backend's ``set_if_and_publish``.
    """
    self._validate_channels_events(channels_and_events)
    # Build a fresh mapping in which every channel maps to a list of events, instead of
    # rewriting the caller's dict in place.
    normalized = {
        channel: [events] if isinstance(events, str) else events
        for channel, events in channels_and_events.items()
    }
    return self.__dbbackend.set_if_and_publish(ns, normalized, key, old_data, new_data)
234 @func_arg_checker(SdlTypeError, 1, ns=str, channels_and_events=dict, key=str, data=bytes)
def set_if_not_exists_and_publish(self, ns: str,
                                  channels_and_events: Dict[str, Union[str, List[str]]],
                                  key: str, data: bytes) -> bool:
    """Validate the events, then delegate to the backend's ``set_if_not_exists_and_publish``.

    Args:
        ns: Namespace, forwarded to the backend.
        channels_and_events: Maps each channel to a single event string or a list of event
            strings. The caller's dict is left unmodified.
        key: Forwarded to the backend unchanged.
        data: Forwarded to the backend unchanged.

    Returns:
        The value returned by the backend's ``set_if_not_exists_and_publish``.
    """
    self._validate_channels_events(channels_and_events)
    # Build a fresh mapping in which every channel maps to a list of events, instead of
    # rewriting the caller's dict in place.
    normalized = {
        channel: [events] if isinstance(events, str) else events
        for channel, events in channels_and_events.items()
    }
    return self.__dbbackend.set_if_not_exists_and_publish(ns, normalized, key, data)
243 @func_arg_checker(SdlTypeError, 1, ns=str, channels_and_events=dict, keys=(str, builtins.set))
def remove_and_publish(self, ns: str, channels_and_events: Dict[str, Union[str, List[str]]],
                       keys: Union[str, Set[str]]) -> None:
    """Validate the events, then delegate to the backend's ``remove_and_publish``.

    Args:
        ns: Namespace, forwarded to the backend.
        channels_and_events: Maps each channel to a single event string or a list of event
            strings. The caller's dict is left unmodified.
        keys: Either a single key given as a string, or a set of keys; the backend always
            receives a list.
    """
    self._validate_channels_events(channels_and_events)
    # Build a fresh mapping in which every channel maps to a list of events, instead of
    # rewriting the caller's dict in place.
    normalized = {
        channel: [events] if isinstance(events, str) else events
        for channel, events in channels_and_events.items()
    }
    key_list = [keys] if isinstance(keys, str) else list(keys)
    self.__dbbackend.remove_and_publish(ns, normalized, key_list)
252 @func_arg_checker(SdlTypeError, 1, ns=str, channels_and_events=dict, key=str, data=bytes)
def remove_if_and_publish(self, ns: str, channels_and_events: Dict[str, Union[str, List[str]]],
                          key: str, data: bytes) -> bool:
    """Validate the events, then delegate to the backend's ``remove_if_and_publish``.

    Args:
        ns: Namespace, forwarded to the backend.
        channels_and_events: Maps each channel to a single event string or a list of event
            strings. The caller's dict is left unmodified.
        key: Forwarded to the backend unchanged.
        data: Forwarded to the backend unchanged.

    Returns:
        The value returned by the backend's ``remove_if_and_publish``.
    """
    self._validate_channels_events(channels_and_events)
    # Build a fresh mapping in which every channel maps to a list of events, instead of
    # rewriting the caller's dict in place.
    normalized = {
        channel: [events] if isinstance(events, str) else events
        for channel, events in channels_and_events.items()
    }
    return self.__dbbackend.remove_if_and_publish(ns, normalized, key, data)
260 @func_arg_checker(SdlTypeError, 1, ns=str, channels_and_events=dict)
def remove_all_and_publish(self, ns: str,
                           channels_and_events: Dict[str, Union[str, List[str]]]) -> None:
    """Validate the events, then delegate to the backend's ``remove_all_and_publish``.

    Args:
        ns: Namespace, forwarded to the backend.
        channels_and_events: Maps each channel to a single event string or a list of event
            strings. The caller's dict is left unmodified.
    """
    self._validate_channels_events(channels_and_events)
    # Build a fresh mapping in which every channel maps to a list of events, instead of
    # rewriting the caller's dict in place.
    normalized = {
        channel: [events] if isinstance(events, str) else events
        for channel, events in channels_and_events.items()
    }
    self.__dbbackend.remove_all_and_publish(ns, normalized)
268 @func_arg_checker(SdlTypeError, 1, ns=str, cb=Callable, channels=(str, builtins.set))
def subscribe_channel(self, ns: str, cb: Callable[[str, List[str]], None],
                      channels: Union[str, Set[str]]) -> None:
    """Validate the callback and subscribe it to the given channels via the backend.

    Args:
        ns: Namespace, forwarded to the backend.
        cb: Callback; checked with ``_validate_callback`` before anything else happens.
        channels: Either a single channel name given as a string, or a set of channel
            names; the backend always receives a list.
    """
    self._validate_callback(cb)
    if isinstance(channels, str):
        channel_list = [channels]
    else:
        channel_list = list(channels)
    self.__dbbackend.subscribe_channel(ns, cb, channel_list)
275 @func_arg_checker(SdlTypeError, 1, ns=str, channels=(str, builtins.set))
def unsubscribe_channel(self, ns: str, channels: Union[str, Set[str]]) -> None:
    """Unsubscribe from the given channels via the backend.

    Args:
        ns: Namespace, forwarded to the backend.
        channels: Either a single channel name given as a string, or a set of channel
            names; the backend always receives a list.
    """
    if isinstance(channels, str):
        channel_list = [channels]
    else:
        channel_list = list(channels)
    self.__dbbackend.unsubscribe_channel(ns, channel_list)
def start_event_listener(self) -> None:
    """Delegate to the backend's ``start_event_listener``."""
    backend = self.__dbbackend
    backend.start_event_listener()
def handle_events(self) -> Optional[Tuple[str, List[str]]]:
    """Return whatever the backend's ``handle_events`` returns."""
    backend = self.__dbbackend
    handled = backend.handle_events()
    return handled
286 @func_arg_checker(SdlTypeError, 1, ns=str, resource=str, expiration=(int, float))
def get_lock_resource(self, ns: str, resource: str, expiration: Union[int, float]) -> SyncLock:
    """Create a ``SyncLock`` for `resource` in `ns`, bound to this storage instance.

    Args:
        ns: Passed to ``SyncLock`` as its namespace argument.
        resource: Passed to ``SyncLock`` as the lock name.
        expiration: Passed to ``SyncLock`` as the lock expiration.

    Returns:
        The newly constructed ``SyncLock``.
    """
    lock = SyncLock(ns, resource, expiration, self)
    return lock
def get_backend(self) -> DbBackendAbc:
    """Return the database backend object held by this instance."""
    backend = self.__dbbackend
    return backend
def get_configuration(self) -> _Configuration:
    """Return the configuration that was in effect when this SDL instance was initialized."""
    configuration = self.__configuration
    return configuration
def _validate_key_value_dict(cls, kv):
    """Check that every key of `kv` is a str and every value is bytes.

    Pairs are checked in iteration order; within a pair the key is checked before the value.

    Args:
        kv: Mapping to check; only its ``items()`` are read.

    Raises:
        SdlTypeError: At the first key that is not a str, or the first value that is not
            bytes.
    """
    key_error = r"Wrong dict key type: {}={}. Must be: str"
    value_error = r"Wrong dict value type: {}={}. Must be: bytes"
    for name, payload in kv.items():
        if not isinstance(name, str):
            raise SdlTypeError(key_error.format(name, type(name)))
        if not isinstance(payload, bytes):
            raise SdlTypeError(value_error.format(payload, type(payload)))
306 def _validate_channels_events(self, channels_and_events: Dict[Any, Any]):
307 for channel, events in channels_and_events.items():
308 if not isinstance(channel, str):
309 raise SdlTypeError(r"Wrong channel type: {}={}. Must be: str".format(
310 channel, type(channel)))
311 if not isinstance(events, (list, str)):
312 raise SdlTypeError(r"Wrong event type: {}={}. Must be: str".format(
313 events, type(events)))
314 if isinstance(events, list):
316 if not isinstance(event, str):
317 raise SdlTypeError(r"Wrong event type: {}={}. Must be: str".format(
318 events, type(events)))
319 if self.event_separator in event:
320 raise SdlTypeError(r"Events {} contains illegal substring (\"{}\")".format(
321 events, self.event_separator))
323 if self.event_separator in events:
324 raise SdlTypeError(r"Events {} contains illegal substring (\"{}\")".format(
325 events, self.event_separator))
328 def _validate_callback(cls, cb):
329 param_len = len(inspect.signature(cb).parameters)
332 f"Callback function should take 2 positional argument but {param_len} were given")