48b5e3d591ed83990da2b6630ff9b784cc5b95fe
[ric-plt/sdlpy.git] / ricsdl-package / ricsdl / syncstorage.py
1 # Copyright (c) 2019 AT&T Intellectual Property.
2 # Copyright (c) 2018-2019 Nokia.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 #
17 # This source code is part of the near-RT RIC (RAN Intelligent Controller)
18 # platform project (RICP).
19 #
20
"""The module provides an implementation of the synchronous Shared Data Layer (SDL) interface."""
22 import builtins
23 import inspect
24 from typing import (Any, Callable, Dict, Set, List, Optional, Tuple, Union)
25 from ricsdl.configuration import _Configuration
26 from ricsdl.syncstorage_abc import (SyncStorageAbc, SyncLockAbc)
27 import ricsdl.backend
28 from ricsdl.backend.dbbackend_abc import DbBackendAbc
29 from ricsdl.exceptions import (SdlException, SdlTypeError)
30
31
def func_arg_checker(exception, start_arg_idx, **types):
    """Decorator to validate function arguments.

    Args:
        exception: Exception class raised when an argument has a wrong type.
        start_arg_idx (int): Index of the first positional argument to validate. Positional
                             arguments before this index (e.g. 'self') are not checked.
        **types: Maps a parameter name to the type, or tuple of types, that 'isinstance'
                 must accept for that parameter. Parameters not listed are not checked.

    Returns:
        A decorator. When '__debug__' is False the decorator returns the function unchanged,
        so no validation takes place.
    """
    # Imported locally so that this helper does not depend on a module level import.
    from functools import wraps

    def _check(func):
        if not __debug__:
            return func

        # 'co_varnames' lists the parameters first and the local variables after them. Only
        # the leading 'co_argcount' names can be bound by positional arguments; limiting the
        # lookup to them keeps surplus positional arguments from raising IndexError here (the
        # call below reports them with the usual TypeError) and from being matched against a
        # local variable name.
        code = func.__code__
        positional_names = code.co_varnames[:code.co_argcount]

        @wraps(func)
        def _validate(*args, **kwds):
            positional = list(zip(positional_names, args))[start_arg_idx:]
            for name, value in positional + list(kwds.items()):
                if name in types and not isinstance(value, types[name]):
                    raise exception(r"Wrong argument type: '{}'={}. Must be: {}".
                                    format(name, type(value), types[name]))
            return func(*args, **kwds)
        return _validate
    return _check
53
54
class SyncLock(SyncLockAbc):
    """
    Concrete implementation of the Shared Data Layer (SDL) abstract 'SyncLockAbc' class.

    A lock instance is created per namespace and it is identified by its `name` within a
    namespace. Every lock operation is delegated to a backend lock object obtained from
    'ricsdl.backend.get_backend_lock_instance'.

    Args:
        ns (str): Namespace under which this lock is targeted.
        name (str): Lock name, identifies the lock key in SDL storage.
        expiration (int, float): Lock expiration time after which the lock is removed if it hasn't
                                 been released earlier by a 'release' method.
        storage (SyncStorage): SDL storage object supplying the configuration and the database
                               backend for the lock.
    """
    @func_arg_checker(SdlTypeError, 1, ns=str, name=str, expiration=(int, float))
    def __init__(self, ns: str, name: str, expiration: Union[int, float],
                 storage: 'SyncStorage') -> None:
        super().__init__(ns, name, expiration)
        configuration = storage.get_configuration()
        self.__configuration = configuration
        self.__dbbackendlock = ricsdl.backend.get_backend_lock_instance(
            configuration, ns, name, expiration, storage.get_backend())

    def __str__(self):
        """Return the lock attributes and the backend lock as a stringified dictionary."""
        description = {}
        description["namespace"] = self._ns
        description["name"] = self._name
        description["expiration"] = self._expiration
        description["backend lock"] = str(self.__dbbackendlock)
        return str(description)

    @func_arg_checker(SdlTypeError, 1, retry_interval=(int, float),
                      retry_timeout=(int, float))
    def acquire(self, retry_interval: Union[int, float] = 0.1,
                retry_timeout: Union[int, float] = 10) -> bool:
        """Forward the acquire request to the backend lock and return its result."""
        backend_lock = self.__dbbackendlock
        return backend_lock.acquire(retry_interval, retry_timeout)

    def release(self) -> None:
        """Forward the release request to the backend lock."""
        backend_lock = self.__dbbackendlock
        backend_lock.release()

    def refresh(self) -> None:
        """Forward the refresh request to the backend lock."""
        backend_lock = self.__dbbackendlock
        backend_lock.refresh()

    def get_validity_time(self) -> Union[int, float]:
        """Return the validity time reported by the backend lock."""
        backend_lock = self.__dbbackendlock
        return backend_lock.get_validity_time()
