1 # Copyright (c) 2019 AT&T Intellectual Property.
2 # Copyright (c) 2018-2019 Nokia.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 # This source code is part of the near-RT RIC (RAN Intelligent Controller)
18 # platform project (RICP).
21 """The module provides implementation of the syncronous Shared Data Layer (SDL) interface."""
24 from typing import (Any, Callable, Dict, Set, List, Optional, Tuple, Union)
25 from ricsdl.configuration import _Configuration
26 from ricsdl.syncstorage_abc import (SyncStorageAbc, SyncLockAbc)
28 from ricsdl.backend.dbbackend_abc import DbBackendAbc
29 from ricsdl.exceptions import (SdlException, SdlTypeError)
32 def func_arg_checker(exception, start_arg_idx, **types):
33 """Decorator to validate function arguments."""
38 def _validate(*args, **kwds):
39 for idx, arg in enumerate(args[start_arg_idx:], start_arg_idx):
40 if func.__code__.co_varnames[idx] in types and \
41 not isinstance(arg, types[func.__code__.co_varnames[idx]]):
42 raise exception(r"Wrong argument type: '{}'={}. Must be: {}".
43 format(func.__code__.co_varnames[idx], type(arg),
44 types[func.__code__.co_varnames[idx]]))
45 for kwdname, kwdval in kwds.items():
46 if kwdname in types and not isinstance(kwdval, types[kwdname]):
47 raise exception(r"Wrong argument type: '{}'={}. Must be: {}".
48 format(kwdname, type(kwdval), types[kwdname]))
49 return func(*args, **kwds)
50 _validate.__name__ = func.__name__
55 class SyncLock(SyncLockAbc):
57 This class implements Shared Data Layer (SDL) abstract 'SyncLockAbc' class.
59 A lock instance is created per namespace and it is identified by its `name` within a namespace.
62 ns (str): Namespace under which this lock is targeted.
63 name (str): Lock name, identifies the lock key in SDL storage.
64 expiration (int, float): Lock expiration time after which the lock is removed if it hasn't
65 been released earlier by a 'release' method.
66 storage (SyncStorage): Database backend object containing connection to a database.
68 @func_arg_checker(SdlTypeError, 1, ns=str, name=str, expiration=(int, float))
69 def __init__(self, ns: str, name: str, expiration: Union[int, float],
70 storage: 'SyncStorage') -> None:
72 super().__init__(ns, name, expiration)
73 self.__configuration = storage.get_configuration()
74 self.__dbbackendlock = ricsdl.backend.get_backend_lock_instance(self.__configuration,
76 storage.get_backend())
81 "namespace": self._ns,
83 "expiration": self._expiration,
84 "backend lock": str(self.__dbbackendlock)
88 @func_arg_checker(SdlTypeError, 1, retry_interval=(int, float),
89 retry_timeout=(int, float))
90 def acquire(self, retry_interval: Union[int, float] = 0.1,
91 retry_timeout: Union[int, float] = 10) -> bool:
92 return self.__dbbackendlock.acquire(retry_interval, retry_timeout)
94 def release(self) -> None:
95 self.__dbbackendlock.release()
97 def refresh(self) -> None:
98 self.__dbbackendlock.refresh()
100 def get_validity_time(self) -> Union[int, float]:
101 return self.__dbbackendlock.get_validity_time()
104 class SyncStorage(SyncStorageAbc):
106 This class implements Shared Data Layer (SDL) abstract 'SyncStorageAbc' class.
108 This class provides synchronous access to all the namespaces in SDL storage.
109 Data can be written, read and removed based on keys known to clients. Keys are unique within
110 a namespace, namespace identifier is passed as a parameter to all the operations.
113 fake_db_backend (str): Optional parameter. Parameter enables fake DB backend usage for an
114 SDL instance. Fake DB backend is ONLY allowed to use for testing
115 purposes at development phase of SDL clients when more advanced
116 database services are not necessarily needed. Currently value 'dict'
117 is only allowed value for the parameter, which enables dictionary
118 type of fake DB backend.
120 def __init__(self, fake_db_backend=None) -> None:
122 self.__configuration = _Configuration(fake_db_backend)
123 self.__dbbackend = ricsdl.backend.get_backend_instance(self.__configuration)
131 "configuration": str(self.__configuration),
132 "backend": str(self.__dbbackend)
138 return self.__dbbackend.is_connected()
143 self.__dbbackend.close()
145 @func_arg_checker(SdlTypeError, 1, ns=str, data_map=dict)
146 def set(self, ns: str, data_map: Dict[str, bytes]) -> None:
147 self._validate_key_value_dict(data_map)
148 self.__dbbackend.set(ns, data_map)
150 @func_arg_checker(SdlTypeError, 1, ns=str, key=str, old_data=bytes, new_data=bytes)
151 def set_if(self, ns: str, key: str, old_data: bytes, new_data: bytes) -> bool:
152 return self.__dbbackend.set_if(ns, key, old_data, new_data)
154 @func_arg_checker(SdlTypeError, 1, ns=str, key=str, data=bytes)
155 def set_if_not_exists(self, ns: str, key: str, data: bytes) -> bool:
156 return self.__dbbackend.set_if_not_exists(ns, key, data)
158 @func_arg_checker(SdlTypeError, 1, ns=str, keys=(str, builtins.set))
159 def get(self, ns: str, keys: Union[str, Set[str]]) -> Dict[str, bytes]:
160 disordered = self.__dbbackend.get(ns, list(keys))
161 return {k: disordered[k] for k in sorted(disordered)}
163 @func_arg_checker(SdlTypeError, 1, ns=str, key_pattern=str)
164 def find_keys(self, ns: str, key_pattern: str) -> List[str]:
165 return self.__dbbackend.find_keys(ns, key_pattern)
167 @func_arg_checker(SdlTypeError, 1, ns=str, key_pattern=str)
168 def find_and_get(self, ns: str, key_pattern: str) -> Dict[str, bytes]:
169 disordered = self.__dbbackend.find_and_get(ns, key_pattern)
170 return {k: disordered[k] for k in sorted(disordered)}
172 @func_arg_checker(SdlTypeError, 1, ns=str, keys=(str, builtins.set))
173 def remove(self, ns: str, keys: Union[str, Set[str]]) -> None:
174 self.__dbbackend.remove(ns, list(keys))
176 @func_arg_checker(SdlTypeError, 1, ns=str, key=str, data=bytes)
177 def remove_if(self, ns: str, key: str, data: bytes) -> bool:
178 return self.__dbbackend.remove_if(ns, key, data)
180 @func_arg_checker(SdlTypeError, 1, ns=str)
181 def remove_all(self, ns: str) -> None:
182 keys = self.__dbbackend.find_keys(ns, '*')
184 self.__dbbackend.remove(ns, keys)
186 @func_arg_checker(SdlTypeError, 1, ns=str, group=str, members=(bytes, builtins.set))
187 def add_member(self, ns: str, group: str, members: Union[bytes, Set[bytes]]) -> None:
188 self.__dbbackend.add_member(ns, group, members)
190 @func_arg_checker(SdlTypeError, 1, ns=str, group=str, members=(bytes, builtins.set))
191 def remove_member(self, ns: str, group: str, members: Union[bytes, Set[bytes]]) -> None:
192 self.__dbbackend.remove_member(ns, group, members)
194 @func_arg_checker(SdlTypeError, 1, ns=str, group=str)
195 def remove_group(self, ns: str, group: str) -> None:
196 self.__dbbackend.remove_group(ns, group)
198 @func_arg_checker(SdlTypeError, 1, ns=str, group=str)
199 def get_members(self, ns: str, group: str) -> Set[bytes]:
200 return self.__dbbackend.get_members(ns, group)
202 @func_arg_checker(SdlTypeError, 1, ns=str, group=str, member=bytes)
203 def is_member(self, ns: str, group: str, member: bytes) -> bool:
204 return self.__dbbackend.is_member(ns, group, member)
206 @func_arg_checker(SdlTypeError, 1, ns=str, group=str)
207 def group_size(self, ns: str, group: str) -> int:
208 return self.__dbbackend.group_size(ns, group)
210 @func_arg_checker(SdlTypeError, 1, ns=str, channels_and_events=dict, data_map=dict)
211 def set_and_publish(self, ns: str, channels_and_events: Dict[str, Union[str, List[str]]],
212 data_map: Dict[str, bytes]) -> None:
213 self._validate_key_value_dict(data_map)
214 self._validate_channels_events(channels_and_events)
215 for channel, events in channels_and_events.items():
216 channels_and_events[channel] = [events] if isinstance(events, str) else events
217 self.__dbbackend.set_and_publish(ns, channels_and_events, data_map)
219 @func_arg_checker(SdlTypeError,
222 channels_and_events=dict,
226 def set_if_and_publish(self, ns: str, channels_and_events: Dict[str, Union[str, List[str]]],
227 key: str, old_data: bytes, new_data: bytes) -> bool:
228 self._validate_channels_events(channels_and_events)
229 for channel, events in channels_and_events.items():
230 channels_and_events[channel] = [events] if isinstance(events, str) else events
231 return self.__dbbackend.set_if_and_publish(ns, channels_and_events, key, old_data, new_data)
233 @func_arg_checker(SdlTypeError, 1, ns=str, channels_and_events=dict, key=str, data=bytes)
234 def set_if_not_exists_and_publish(self, ns: str,
235 channels_and_events: Dict[str, Union[str, List[str]]],
236 key: str, data: bytes) -> bool:
237 self._validate_channels_events(channels_and_events)
238 for channel, events in channels_and_events.items():
239 channels_and_events[channel] = [events] if isinstance(events, str) else events
240 return self.__dbbackend.set_if_not_exists_and_publish(ns, channels_and_events, key, data)
242 @func_arg_checker(SdlTypeError, 1, ns=str, channels_and_events=dict, keys=(str, builtins.set))
243 def remove_and_publish(self, ns: str, channels_and_events: Dict[str, Union[str, List[str]]],
244 keys: Union[str, Set[str]]) -> None:
245 self._validate_channels_events(channels_and_events)
246 for channel, events in channels_and_events.items():
247 channels_and_events[channel] = [events] if isinstance(events, str) else events
248 keys = [keys] if isinstance(keys, str) else list(keys)
249 self.__dbbackend.remove_and_publish(ns, channels_and_events, keys)
251 @func_arg_checker(SdlTypeError, 1, ns=str, channels_and_events=dict, key=str, data=bytes)
252 def remove_if_and_publish(self, ns: str, channels_and_events: Dict[str, Union[str, List[str]]],
253 key: str, data: bytes) -> bool:
254 self._validate_channels_events(channels_and_events)
255 for channel, events in channels_and_events.items():
256 channels_and_events[channel] = [events] if isinstance(events, str) else events
257 return self.__dbbackend.remove_if_and_publish(ns, channels_and_events, key, data)
259 @func_arg_checker(SdlTypeError, 1, ns=str, channels_and_events=dict)
260 def remove_all_and_publish(self, ns: str,
261 channels_and_events: Dict[str, Union[str, List[str]]]) -> None:
262 self._validate_channels_events(channels_and_events)
263 for channel, events in channels_and_events.items():
264 channels_and_events[channel] = [events] if isinstance(events, str) else events
265 self.__dbbackend.remove_all_and_publish(ns, channels_and_events)
267 @func_arg_checker(SdlTypeError, 1, ns=str, cb=Callable, channels=(str, builtins.set))
268 def subscribe_channel(self, ns: str, cb: Callable[[str, str], None],
269 channels: Union[str, Set[str]]) -> None:
270 self._validate_callback(cb)
271 channels = [channels] if isinstance(channels, str) else list(channels)
272 self.__dbbackend.subscribe_channel(ns, cb, channels)
274 @func_arg_checker(SdlTypeError, 1, ns=str, channels=(str, builtins.set))
275 def unsubscribe_channel(self, ns: str, channels: Union[str, Set[str]]) -> None:
276 channels = [channels] if isinstance(channels, str) else list(channels)
277 self.__dbbackend.unsubscribe_channel(ns, channels)
279 def start_event_listener(self) -> None:
280 self.__dbbackend.start_event_listener()
282 def handle_events(self) -> Optional[Tuple[str, str]]:
283 return self.__dbbackend.handle_events()
285 @func_arg_checker(SdlTypeError, 1, ns=str, resource=str, expiration=(int, float))
286 def get_lock_resource(self, ns: str, resource: str, expiration: Union[int, float]) -> SyncLock:
287 return SyncLock(ns, resource, expiration, self)
289 def get_backend(self) -> DbBackendAbc:
290 """Return backend instance."""
291 return self.__dbbackend
293 def get_configuration(self) -> _Configuration:
294 """Return configuration what was valid when the SDL instance was initiated."""
295 return self.__configuration
298 def _validate_key_value_dict(cls, kv):
299 for k, v in kv.items():
300 if not isinstance(k, str):
301 raise SdlTypeError(r"Wrong dict key type: {}={}. Must be: str".format(k, type(k)))
302 if not isinstance(v, bytes):
303 raise SdlTypeError(r"Wrong dict value type: {}={}. Must be: bytes".format(v, type(v)))
306 def _validate_channels_events(cls, channels_and_events: Dict[Any, Any]):
307 for channel, events in channels_and_events.items():
308 if not isinstance(channel, str):
309 raise SdlTypeError(r"Wrong channel type: {}={}. Must be: str".format(
310 channel, type(channel)))
311 if not isinstance(events, (list, str)):
312 raise SdlTypeError(r"Wrong event type: {}={}. Must be: str".format(
313 events, type(events)))
314 if isinstance(events, list):
316 if not isinstance(event, str):
317 raise SdlTypeError(r"Wrong event type: {}={}. Must be: str".format(
318 events, type(events)))
321 def _validate_callback(cls, cb):
322 param_len = len(inspect.signature(cb).parameters)
325 f"Callback function should take 2 positional argument but {param_len} were given")