1 # Copyright (c) 2019 AT&T Intellectual Property.
2 # Copyright (c) 2018-2019 Nokia.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 # This source code is part of the near-RT RIC (RAN Intelligent Controller)
18 # platform project (RICP).
"""The module provides implementation of the synchronous Shared Data Layer (SDL) interface."""
24 from typing import (Any, Callable, Dict, Set, List, Optional, Tuple, Union)
25 from ricsdl.configuration import _Configuration
26 from ricsdl.syncstorage_abc import (SyncStorageAbc, SyncLockAbc)
28 from ricsdl.backend.dbbackend_abc import DbBackendAbc
29 from ricsdl.exceptions import (SdlException, SdlTypeError)
def func_arg_checker(exception, start_arg_idx, **types):
    """Decorator to validate function arguments.

    Args:
        exception: Exception class raised when an argument has a wrong type.
        start_arg_idx (int): Index of the first positional argument to validate (for example 1
                             to leave the 'self' argument of a method unchecked).
        **types: Maps a parameter name to the type, or tuple of types, accepted for it. The
                 value is handed to isinstance() as is.

    Returns:
        A decorator. The decorated function raises `exception` when a listed positional or
        keyword argument is not an instance of its expected type, otherwise it calls the
        original function and returns its result.
    """
    import functools

    def _check(func):
        # Validation is a debugging aid only: it is compiled out when Python runs with -O.
        if not __debug__:
            return func

        code = func.__code__
        # co_varnames also lists keyword-only names, *args/**kwargs names and locals; only the
        # first co_argcount entries can be bound by position.
        param_names = code.co_varnames[:code.co_argcount]

        @functools.wraps(func)
        def _validate(*args, **kwds):
            # zip() stops at the last named parameter, so surplus positional arguments that
            # end up in *args are not matched against unrelated names.
            for name, arg in zip(param_names[start_arg_idx:], args[start_arg_idx:]):
                if name in types and not isinstance(arg, types[name]):
                    raise exception(r"Wrong argument type: '{}'={}. Must be: {}".
                                    format(name, type(arg), types[name]))
            for kwdname, kwdval in kwds.items():
                if kwdname in types and not isinstance(kwdval, types[kwdname]):
                    raise exception(r"Wrong argument type: '{}'={}. Must be: {}".
                                    format(kwdname, type(kwdval), types[kwdname]))
            return func(*args, **kwds)
        return _validate
    return _check
55 class SyncLock(SyncLockAbc):
57 This class implements Shared Data Layer (SDL) abstract 'SyncLockAbc' class.
59 A lock instance is created per namespace and it is identified by its `name` within a namespace.
62 ns (str): Namespace under which this lock is targeted.
63 name (str): Lock name, identifies the lock key in SDL storage.
64 expiration (int, float): Lock expiration time after which the lock is removed if it hasn't
65 been released earlier by a 'release' method.
66 storage (SyncStorage): Database backend object containing connection to a database.
@func_arg_checker(SdlTypeError, 1, ns=str, name=str, expiration=(int, float))
def __init__(self, ns: str, name: str, expiration: Union[int, float],
             storage: 'SyncStorage') -> None:
    """Create a lock for the given namespace, lock name and expiration time.

    Args:
        ns: Namespace under which this lock is targeted.
        name: Lock name.
        expiration: Lock expiration time.
        storage: SDL storage object that supplies the configuration and the database backend.

    Raises:
        SdlTypeError: If ns, name or expiration has a wrong type.
    """
    super().__init__(ns, name, expiration)
    self.__configuration = storage.get_configuration()
    # NOTE(review): the line carrying the middle arguments of this call is absent from the
    # reviewed text. ns, name and expiration are restored on the assumption that the backend
    # factory takes them between the configuration and the backend -- confirm against
    # ricsdl.backend.get_backend_lock_instance.
    self.__dbbackendlock = ricsdl.backend.get_backend_lock_instance(
        self.__configuration, ns, name, expiration, storage.get_backend())
81 "namespace": self._ns,
83 "expiration": self._expiration,
84 "backend lock": str(self.__dbbackendlock)
@func_arg_checker(SdlTypeError, 1, retry_interval=(int, float),
                  retry_timeout=(int, float))
