b15365a168c93a7de203643ec25542b09905d24d
[ric-plt/sdlpy.git] / ricsdl-package / ricsdl / syncstorage.py
1 # Copyright (c) 2019 AT&T Intellectual Property.
2 # Copyright (c) 2018-2019 Nokia.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 #
17 # This source code is part of the near-RT RIC (RAN Intelligent Controller)
18 # platform project (RICP).
19 #
20
21 """The module provides implementation of the synchronous Shared Data Layer (SDL) interface."""
22 import builtins
23 import inspect
24 from typing import (Any, Callable, Dict, Set, List, Optional, Tuple, Union)
25 from ricsdl.configuration import _Configuration
26 from ricsdl.syncstorage_abc import (SyncStorageAbc, SyncLockAbc)
27 import ricsdl.backend
28 from ricsdl.backend.dbbackend_abc import DbBackendAbc
29 from ricsdl.exceptions import (SdlException, SdlTypeError)
30
31
def func_arg_checker(exception, start_arg_idx, **types):
    """Build a decorator that validates the types of a function's arguments.

    Validation is only active when ``__debug__`` is true; otherwise the decorated
    function is returned unchanged.

    Args:
        exception: Exception class to raise when an argument has a wrong type.
        start_arg_idx (int): Index of the first positional argument to validate
                             (for example 1 to skip ``self``).
        **types: Maps a parameter name to the type, or tuple of types, that
                 ``isinstance`` must accept for that parameter. Parameters that
                 are not named here are not checked.

    Returns:
        A decorator that wraps a function with the argument type validation.

    Raises:
        exception: Raised by the wrapped function call when a positional or keyword
                   argument does not match the type registered for its name.
    """
    # Imported locally so that this function does not depend on a module level import.
    import functools

    def _check(func):
        if not __debug__:
            return func

        code = func.__code__
        # co_varnames also lists local variable names after the parameters. Only the
        # first co_argcount names can be bound by positional arguments, so the lookup
        # is limited to them. Surplus positional arguments are left for the call
        # itself to reject.
        param_names = code.co_varnames[:code.co_argcount]

        @functools.wraps(func)  # keep __name__, __doc__, __qualname__ and __wrapped__
        def _validate(*args, **kwds):
            for name, arg in zip(param_names[start_arg_idx:], args[start_arg_idx:]):
                if name in types and not isinstance(arg, types[name]):
                    raise exception(r"Wrong argument type: '{}'={}. Must be: {}".
                                    format(name, type(arg), types[name]))
            for kwdname, kwdval in kwds.items():
                if kwdname in types and not isinstance(kwdval, types[kwdname]):
                    raise exception(r"Wrong argument type: '{}'={}. Must be: {}".
                                    format(kwdname, type(kwdval), types[kwdname]))
            return func(*args, **kwds)
        return _validate
    return _check
53
54
class SyncLock(SyncLockAbc):
    """
    Concrete implementation of the Shared Data Layer (SDL) abstract 'SyncLockAbc' class.

    One lock instance exists per namespace and the lock is identified inside that namespace
    by its `name`. Every lock operation is forwarded to a backend lock object obtained from
    'ricsdl.backend.get_backend_lock_instance'.

    Args:
        ns (str): Namespace under which this lock is targeted.
        name (str): Lock name, identifies the lock key in SDL storage.
        expiration (int, float): Lock expiration time after which the lock is removed if it hasn't
                                 been released earlier by a 'release' method.
        storage (SyncStorage): SDL storage instance which supplies the configuration and the
                               database backend used to create the backend lock.
    """
    @func_arg_checker(SdlTypeError, 1, ns=str, name=str, expiration=(int, float))
    def __init__(self, ns: str, name: str, expiration: Union[int, float],
                 storage: 'SyncStorage') -> None:
        super().__init__(ns, name, expiration)
        # Reuse the configuration and the backend of the owning storage instance.
        self.__configuration = storage.get_configuration()
        backend = storage.get_backend()
        self.__dbbackendlock = ricsdl.backend.get_backend_lock_instance(
            self.__configuration, ns, name, expiration, backend)

    def __str__(self):
        # Keys are inserted in a fixed order to keep the textual form stable.
        description = {}
        description["namespace"] = self._ns
        description["name"] = self._name
        description["expiration"] = self._expiration
        description["backend lock"] = str(self.__dbbackendlock)
        return str(description)

    @func_arg_checker(SdlTypeError, 1, retry_interval=(int, float),
                      retry_timeout=(int, float))
    def acquire(self, retry_interval: Union[int, float] = 0.1,
                retry_timeout: Union[int, float] = 10) -> bool:
        # Forward the retry settings unchanged; the backend lock decides the outcome.
        backend_lock = self.__dbbackendlock
        return backend_lock.acquire(retry_interval, retry_timeout)

    def release(self) -> None:
        # Forwarded to the backend lock.
        backend_lock = self.__dbbackendlock
        backend_lock.release()

    def refresh(self) -> None:
        # Forwarded to the backend lock.
        backend_lock = self.__dbbackendlock
        backend_lock.refresh()

    def get_validity_time(self) -> Union[int, float]:
        # Forwarded to the backend lock.
        backend_lock = self.__dbbackendlock
        return backend_lock.get_validity_time()