102
103
class SyncStorage(SyncStorageAbc):
    """
    This class implements Shared Data Layer (SDL) abstract 'SyncStorageAbc' class.

    This class provides synchronous access to all the namespaces in SDL storage.
    Data can be written, read and removed based on keys known to clients. Keys are unique within
    a namespace, namespace identifier is passed as a parameter to all the operations.

    Args:
        fake_db_backend (str): Optional parameter. Parameter enables fake DB backend usage for an
                               SDL instance. Fake DB backend is ONLY allowed to use for testing
                               purposes at development phase of SDL clients when more advanced
                               database services are not necessarily needed. Currently value 'dict'
                               is only allowed value for the parameter, which enables dictionary
                               type of fake DB backend.
    """
    def __init__(self, fake_db_backend=None) -> None:
        super().__init__()
        self.__configuration = _Configuration(fake_db_backend)
        self.__dbbackend = ricsdl.backend.get_backend_instance(self.__configuration)

    def __del__(self):
        # '__del__' also runs for an instance whose '__init__' raised before the backend was
        # created. There is nothing to close in that case, and calling 'close' would only
        # raise AttributeError during garbage collection.
        if hasattr(self, '_SyncStorage__dbbackend'):
            self.close()

    def __str__(self):
        """Return the configuration and the backend as a stringified dictionary."""
        return str(
            {
                "configuration": str(self.__configuration),
                "backend": str(self.__dbbackend)
            }
        )

    def is_active(self):
        """Return the backend connection status, or False if the check raises SdlException."""
        try:
            return self.__dbbackend.is_connected()
        except SdlException:
            return False

    def close(self):
        """Close the database backend."""
        self.__dbbackend.close()

    @func_arg_checker(SdlTypeError, 1, ns=str, data_map=dict)
    def set(self, ns: str, data_map: Dict[str, bytes]) -> None:
        """Validate that 'data_map' maps str keys to bytes values and write it to the backend."""
        self._validate_key_value_dict(data_map)
        self.__dbbackend.set(ns, data_map)

    @func_arg_checker(SdlTypeError, 1, ns=str, key=str, old_data=bytes, new_data=bytes)
    def set_if(self, ns: str, key: str, old_data: bytes, new_data: bytes) -> bool:
        """Forward a conditional write to the backend and return its result."""
        return self.__dbbackend.set_if(ns, key, old_data, new_data)

    @func_arg_checker(SdlTypeError, 1, ns=str, key=str, data=bytes)
    def set_if_not_exists(self, ns: str, key: str, data: bytes) -> bool:
        """Forward a write-if-not-exists request to the backend and return its result."""
        return self.__dbbackend.set_if_not_exists(ns, key, data)

    @func_arg_checker(SdlTypeError, 1, ns=str, keys=(str, builtins.set))
    def get(self, ns: str, keys: Union[str, Set[str]]) -> Dict[str, bytes]:
        """Read one key (str) or several keys (set of str) and return the result sorted by key."""
        # A single str key must be wrapped, not split into its characters by 'list()'.
        disordered = self.__dbbackend.get(ns, self._as_list(keys))
        return {k: disordered[k] for k in sorted(disordered)}

    @func_arg_checker(SdlTypeError, 1, ns=str, key_pattern=str)
    def find_keys(self, ns: str, key_pattern: str) -> List[str]:
        """Return the keys the backend finds for 'key_pattern'."""
        return self.__dbbackend.find_keys(ns, key_pattern)

    @func_arg_checker(SdlTypeError, 1, ns=str, key_pattern=str)
    def find_and_get(self, ns: str, key_pattern: str) -> Dict[str, bytes]:
        """Return the key-value pairs the backend finds for 'key_pattern', sorted by key."""
        disordered = self.__dbbackend.find_and_get(ns, key_pattern)
        return {k: disordered[k] for k in sorted(disordered)}

    @func_arg_checker(SdlTypeError, 1, ns=str, keys=(str, builtins.set))
    def remove(self, ns: str, keys: Union[str, Set[str]]) -> None:
        """Remove one key (str) or several keys (set of str)."""
        # A single str key must be wrapped, not split into its characters by 'list()'.
        self.__dbbackend.remove(ns, self._as_list(keys))

    @func_arg_checker(SdlTypeError, 1, ns=str, key=str, data=bytes)
    def remove_if(self, ns: str, key: str, data: bytes) -> bool:
        """Forward a conditional remove to the backend and return its result."""
        return self.__dbbackend.remove_if(ns, key, data)

    @func_arg_checker(SdlTypeError, 1, ns=str)
    def remove_all(self, ns: str) -> None:
        """Find all keys of the namespace with pattern '*' and remove them, if any were found."""
        keys = self.__dbbackend.find_keys(ns, '*')
        if keys:
            self.__dbbackend.remove(ns, keys)

    @func_arg_checker(SdlTypeError, 1, ns=str, group=str, members=(bytes, builtins.set))
    def add_member(self, ns: str, group: str, members: Union[bytes, Set[bytes]]) -> None:
        """Forward an add-member request for 'group' to the backend."""
        self.__dbbackend.add_member(ns, group, members)

    @func_arg_checker(SdlTypeError, 1, ns=str, group=str, members=(bytes, builtins.set))
    def remove_member(self, ns: str, group: str, members: Union[bytes, Set[bytes]]) -> None:
        """Forward a remove-member request for 'group' to the backend."""
        self.__dbbackend.remove_member(ns, group, members)

    @func_arg_checker(SdlTypeError, 1, ns=str, group=str)
    def remove_group(self, ns: str, group: str) -> None:
        """Forward a remove-group request to the backend."""
        self.__dbbackend.remove_group(ns, group)

    @func_arg_checker(SdlTypeError, 1, ns=str, group=str)
    def get_members(self, ns: str, group: str) -> Set[bytes]:
        """Return the members the backend reports for 'group'."""
        return self.__dbbackend.get_members(ns, group)

    @func_arg_checker(SdlTypeError, 1, ns=str, group=str, member=bytes)
    def is_member(self, ns: str, group: str, member: bytes) -> bool:
        """Return the backend's answer on whether 'member' belongs to 'group'."""
        return self.__dbbackend.is_member(ns, group, member)

    @func_arg_checker(SdlTypeError, 1, ns=str, group=str)
    def group_size(self, ns: str, group: str) -> int:
        """Return the size the backend reports for 'group'."""
        return self.__dbbackend.group_size(ns, group)

    @func_arg_checker(SdlTypeError, 1, ns=str, channels_and_events=dict, data_map=dict)
    def set_and_publish(self, ns: str, channels_and_events: Dict[str, Union[str, List[str]]],
                        data_map: Dict[str, bytes]) -> None:
        """Validate the arguments and forward a write with event publishing to the backend.

        Events given as a single str are passed to the backend as a one-item list. The
        caller's 'channels_and_events' dictionary is not modified.
        """
        self._validate_key_value_dict(data_map)
        normalized = self._normalize_channels_events(channels_and_events)
        self.__dbbackend.set_and_publish(ns, normalized, data_map)

    @func_arg_checker(SdlTypeError,
                      1,
                      ns=str,
                      channels_and_events=dict,
                      key=str,
                      old_data=bytes,
                      new_data=bytes)
    def set_if_and_publish(self, ns: str, channels_and_events: Dict[str, Union[str, List[str]]],
                           key: str, old_data: bytes, new_data: bytes) -> bool:
        """Forward a conditional write with event publishing and return the backend result.

        Events given as a single str are passed to the backend as a one-item list. The
        caller's 'channels_and_events' dictionary is not modified.
        """
        normalized = self._normalize_channels_events(channels_and_events)
        return self.__dbbackend.set_if_and_publish(ns, normalized, key, old_data, new_data)

    @func_arg_checker(SdlTypeError, 1, ns=str, channels_and_events=dict, key=str, data=bytes)
    def set_if_not_exists_and_publish(self, ns: str,
                                      channels_and_events: Dict[str, Union[str, List[str]]],
                                      key: str, data: bytes) -> bool:
        """Forward a write-if-not-exists with event publishing and return the backend result.

        Events given as a single str are passed to the backend as a one-item list. The
        caller's 'channels_and_events' dictionary is not modified.
        """
        normalized = self._normalize_channels_events(channels_and_events)
        return self.__dbbackend.set_if_not_exists_and_publish(ns, normalized, key, data)

    @func_arg_checker(SdlTypeError, 1, ns=str, channels_and_events=dict, keys=(str, builtins.set))
    def remove_and_publish(self, ns: str, channels_and_events: Dict[str, Union[str, List[str]]],
                           keys: Union[str, Set[str]]) -> None:
        """Forward a remove of one or several keys with event publishing to the backend.

        Events given as a single str are passed to the backend as a one-item list. The
        caller's 'channels_and_events' dictionary is not modified.
        """
        normalized = self._normalize_channels_events(channels_and_events)
        self.__dbbackend.remove_and_publish(ns, normalized, self._as_list(keys))

    @func_arg_checker(SdlTypeError, 1, ns=str, channels_and_events=dict, key=str, data=bytes)
    def remove_if_and_publish(self, ns: str, channels_and_events: Dict[str, Union[str, List[str]]],
                              key: str, data: bytes) -> bool:
        """Forward a conditional remove with event publishing and return the backend result.

        Events given as a single str are passed to the backend as a one-item list. The
        caller's 'channels_and_events' dictionary is not modified.
        """
        normalized = self._normalize_channels_events(channels_and_events)
        return self.__dbbackend.remove_if_and_publish(ns, normalized, key, data)

    @func_arg_checker(SdlTypeError, 1, ns=str, channels_and_events=dict)
    def remove_all_and_publish(self, ns: str,
                               channels_and_events: Dict[str, Union[str, List[str]]]) -> None:
        """Forward a remove-all request with event publishing to the backend.

        Events given as a single str are passed to the backend as a one-item list. The
        caller's 'channels_and_events' dictionary is not modified.
        """
        normalized = self._normalize_channels_events(channels_and_events)
        self.__dbbackend.remove_all_and_publish(ns, normalized)

    @func_arg_checker(SdlTypeError, 1, ns=str, cb=Callable, channels=(str, builtins.set))
    def subscribe_channel(self, ns: str, cb: Callable[[str, str], None],
                          channels: Union[str, Set[str]]) -> None:
        """Check that 'cb' takes two parameters and subscribe it to one or several channels."""
        self._validate_callback(cb)
        self.__dbbackend.subscribe_channel(ns, cb, self._as_list(channels))

    @func_arg_checker(SdlTypeError, 1, ns=str, channels=(str, builtins.set))
    def unsubscribe_channel(self, ns: str, channels: Union[str, Set[str]]) -> None:
        """Unsubscribe from one channel (str) or several channels (set of str)."""
        self.__dbbackend.unsubscribe_channel(ns, self._as_list(channels))

    def start_event_listener(self) -> None:
        """Forward the start-event-listener request to the backend."""
        self.__dbbackend.start_event_listener()

    def handle_events(self) -> Optional[Tuple[str, str]]:
        """Return the result of the backend's event handling."""
        return self.__dbbackend.handle_events()

    @func_arg_checker(SdlTypeError, 1, ns=str, resource=str, expiration=(int, float))
    def get_lock_resource(self, ns: str, resource: str, expiration: Union[int, float]) -> SyncLock:
        """Create a 'SyncLock' named 'resource' in namespace 'ns' that uses this storage."""
        return SyncLock(ns, resource, expiration, self)

    def get_backend(self) -> DbBackendAbc:
        """Return backend instance."""
        return self.__dbbackend

    def get_configuration(self) -> _Configuration:
        """Return configuration what was valid when the SDL instance was initiated."""
        return self.__configuration

    @classmethod
    def _as_list(cls, items):
        """Return a one-item list for a single str, otherwise a list of the given items."""
        return [items] if isinstance(items, str) else list(items)

    @classmethod
    def _normalize_channels_events(cls, channels_and_events: Dict[Any, Any]):
        """Validate 'channels_and_events' and return a copy where every value is a list.

        A new dictionary is built so that the dictionary owned by the caller stays untouched.
        """
        cls._validate_channels_events(channels_and_events)
        return {channel: [events] if isinstance(events, str) else events
                for channel, events in channels_and_events.items()}

    @classmethod
    def _validate_key_value_dict(cls, kv):
        """Raise SdlTypeError unless every key of 'kv' is str and every value is bytes."""
        for k, v in kv.items():
            if not isinstance(k, str):
                raise SdlTypeError(r"Wrong dict key type: {}={}. Must be: str".format(k, type(k)))
            if not isinstance(v, bytes):
                raise SdlTypeError(r"Wrong dict value type: {}={}. Must be: bytes".format(v, type(v)))

    @classmethod
    def _validate_channels_events(cls, channels_and_events: Dict[Any, Any]):
        """Raise SdlTypeError unless channels are str and events are str or lists of str."""
        for channel, events in channels_and_events.items():
            if not isinstance(channel, str):
                raise SdlTypeError(r"Wrong channel type: {}={}. Must be: str".format(
                    channel, type(channel)))
            if not isinstance(events, (list, str)):
                raise SdlTypeError(r"Wrong event type: {}={}. Must be: str".format(
                    events, type(events)))
            if isinstance(events, list):
                for event in events:
                    if not isinstance(event, str):
                        # Report the offending list item, not the list that contains it.
                        raise SdlTypeError(r"Wrong event type: {}={}. Must be: str".format(
                            event, type(event)))

    @classmethod
    def _validate_callback(cls, cb):
        """Raise SdlTypeError unless the signature of 'cb' has exactly two parameters."""
        param_len = len(inspect.signature(cb).parameters)
        if param_len != 2:
            raise SdlTypeError(
                f"Callback function should take 2 positional argument but {param_len} were given")