def acquire(self, retry_interval: Union[int, float] = 0.1,
            retry_timeout: Union[int, float] = 10) -> bool:
    """Try to acquire the lock through the backend lock object.

    Args:
        retry_interval: Forwarded to the backend lock's acquire() as its first argument.
        retry_timeout: Forwarded to the backend lock's acquire() as its second argument.

    Returns:
        Whatever the backend lock's acquire() returns.

    Raises:
        SdlTypeError: If retry_interval or retry_timeout is neither int nor float.
    """
    backend_lock = self.__dbbackendlock
    acquired = backend_lock.acquire(retry_interval, retry_timeout)
    return acquired
def release(self) -> None:
    """Release the lock by delegating to the backend lock object."""
    backend_lock = self.__dbbackendlock
    backend_lock.release()
def refresh(self) -> None:
    """Refresh the lock by delegating to the backend lock object."""
    backend_lock = self.__dbbackendlock
    backend_lock.refresh()
def get_validity_time(self) -> Union[int, float]:
    """Return the validity time reported by the backend lock object."""
    validity_time = self.__dbbackendlock.get_validity_time()
    return validity_time
104 class SyncStorage(SyncStorageAbc):
106 This class implements Shared Data Layer (SDL) abstract 'SyncStorageAbc' class.
108 This class provides synchronous access to all the namespaces in SDL storage.
109 Data can be written, read and removed based on keys known to clients. Keys are unique within
110 a namespace, namespace identifier is passed as a parameter to all the operations.
113 fake_db_backend (str): Optional parameter. Parameter enables fake DB backend usage for an
114 SDL instance. Fake DB backend is ONLY allowed to use for testing
115 purposes at development phase of SDL clients when more advanced
116 database services are not necessarily needed. Currently value 'dict'
117 is only allowed value for the parameter, which enables dictionary
118 type of fake DB backend.
def __init__(self, fake_db_backend=None) -> None:
    """Build the configuration and create the database backend for this SDL instance.

    Args:
        fake_db_backend: Optional value handed to _Configuration; selects a fake database
                         backend when given.
    """
    # Initialise the base class before own state, the same way SyncLock.__init__ does.
    super().__init__()
    self.__configuration = _Configuration(fake_db_backend)
    # Cached here because event validation needs the separator on every publish call.
    self.event_separator = self.__configuration.get_event_separator()
    self.__dbbackend = ricsdl.backend.get_backend_instance(self.__configuration)
132 "configuration": str(self.__configuration),
133 "backend": str(self.__dbbackend)
139 return self.__dbbackend.is_connected()
144 self.__dbbackend.close()
@func_arg_checker(SdlTypeError, 1, ns=str, data_map=dict)
def set(self, ns: str, data_map: Dict[str, bytes]) -> None:
    """Validate the key-value pairs and hand them to the backend's set().

    Args:
        ns: Namespace passed to the backend.
        data_map: Data to write; keys must be str and values must be bytes.

    Raises:
        SdlTypeError: If an argument, a key or a value has a wrong type.
    """
    self._validate_key_value_dict(data_map)
    backend = self.__dbbackend
    backend.set(ns, data_map)
@func_arg_checker(SdlTypeError, 1, ns=str, key=str, old_data=bytes, new_data=bytes)
def set_if(self, ns: str, key: str, old_data: bytes, new_data: bytes) -> bool:
    """Delegate a conditional write to the backend's set_if().

    Args:
        ns: Namespace passed to the backend.
        key: Key passed to the backend.
        old_data: Passed to the backend as the old data.
        new_data: Passed to the backend as the new data.

    Returns:
        Whatever the backend's set_if() returns.

    Raises:
        SdlTypeError: If an argument has a wrong type.
    """
    was_set = self.__dbbackend.set_if(ns, key, old_data, new_data)
    return was_set
@func_arg_checker(SdlTypeError, 1, ns=str, key=str, data=bytes)
def set_if_not_exists(self, ns: str, key: str, data: bytes) -> bool:
    """Delegate to the backend's set_if_not_exists().

    Args:
        ns: Namespace passed to the backend.
        key: Key passed to the backend.
        data: Data passed to the backend.

    Returns:
        Whatever the backend's set_if_not_exists() returns.

    Raises:
        SdlTypeError: If an argument has a wrong type.
    """
    was_set = self.__dbbackend.set_if_not_exists(ns, key, data)
    return was_set