102
103
class SyncStorage(SyncStorageAbc):
    """
    This class implements Shared Data Layer (SDL) abstract 'SyncStorageAbc' class.

    This class provides synchronous access to all the namespaces in SDL storage.
    Data can be written, read and removed based on keys known to clients. Keys are unique within
    a namespace, namespace identifier is passed as a parameter to all the operations.

    Public operations check their argument types with 'func_arg_checker', which raises
    'SdlTypeError' on a mismatch, and then delegate the work to the database backend created
    by 'ricsdl.backend.get_backend_instance'.

    Args:
        fake_db_backend (str): Optional parameter. Parameter enables fake DB backend usage for an
                               SDL instance. Fake DB backend is ONLY allowed to use for testing
                               purposes at development phase of SDL clients when more advanced
                               database services are not necessarily needed. Currently 'dict' is
                               the only allowed value for the parameter, which enables dictionary
                               type of fake DB backend.
    """
    def __init__(self, fake_db_backend=None) -> None:
        super().__init__()
        self.__configuration = _Configuration(fake_db_backend)
        self.event_separator = self.__configuration.get_event_separator()
        self.__dbbackend = ricsdl.backend.get_backend_instance(self.__configuration)

    def __del__(self):
        # __init__ may have raised before the backend attribute was assigned. Closing in
        # that state would only produce an AttributeError during finalisation.
        if getattr(self, '_SyncStorage__dbbackend', None) is not None:
            self.close()

    def __str__(self):
        return str(
            {
                "configuration": str(self.__configuration),
                "backend": str(self.__dbbackend)
            }
        )

    def is_active(self) -> bool:
        # A backend SdlException is reported as "not active" instead of being propagated.
        try:
            return self.__dbbackend.is_connected()
        except SdlException:
            return False

    def close(self) -> None:
        self.__dbbackend.close()

    @func_arg_checker(SdlTypeError, 1, ns=str, data_map=dict)
    def set(self, ns: str, data_map: Dict[str, bytes]) -> None:
        self._validate_key_value_dict(data_map)
        self.__dbbackend.set(ns, data_map)

    @func_arg_checker(SdlTypeError, 1, ns=str, key=str, old_data=bytes, new_data=bytes)
    def set_if(self, ns: str, key: str, old_data: bytes, new_data: bytes) -> bool:
        return self.__dbbackend.set_if(ns, key, old_data, new_data)

    @func_arg_checker(SdlTypeError, 1, ns=str, key=str, data=bytes)
    def set_if_not_exists(self, ns: str, key: str, data: bytes) -> bool:
        return self.__dbbackend.set_if_not_exists(ns, key, data)

    @func_arg_checker(SdlTypeError, 1, ns=str, keys=(str, builtins.set))
    def get(self, ns: str, keys: Union[str, Set[str]]) -> Dict[str, bytes]:
        # A single str key is wrapped in a list; list(keys) would split it into characters.
        disordered = self.__dbbackend.get(ns, self._as_list(keys))
        # Return the result with its keys in sorted order.
        return {k: disordered[k] for k in sorted(disordered)}

    @func_arg_checker(SdlTypeError, 1, ns=str, key_pattern=str)
    def find_keys(self, ns: str, key_pattern: str) -> List[str]:
        return self.__dbbackend.find_keys(ns, key_pattern)

    @func_arg_checker(SdlTypeError, 1, ns=str, key_pattern=str)
    def find_and_get(self, ns: str, key_pattern: str) -> Dict[str, bytes]:
        disordered = self.__dbbackend.find_and_get(ns, key_pattern)
        # Return the result with its keys in sorted order.
        return {k: disordered[k] for k in sorted(disordered)}

    @func_arg_checker(SdlTypeError, 1, ns=str, keys=(str, builtins.set))
    def remove(self, ns: str, keys: Union[str, Set[str]]) -> None:
        # A single str key is wrapped in a list; list(keys) would split it into characters.
        self.__dbbackend.remove(ns, self._as_list(keys))

    @func_arg_checker(SdlTypeError, 1, ns=str, key=str, data=bytes)
    def remove_if(self, ns: str, key: str, data: bytes) -> bool:
        return self.__dbbackend.remove_if(ns, key, data)

    @func_arg_checker(SdlTypeError, 1, ns=str)
    def remove_all(self, ns: str) -> None:
        keys = self.__dbbackend.find_keys(ns, '*')
        # Skip the backend call when the namespace has no keys.
        if keys:
            self.__dbbackend.remove(ns, keys)

    @func_arg_checker(SdlTypeError, 1, ns=str, group=str, members=(bytes, builtins.set))
    def add_member(self, ns: str, group: str, members: Union[bytes, Set[bytes]]) -> None:
        self.__dbbackend.add_member(ns, group, members)

    @func_arg_checker(SdlTypeError, 1, ns=str, group=str, members=(bytes, builtins.set))
    def remove_member(self, ns: str, group: str, members: Union[bytes, Set[bytes]]) -> None:
        self.__dbbackend.remove_member(ns, group, members)

    @func_arg_checker(SdlTypeError, 1, ns=str, group=str)
    def remove_group(self, ns: str, group: str) -> None:
        self.__dbbackend.remove_group(ns, group)

    @func_arg_checker(SdlTypeError, 1, ns=str, group=str)
    def get_members(self, ns: str, group: str) -> Set[bytes]:
        return self.__dbbackend.get_members(ns, group)

    @func_arg_checker(SdlTypeError, 1, ns=str, group=str, member=bytes)
    def is_member(self, ns: str, group: str, member: bytes) -> bool:
        return self.__dbbackend.is_member(ns, group, member)

    @func_arg_checker(SdlTypeError, 1, ns=str, group=str)
    def group_size(self, ns: str, group: str) -> int:
        return self.__dbbackend.group_size(ns, group)

    @func_arg_checker(SdlTypeError, 1, ns=str, channels_and_events=dict, data_map=dict)
    def set_and_publish(self, ns: str, channels_and_events: Dict[str, Union[str, List[str]]],
                        data_map: Dict[str, bytes]) -> None:
        self._validate_key_value_dict(data_map)
        self._validate_channels_events(channels_and_events)
        prepared = self._normalize_channels_events(channels_and_events)
        self.__dbbackend.set_and_publish(ns, prepared, data_map)

    @func_arg_checker(SdlTypeError,
                      1,
                      ns=str,
                      channels_and_events=dict,
                      key=str,
                      old_data=bytes,
                      new_data=bytes)
    def set_if_and_publish(self, ns: str, channels_and_events: Dict[str, Union[str, List[str]]],
                           key: str, old_data: bytes, new_data: bytes) -> bool:
        self._validate_channels_events(channels_and_events)
        prepared = self._normalize_channels_events(channels_and_events)
        return self.__dbbackend.set_if_and_publish(ns, prepared, key, old_data, new_data)

    @func_arg_checker(SdlTypeError, 1, ns=str, channels_and_events=dict, key=str, data=bytes)
    def set_if_not_exists_and_publish(self, ns: str,
                                      channels_and_events: Dict[str, Union[str, List[str]]],
                                      key: str, data: bytes) -> bool:
        self._validate_channels_events(channels_and_events)
        prepared = self._normalize_channels_events(channels_and_events)
        return self.__dbbackend.set_if_not_exists_and_publish(ns, prepared, key, data)

    @func_arg_checker(SdlTypeError, 1, ns=str, channels_and_events=dict, keys=(str, builtins.set))
    def remove_and_publish(self, ns: str, channels_and_events: Dict[str, Union[str, List[str]]],
                           keys: Union[str, Set[str]]) -> None:
        self._validate_channels_events(channels_and_events)
        prepared = self._normalize_channels_events(channels_and_events)
        self.__dbbackend.remove_and_publish(ns, prepared, self._as_list(keys))

    @func_arg_checker(SdlTypeError, 1, ns=str, channels_and_events=dict, key=str, data=bytes)
    def remove_if_and_publish(self, ns: str, channels_and_events: Dict[str, Union[str, List[str]]],
                              key: str, data: bytes) -> bool:
        self._validate_channels_events(channels_and_events)
        prepared = self._normalize_channels_events(channels_and_events)
        return self.__dbbackend.remove_if_and_publish(ns, prepared, key, data)

    @func_arg_checker(SdlTypeError, 1, ns=str, channels_and_events=dict)
    def remove_all_and_publish(self, ns: str,
                               channels_and_events: Dict[str, Union[str, List[str]]]) -> None:
        self._validate_channels_events(channels_and_events)
        prepared = self._normalize_channels_events(channels_and_events)
        self.__dbbackend.remove_all_and_publish(ns, prepared)

    @func_arg_checker(SdlTypeError, 1, ns=str, cb=Callable, channels=(str, builtins.set))
    def subscribe_channel(self, ns: str, cb: Callable[[str, List[str]], None],
                          channels: Union[str, Set[str]]) -> None:
        self._validate_callback(cb)
        self.__dbbackend.subscribe_channel(ns, cb, self._as_list(channels))

    @func_arg_checker(SdlTypeError, 1, ns=str, channels=(str, builtins.set))
    def unsubscribe_channel(self, ns: str, channels: Union[str, Set[str]]) -> None:
        self.__dbbackend.unsubscribe_channel(ns, self._as_list(channels))

    def start_event_listener(self) -> None:
        self.__dbbackend.start_event_listener()

    def handle_events(self) -> Optional[Tuple[str, List[str]]]:
        return self.__dbbackend.handle_events()

    @func_arg_checker(SdlTypeError, 1, ns=str, resource=str, expiration=(int, float))
    def get_lock_resource(self, ns: str, resource: str, expiration: Union[int, float]) -> SyncLock:
        return SyncLock(ns, resource, expiration, self)

    def get_backend(self) -> DbBackendAbc:
        """Return backend instance."""
        return self.__dbbackend

    def get_configuration(self) -> _Configuration:
        """Return configuration that was valid when the SDL instance was initiated."""
        return self.__configuration

    @staticmethod
    def _as_list(items):
        """Return a single str wrapped in a list, any other iterable converted to a list."""
        return [items] if isinstance(items, str) else list(items)

    @staticmethod
    def _normalize_channels_events(channels_and_events):
        """Return a new dict in which an event given as a plain str is wrapped in a list.

        A new dict is built so that the dict owned by the caller is not modified.
        """
        return {channel: [events] if isinstance(events, str) else events
                for channel, events in channels_and_events.items()}

    @classmethod
    def _validate_key_value_dict(cls, kv):
        """Raise SdlTypeError unless every key of `kv` is str and every value is bytes."""
        for k, v in kv.items():
            if not isinstance(k, str):
                raise SdlTypeError(r"Wrong dict key type: {}={}. Must be: str".format(k, type(k)))
            if not isinstance(v, bytes):
                raise SdlTypeError(r"Wrong dict value type: {}={}. Must be: bytes".format(v, type(v)))

    def _validate_channels_events(self, channels_and_events: Dict[Any, Any]):
        """Raise SdlTypeError when a channel or an event is not acceptable.

        Channels must be str. Events must be a str or a list of str, and no event may contain
        the configured event separator.
        """
        for channel, events in channels_and_events.items():
            if not isinstance(channel, str):
                raise SdlTypeError(r"Wrong channel type: {}={}. Must be: str".format(
                    channel, type(channel)))
            if not isinstance(events, (list, str)):
                raise SdlTypeError(r"Wrong event type: {}={}. Must be: str".format(
                    events, type(events)))
            # Treat a single event as a one element list so both forms share the checks.
            for event in ([events] if isinstance(events, str) else events):
                if not isinstance(event, str):
                    # Report the offending element, not the list that contains it.
                    raise SdlTypeError(r"Wrong event type: {}={}. Must be: str".format(
                        event, type(event)))
                if self.event_separator in event:
                    raise SdlTypeError(r"Events {} contains illegal substring (\"{}\")".format(
                        events, self.event_separator))

    @classmethod
    def _validate_callback(cls, cb):
        """Raise SdlTypeError unless the signature of `cb` has exactly two parameters."""
        param_len = len(inspect.signature(cb).parameters)
        if param_len != 2:
            raise SdlTypeError(
                f"Callback function should take 2 positional argument but {param_len} were given")