@func_arg_checker(SdlTypeError, 1, ns=str, keys=(str, builtins.set))
def get(self, ns: str, keys: Union[str, Set[str]]) -> Dict[str, bytes]:
    """Read one or more keys through the backend and return the result sorted by key.

    Args:
        ns: Namespace passed to the backend.
        keys: A single key as str, or a set of keys.

    Returns:
        The backend's result rebuilt as a dictionary whose entries are in sorted key order.

    Raises:
        SdlTypeError: If an argument has a wrong type.
    """
    # A lone key must be wrapped, not iterated: list("abc") would give ['a', 'b', 'c'].
    keys = [keys] if isinstance(keys, str) else list(keys)
    disordered = self.__dbbackend.get(ns, keys)
    return {k: disordered[k] for k in sorted(disordered)}
@func_arg_checker(SdlTypeError, 1, ns=str, key_pattern=str)
def find_keys(self, ns: str, key_pattern: str) -> List[str]:
    """Delegate a key search to the backend's find_keys().

    Args:
        ns: Namespace passed to the backend.
        key_pattern: Pattern passed to the backend.

    Returns:
        Whatever the backend's find_keys() returns.

    Raises:
        SdlTypeError: If an argument has a wrong type.
    """
    found = self.__dbbackend.find_keys(ns, key_pattern)
    return found
@func_arg_checker(SdlTypeError, 1, ns=str, key_pattern=str)
def find_and_get(self, ns: str, key_pattern: str) -> Dict[str, bytes]:
    """Fetch the entries matching a pattern through the backend, sorted by key.

    Args:
        ns: Namespace passed to the backend.
        key_pattern: Pattern passed to the backend.

    Returns:
        The backend's find_and_get() result rebuilt as a dictionary in sorted key order.

    Raises:
        SdlTypeError: If an argument has a wrong type.
    """
    unsorted_result = self.__dbbackend.find_and_get(ns, key_pattern)
    ordered = {}
    for key in sorted(unsorted_result):
        ordered[key] = unsorted_result[key]
    return ordered
@func_arg_checker(SdlTypeError, 1, ns=str, keys=(str, builtins.set))
def remove(self, ns: str, keys: Union[str, Set[str]]) -> None:
    """Remove one or more keys through the backend.

    Args:
        ns: Namespace passed to the backend.
        keys: A single key as str, or a set of keys.

    Raises:
        SdlTypeError: If an argument has a wrong type.
    """
    # A lone key must be wrapped, not iterated: list("abc") would give ['a', 'b', 'c'].
    keys = [keys] if isinstance(keys, str) else list(keys)
    self.__dbbackend.remove(ns, keys)
@func_arg_checker(SdlTypeError, 1, ns=str, key=str, data=bytes)
def remove_if(self, ns: str, key: str, data: bytes) -> bool:
    """Delegate a conditional removal to the backend's remove_if().

    Args:
        ns: Namespace passed to the backend.
        key: Key passed to the backend.
        data: Data passed to the backend.

    Returns:
        Whatever the backend's remove_if() returns.

    Raises:
        SdlTypeError: If an argument has a wrong type.
    """
    was_removed = self.__dbbackend.remove_if(ns, key, data)
    return was_removed
@func_arg_checker(SdlTypeError, 1, ns=str)
def remove_all(self, ns: str) -> None:
    """Remove every key the backend finds in the namespace.

    Args:
        ns: Namespace whose keys are looked up with the '*' pattern and removed.

    Raises:
        SdlTypeError: If ns is not a str.
    """
    keys = self.__dbbackend.find_keys(ns, '*')
    # Nothing to delete in an empty namespace; skip the backend call instead of sending it
    # an empty key list.
    if keys:
        self.__dbbackend.remove(ns, keys)
@func_arg_checker(SdlTypeError, 1, ns=str, group=str, members=(bytes, builtins.set))
def add_member(self, ns: str, group: str, members: Union[bytes, Set[bytes]]) -> None:
    """Delegate to the backend's add_member().

    Args:
        ns: Namespace passed to the backend.
        group: Group name passed to the backend.
        members: A single member as bytes, or a set of members; passed on unchanged.

    Raises:
        SdlTypeError: If an argument has a wrong type.
    """
    backend = self.__dbbackend
    backend.add_member(ns, group, members)
@func_arg_checker(SdlTypeError, 1, ns=str, group=str, members=(bytes, builtins.set))
def remove_member(self, ns: str, group: str, members: Union[bytes, Set[bytes]]) -> None:
    """Delegate to the backend's remove_member().

    Args:
        ns: Namespace passed to the backend.
        group: Group name passed to the backend.
        members: A single member as bytes, or a set of members; passed on unchanged.

    Raises:
        SdlTypeError: If an argument has a wrong type.
    """
    backend = self.__dbbackend
    backend.remove_member(ns, group, members)
@func_arg_checker(SdlTypeError, 1, ns=str, group=str)
def remove_group(self, ns: str, group: str) -> None:
    """Delegate to the backend's remove_group().

    Args:
        ns: Namespace passed to the backend.
        group: Group name passed to the backend.

    Raises:
        SdlTypeError: If an argument has a wrong type.
    """
    backend = self.__dbbackend
    backend.remove_group(ns, group)
@func_arg_checker(SdlTypeError, 1, ns=str, group=str)
def get_members(self, ns: str, group: str) -> Set[bytes]:
    """Return the result of the backend's get_members() for the group.

    Args:
        ns: Namespace passed to the backend.
        group: Group name passed to the backend.

    Raises:
        SdlTypeError: If an argument has a wrong type.
    """
    members = self.__dbbackend.get_members(ns, group)
    return members
@func_arg_checker(SdlTypeError, 1, ns=str, group=str, member=bytes)
def is_member(self, ns: str, group: str, member: bytes) -> bool:
    """Return the result of the backend's is_member() for the given member.

    Args:
        ns: Namespace passed to the backend.
        group: Group name passed to the backend.
        member: Member passed to the backend.

    Raises:
        SdlTypeError: If an argument has a wrong type.
    """
    membership = self.__dbbackend.is_member(ns, group, member)
    return membership
@func_arg_checker(SdlTypeError, 1, ns=str, group=str)
def group_size(self, ns: str, group: str) -> int:
    """Return the result of the backend's group_size() for the group.

    Args:
        ns: Namespace passed to the backend.
        group: Group name passed to the backend.

    Raises:
        SdlTypeError: If an argument has a wrong type.
    """
    size = self.__dbbackend.group_size(ns, group)
    return size
@func_arg_checker(SdlTypeError, 1, ns=str, channels_and_events=dict, data_map=dict)
def set_and_publish(self, ns: str, channels_and_events: Dict[str, Union[str, List[str]]],
                    data_map: Dict[str, bytes]) -> None:
    """Validate the input and delegate to the backend's set_and_publish().

    Args:
        ns: Namespace passed to the backend.
        channels_and_events: Maps a channel name to one event (str) or a list of events.
                             Single-event strings are replaced by one-element lists in this
                             very dictionary before it is handed to the backend.
        data_map: Data to write; keys must be str and values must be bytes.

    Raises:
        SdlTypeError: If an argument, key, value, channel or event fails validation.
    """
    self._validate_key_value_dict(data_map)
    self._validate_channels_events(channels_and_events)
    # Only values are rewritten, so iterating the keys of the same dict is safe.
    for channel in channels_and_events:
        events = channels_and_events[channel]
        if isinstance(events, str):
            channels_and_events[channel] = [events]
    self.__dbbackend.set_and_publish(ns, channels_and_events, data_map)
@func_arg_checker(SdlTypeError,
                  1,
                  ns=str,
                  channels_and_events=dict,
                  key=str,
                  old_data=bytes,
                  new_data=bytes)
def set_if_and_publish(self, ns: str, channels_and_events: Dict[str, Union[str, List[str]]],
                       key: str, old_data: bytes, new_data: bytes) -> bool:
    """Validate the channels and events and delegate to the backend's set_if_and_publish().

    Args:
        ns: Namespace passed to the backend.
        channels_and_events: Maps a channel name to one event (str) or a list of events.
                             Single-event strings are replaced by one-element lists in this
                             very dictionary before it is handed to the backend.
        key: Key passed to the backend.
        old_data: Passed to the backend as the old data.
        new_data: Passed to the backend as the new data.

    Returns:
        Whatever the backend's set_if_and_publish() returns.

    Raises:
        SdlTypeError: If an argument, channel or event fails validation.
    """
    self._validate_channels_events(channels_and_events)
    for channel, events in channels_and_events.items():
        channels_and_events[channel] = [events] if isinstance(events, str) else events
    return self.__dbbackend.set_if_and_publish(ns, channels_and_events, key, old_data, new_data)
@func_arg_checker(SdlTypeError, 1, ns=str, channels_and_events=dict, key=str, data=bytes)
def set_if_not_exists_and_publish(self, ns: str,
                                  channels_and_events: Dict[str, Union[str, List[str]]],
                                  key: str, data: bytes) -> bool:
    """Validate the channels and events and delegate to the backend.

    Args:
        ns: Namespace passed to the backend.
        channels_and_events: Maps a channel name to one event (str) or a list of events.
                             Single-event strings are replaced by one-element lists in this
                             very dictionary before it is handed to the backend.
        key: Key passed to the backend.
        data: Data passed to the backend.

    Returns:
        Whatever the backend's set_if_not_exists_and_publish() returns.

    Raises:
        SdlTypeError: If an argument, channel or event fails validation.
    """
    self._validate_channels_events(channels_and_events)
    # Only values are rewritten, so iterating the keys of the same dict is safe.
    for channel in channels_and_events:
        events = channels_and_events[channel]
        if isinstance(events, str):
            channels_and_events[channel] = [events]
    backend = self.__dbbackend
    return backend.set_if_not_exists_and_publish(ns, channels_and_events, key, data)
@func_arg_checker(SdlTypeError, 1, ns=str, channels_and_events=dict, keys=(str, builtins.set))
def remove_and_publish(self, ns: str, channels_and_events: Dict[str, Union[str, List[str]]],
                       keys: Union[str, Set[str]]) -> None:
    """Validate the channels and events and delegate to the backend's remove_and_publish().

    Args:
        ns: Namespace passed to the backend.
        channels_and_events: Maps a channel name to one event (str) or a list of events.
                             Single-event strings are replaced by one-element lists in this
                             very dictionary before it is handed to the backend.
        keys: A single key as str, or a set of keys; the backend receives a list.

    Raises:
        SdlTypeError: If an argument, channel or event fails validation.
    """
    self._validate_channels_events(channels_and_events)
    # Only values are rewritten, so iterating the keys of the same dict is safe.
    for channel in channels_and_events:
        events = channels_and_events[channel]
        if isinstance(events, str):
            channels_and_events[channel] = [events]
    if isinstance(keys, str):
        key_list = [keys]
    else:
        key_list = list(keys)
    self.__dbbackend.remove_and_publish(ns, channels_and_events, key_list)
@func_arg_checker(SdlTypeError, 1, ns=str, channels_and_events=dict, key=str, data=bytes)
def remove_if_and_publish(self, ns: str, channels_and_events: Dict[str, Union[str, List[str]]],
                          key: str, data: bytes) -> bool:
    """Validate the channels and events and delegate to the backend's remove_if_and_publish().

    Args:
        ns: Namespace passed to the backend.
        channels_and_events: Maps a channel name to one event (str) or a list of events.
                             Single-event strings are replaced by one-element lists in this
                             very dictionary before it is handed to the backend.
        key: Key passed to the backend.
        data: Data passed to the backend.

    Returns:
        Whatever the backend's remove_if_and_publish() returns.

    Raises:
        SdlTypeError: If an argument, channel or event fails validation.
    """
    self._validate_channels_events(channels_and_events)
    # Only values are rewritten, so iterating the keys of the same dict is safe.
    for channel in channels_and_events:
        events = channels_and_events[channel]
        if isinstance(events, str):
            channels_and_events[channel] = [events]
    backend = self.__dbbackend
    return backend.remove_if_and_publish(ns, channels_and_events, key, data)
@func_arg_checker(SdlTypeError, 1, ns=str, channels_and_events=dict)
def remove_all_and_publish(self, ns: str,
                           channels_and_events: Dict[str, Union[str, List[str]]]) -> None:
    """Validate the channels and events and delegate to the backend's remove_all_and_publish().

    Args:
        ns: Namespace passed to the backend.
        channels_and_events: Maps a channel name to one event (str) or a list of events.
                             Single-event strings are replaced by one-element lists in this
                             very dictionary before it is handed to the backend.

    Raises:
        SdlTypeError: If an argument, channel or event fails validation.
    """
    self._validate_channels_events(channels_and_events)
    # Only values are rewritten, so iterating the keys of the same dict is safe.
    for channel in channels_and_events:
        events = channels_and_events[channel]
        if isinstance(events, str):
            channels_and_events[channel] = [events]
    self.__dbbackend.remove_all_and_publish(ns, channels_and_events)
@func_arg_checker(SdlTypeError, 1, ns=str, cb=Callable, channels=(str, builtins.set))
def subscribe_channel(self, ns: str,
                      cb: Union[Callable[[str, str], None], Callable[[str, List[str]], None]],
                      channels: Union[str, Set[str]]) -> None:
    """Validate the callback and delegate the subscription to the backend.

    Args:
        ns: Namespace passed to the backend.
        cb: Callback handed to the backend; it is checked with _validate_callback() first.
        channels: A single channel as str, or a set of channels; the backend receives a list.

    Raises:
        SdlTypeError: If an argument has a wrong type or the callback fails validation.
    """
    self._validate_callback(cb)
    if isinstance(channels, str):
        channel_list = [channels]
    else:
        channel_list = list(channels)
    self.__dbbackend.subscribe_channel(ns, cb, channel_list)
@func_arg_checker(SdlTypeError, 1, ns=str, channels=(str, builtins.set))
def unsubscribe_channel(self, ns: str, channels: Union[str, Set[str]]) -> None:
    """Delegate the unsubscription to the backend.

    Args:
        ns: Namespace passed to the backend.
        channels: A single channel as str, or a set of channels; the backend receives a list.

    Raises:
        SdlTypeError: If an argument has a wrong type.
    """
    if isinstance(channels, str):
        channel_list = [channels]
    else:
        channel_list = list(channels)
    self.__dbbackend.unsubscribe_channel(ns, channel_list)
def start_event_listener(self) -> None:
    """Delegate to the backend's start_event_listener()."""
    backend = self.__dbbackend
    backend.start_event_listener()
def handle_events(self) -> Optional[Union[Tuple[str, str], Tuple[str, List[str]]]]:
    """Return whatever the backend's handle_events() returns."""
    handled = self.__dbbackend.handle_events()
    return handled
@func_arg_checker(SdlTypeError, 1, ns=str, resource=str, expiration=(int, float))
def get_lock_resource(self, ns: str, resource: str, expiration: Union[int, float]) -> SyncLock:
    """Create a SyncLock tied to this storage instance.

    Args:
        ns: Namespace given to the lock.
        resource: Used as the lock name.
        expiration: Expiration time given to the lock.

    Returns:
        A new SyncLock built from the arguments and this storage object.

    Raises:
        SdlTypeError: If an argument has a wrong type.
    """
    lock = SyncLock(ns, resource, expiration, self)
    return lock
def get_backend(self) -> DbBackendAbc:
    """Return the database backend instance created for this SDL instance."""
    backend = self.__dbbackend
    return backend
def get_configuration(self) -> _Configuration:
    """Return the configuration that was created when this SDL instance was initialised."""
    configuration = self.__configuration
    return configuration
def _validate_key_value_dict(cls, kv):
    """Check that every key of `kv` is a str and every value is bytes.

    Pairs are checked in iteration order, the key before the value, and the first
    violation is reported.

    Raises:
        SdlTypeError: On the first key that is not a str or value that is not bytes.
    """
    for key, value in kv.items():
        if not isinstance(key, str):
            raise SdlTypeError(r"Wrong dict key type: {}={}. Must be: str".format(key, type(key)))
        if isinstance(value, bytes):
            continue
        raise SdlTypeError(
            r"Wrong dict value type: {}={}. Must be: bytes".format(value, type(value)))
def _validate_channels_events(self, channels_and_events: Dict[Any, Any]):
    """Check the channel names and events of a publish request.

    Each channel must be a str. Its events must be either one str or a list of str, and no
    event may contain `self.event_separator`.

    Args:
        channels_and_events: Maps a channel name to one event or a list of events.

    Raises:
        SdlTypeError: On the first channel or event that breaks the rules above.
    """
    for channel, events in channels_and_events.items():
        if not isinstance(channel, str):
            raise SdlTypeError(r"Wrong channel type: {}={}. Must be: str".format(
                channel, type(channel)))
        if not isinstance(events, (list, str)):
            raise SdlTypeError(r"Wrong event type: {}={}. Must be: str".format(
                events, type(events)))
        # A single event string goes through the same checks as a one-element list.
        for event in ([events] if isinstance(events, str) else events):
            if not isinstance(event, str):
                # Report the offending element itself, not the list that contains it.
                raise SdlTypeError(r"Wrong event type: {}={}. Must be: str".format(
                    event, type(event)))
            if self.event_separator in event:
                raise SdlTypeError(r"Events {} contains illegal substring (\"{}\")".format(
                    events, self.event_separator))
def _validate_callback(cls, cb):
    """Check that the callback declares exactly two parameters.

    Args:
        cb: Callable whose signature is inspected.

    Raises:
        SdlTypeError: If the callback's signature does not have exactly two parameters.
    """
    param_len = len(inspect.signature(cb).parameters)
    if param_len != 2:
        raise SdlTypeError(
            f"Callback function should take 2 positional argument but {param_len} were given")