Read list type of environment variables
[ric-plt/sdlpy.git] / ricsdl-package / ricsdl / syncstorage.py
1 # Copyright (c) 2019 AT&T Intellectual Property.
2 # Copyright (c) 2018-2022 Nokia.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 #
17 # This source code is part of the near-RT RIC (RAN Intelligent Controller)
18 # platform project (RICP).
19 #
20
21 """The module provides implementation of the syncronous Shared Data Layer (SDL) interface."""
22 import builtins
23 import inspect
24 from typing import (Any, Callable, Dict, Set, List, Optional, Tuple, Union)
25 from ricsdl.configuration import _Configuration
26 from ricsdl.syncstorage_abc import (SyncStorageAbc, SyncLockAbc)
27 import ricsdl.backend
28 from ricsdl.backend.dbbackend_abc import DbBackendAbc
29 from ricsdl.exceptions import (SdlException, SdlTypeError)
30
31
32 def func_arg_checker(exception, start_arg_idx, **types):
33     """Decorator to validate function arguments."""
34     def _check(func):
35         if not __debug__:
36             return func
37
38         def _validate(*args, **kwds):
39             for idx, arg in enumerate(args[start_arg_idx:], start_arg_idx):
40                 if func.__code__.co_varnames[idx] in types and \
41                         not isinstance(arg, types[func.__code__.co_varnames[idx]]):
42                     raise exception(r"Wrong argument type: '{}'={}. Must be: {}".
43                                     format(func.__code__.co_varnames[idx], type(arg),
44                                            types[func.__code__.co_varnames[idx]]))
45             for kwdname, kwdval in kwds.items():
46                 if kwdname in types and not isinstance(kwdval, types[kwdname]):
47                     raise exception(r"Wrong argument type: '{}'={}. Must be: {}".
48                                     format(kwdname, type(kwdval), types[kwdname]))
49             return func(*args, **kwds)
50         _validate.__name__ = func.__name__
51         return _validate
52     return _check
53
54
55 class SyncLock(SyncLockAbc):
56     """
57     This class implements Shared Data Layer (SDL) abstract 'SyncLockAbc' class.
58
59     A lock instance is created per namespace and it is identified by its `name` within a namespace.
60
61     Args:
62         ns (str): Namespace under which this lock is targeted.
63         name (str): Lock name, identifies the lock key in SDL storage.
64         expiration (int, float): Lock expiration time after which the lock is removed if it hasn't
65                                  been released earlier by a 'release' method.
66         storage (SyncStorage): Database backend object containing connection to a database.
67     """
68     @func_arg_checker(SdlTypeError, 1, ns=str, name=str, expiration=(int, float))
69     def __init__(self, ns: str, name: str, expiration: Union[int, float],
70                  storage: 'SyncStorage') -> None:
71
72         super().__init__(ns, name, expiration)
73         self.__configuration = storage.get_configuration()
74         self.__dbbackendlock = ricsdl.backend.get_backend_lock_instance(self.__configuration,
75                                                                         ns, name, expiration,
76                                                                         storage.get_backend())
77
78     def __str__(self):
79         return str(
80             {
81                 "namespace": self._ns,
82                 "name": self._name,
83                 "expiration": self._expiration,
84                 "backend lock": str(self.__dbbackendlock)
85             }
86         )
87
88     @func_arg_checker(SdlTypeError, 1, retry_interval=(int, float),
89                       retry_timeout=(int, float))
90     def acquire(self, retry_interval: Union[int, float] = 0.1,
91                 retry_timeout: Union[int, float] = 10) -> bool:
92         return self.__dbbackendlock.acquire(retry_interval, retry_timeout)
93
94     def release(self) -> None:
95         self.__dbbackendlock.release()
96
97     def refresh(self) -> None:
98         self.__dbbackendlock.refresh()
99
100     def get_validity_time(self) -> Union[int, float]:
101         return self.__dbbackendlock.get_validity_time()
102
103
104 class SyncStorage(SyncStorageAbc):
105     """
106     This class implements Shared Data Layer (SDL) abstract 'SyncStorageAbc' class.
107
108     This class provides synchronous access to all the namespaces in SDL storage.
109     Data can be written, read and removed based on keys known to clients. Keys are unique within
110     a namespace, namespace identifier is passed as a parameter to all the operations.
111
112     Args:
113         fake_db_backend (str): Optional parameter. Parameter enables fake DB backend usage for an
114                                SDL instance. Fake DB backend is ONLY allowed to use for testing
115                                purposes at development phase of SDL clients when more advanced
116                                database services are not necessarily needed. Currently value 'dict'
117                                is only allowed value for the parameter, which enables dictionary
118                                type of fake DB backend.
119     """
120     def __init__(self, fake_db_backend=None) -> None:
121         super().__init__()
122         self.__dbbackend = None
123         self.__configuration = _Configuration(fake_db_backend)
124         self.event_separator = self.__configuration.get_event_separator()
125         self.__dbbackend = ricsdl.backend.get_backend_instance(self.__configuration)
126
127     def __del__(self):
128         self.close()
129
130     def __str__(self):
131         return str(
132             {
133                 "configuration": str(self.__configuration),
134                 "backend": str(self.__dbbackend)
135             }
136         )
137
138     def is_active(self):
139         try:
140             return self.__dbbackend.is_connected()
141         except SdlException:
142             return False
143
144     def close(self):
145         if self.__dbbackend:
146             self.__dbbackend.close()
147
148     @func_arg_checker(SdlTypeError, 1, ns=str, data_map=dict)
149     def set(self, ns: str, data_map: Dict[str, bytes]) -> None:
150         self._validate_key_value_dict(data_map)
151         self.__dbbackend.set(ns, data_map)
152
153     @func_arg_checker(SdlTypeError, 1, ns=str, key=str, old_data=bytes, new_data=bytes)
154     def set_if(self, ns: str, key: str, old_data: bytes, new_data: bytes) -> bool:
155         return self.__dbbackend.set_if(ns, key, old_data, new_data)
156
157     @func_arg_checker(SdlTypeError, 1, ns=str, key=str, data=bytes)
158     def set_if_not_exists(self, ns: str, key: str, data: bytes) -> bool:
159         return self.__dbbackend.set_if_not_exists(ns, key, data)
160
161     @func_arg_checker(SdlTypeError, 1, ns=str, keys=(str, builtins.set))
162     def get(self, ns: str, keys: Union[str, Set[str]]) -> Dict[str, bytes]:
163         disordered = self.__dbbackend.get(ns, list(keys))
164         return {k: disordered[k] for k in sorted(disordered)}
165
166     @func_arg_checker(SdlTypeError, 1, ns=str, key_pattern=str)
167     def find_keys(self, ns: str, key_pattern: str) -> List[str]:
168         return self.__dbbackend.find_keys(ns, key_pattern)
169
170     @func_arg_checker(SdlTypeError, 1, ns=str, key_pattern=str)
171     def find_and_get(self, ns: str, key_pattern: str) -> Dict[str, bytes]:
172         disordered = self.__dbbackend.find_and_get(ns, key_pattern)
173         return {k: disordered[k] for k in sorted(disordered)}
174
175     @func_arg_checker(SdlTypeError, 1, ns=str, keys=(str, builtins.set))
176     def remove(self, ns: str, keys: Union[str, Set[str]]) -> None:
177         self.__dbbackend.remove(ns, list(keys))
178
179     @func_arg_checker(SdlTypeError, 1, ns=str, key=str, data=bytes)
180     def remove_if(self, ns: str, key: str, data: bytes) -> bool:
181         return self.__dbbackend.remove_if(ns, key, data)
182
183     @func_arg_checker(SdlTypeError, 1, ns=str)
184     def remove_all(self, ns: str) -> None:
185         keys = self.__dbbackend.find_keys(ns, '*')
186         if keys:
187             self.__dbbackend.remove(ns, keys)
188
189     @func_arg_checker(SdlTypeError, 1, ns=str, group=str, members=(bytes, builtins.set))
190     def add_member(self, ns: str, group: str, members: Union[bytes, Set[bytes]]) -> None:
191         self.__dbbackend.add_member(ns, group, members)
192
193     @func_arg_checker(SdlTypeError, 1, ns=str, group=str, members=(bytes, builtins.set))
194     def remove_member(self, ns: str, group: str, members: Union[bytes, Set[bytes]]) -> None:
195         self.__dbbackend.remove_member(ns, group, members)
196
197     @func_arg_checker(SdlTypeError, 1, ns=str, group=str)
198     def remove_group(self, ns: str, group: str) -> None:
199         self.__dbbackend.remove_group(ns, group)
200
201     @func_arg_checker(SdlTypeError, 1, ns=str, group=str)
202     def get_members(self, ns: str, group: str) -> Set[bytes]:
203         return self.__dbbackend.get_members(ns, group)
204
205     @func_arg_checker(SdlTypeError, 1, ns=str, group=str, member=bytes)
206     def is_member(self, ns: str, group: str, member: bytes) -> bool:
207         return self.__dbbackend.is_member(ns, group, member)
208
209     @func_arg_checker(SdlTypeError, 1, ns=str, group=str)
210     def group_size(self, ns: str, group: str) -> int:
211         return self.__dbbackend.group_size(ns, group)
212
213     @func_arg_checker(SdlTypeError, 1, ns=str, channels_and_events=dict, data_map=dict)
214     def set_and_publish(self, ns: str, channels_and_events: Dict[str, Union[str, List[str]]],
215                         data_map: Dict[str, bytes]) -> None:
216         self._validate_key_value_dict(data_map)
217         self._validate_channels_events(channels_and_events)
218         for channel, events in channels_and_events.items():
219             channels_and_events[channel] = [events] if isinstance(events, str) else events
220         self.__dbbackend.set_and_publish(ns, channels_and_events, data_map)
221
222     @func_arg_checker(SdlTypeError,
223                       1,
224                       ns=str,
225                       channels_and_events=dict,
226                       key=str,
227                       old_data=bytes,
228                       new_data=bytes)
229     def set_if_and_publish(self, ns: str, channels_and_events: Dict[str, Union[str, List[str]]],
230                            key: str, old_data: bytes, new_data: bytes) -> bool:
231         self._validate_channels_events(channels_and_events)
232         for channel, events in channels_and_events.items():
233             channels_and_events[channel] = [events] if isinstance(events, str) else events
234         return self.__dbbackend.set_if_and_publish(ns, channels_and_events, key, old_data, new_data)
235
236     @func_arg_checker(SdlTypeError, 1, ns=str, channels_and_events=dict, key=str, data=bytes)
237     def set_if_not_exists_and_publish(self, ns: str,
238                                       channels_and_events: Dict[str, Union[str, List[str]]],
239                                       key: str, data: bytes) -> bool:
240         self._validate_channels_events(channels_and_events)
241         for channel, events in channels_and_events.items():
242             channels_and_events[channel] = [events] if isinstance(events, str) else events
243         return self.__dbbackend.set_if_not_exists_and_publish(ns, channels_and_events, key, data)
244
245     @func_arg_checker(SdlTypeError, 1, ns=str, channels_and_events=dict, keys=(str, builtins.set))
246     def remove_and_publish(self, ns: str, channels_and_events: Dict[str, Union[str, List[str]]],
247                            keys: Union[str, Set[str]]) -> None:
248         self._validate_channels_events(channels_and_events)
249         for channel, events in channels_and_events.items():
250             channels_and_events[channel] = [events] if isinstance(events, str) else events
251         keys = [keys] if isinstance(keys, str) else list(keys)
252         self.__dbbackend.remove_and_publish(ns, channels_and_events, keys)
253
254     @func_arg_checker(SdlTypeError, 1, ns=str, channels_and_events=dict, key=str, data=bytes)
255     def remove_if_and_publish(self, ns: str, channels_and_events: Dict[str, Union[str, List[str]]],
256                               key: str, data: bytes) -> bool:
257         self._validate_channels_events(channels_and_events)
258         for channel, events in channels_and_events.items():
259             channels_and_events[channel] = [events] if isinstance(events, str) else events
260         return self.__dbbackend.remove_if_and_publish(ns, channels_and_events, key, data)
261
262     @func_arg_checker(SdlTypeError, 1, ns=str, channels_and_events=dict)
263     def remove_all_and_publish(self, ns: str,
264                                channels_and_events: Dict[str, Union[str, List[str]]]) -> None:
265         self._validate_channels_events(channels_and_events)
266         for channel, events in channels_and_events.items():
267             channels_and_events[channel] = [events] if isinstance(events, str) else events
268         self.__dbbackend.remove_all_and_publish(ns, channels_and_events)
269
270     @func_arg_checker(SdlTypeError, 1, ns=str, cb=Callable, channels=(str, builtins.set))
271     def subscribe_channel(self, ns: str, cb: Callable[[str, List[str]], None],
272                           channels: Union[str, Set[str]]) -> None:
273         self._validate_callback(cb)
274         channels = [channels] if isinstance(channels, str) else list(channels)
275         self.__dbbackend.subscribe_channel(ns, cb, channels)
276
277     @func_arg_checker(SdlTypeError, 1, ns=str, channels=(str, builtins.set))
278     def unsubscribe_channel(self, ns: str, channels: Union[str, Set[str]]) -> None:
279         channels = [channels] if isinstance(channels, str) else list(channels)
280         self.__dbbackend.unsubscribe_channel(ns, channels)
281
282     def start_event_listener(self) -> None:
283         self.__dbbackend.start_event_listener()
284
285     def handle_events(self) -> Optional[Tuple[str, List[str]]]:
286         return self.__dbbackend.handle_events()
287
288     @func_arg_checker(SdlTypeError, 1, ns=str, resource=str, expiration=(int, float))
289     def get_lock_resource(self, ns: str, resource: str, expiration: Union[int, float]) -> SyncLock:
290         return SyncLock(ns, resource, expiration, self)
291
292     def get_backend(self) -> DbBackendAbc:
293         """Return backend instance."""
294         return self.__dbbackend
295
296     def get_configuration(self) -> _Configuration:
297         """Return configuration what was valid when the SDL instance was initiated."""
298         return self.__configuration
299
300     @classmethod
301     def _validate_key_value_dict(cls, kv):
302         for k, v in kv.items():
303             if not isinstance(k, str):
304                 raise SdlTypeError(r"Wrong dict key type: {}={}. Must be: str".format(k, type(k)))
305             if not isinstance(v, bytes):
306                 raise SdlTypeError(r"Wrong dict value type: {}={}. Must be: bytes".format(v, type(v)))
307
308     def _validate_channels_events(self, channels_and_events: Dict[Any, Any]):
309         for channel, events in channels_and_events.items():
310             if not isinstance(channel, str):
311                 raise SdlTypeError(r"Wrong channel type: {}={}. Must be: str".format(
312                     channel, type(channel)))
313             if not isinstance(events, (list, str)):
314                 raise SdlTypeError(r"Wrong event type: {}={}. Must be: str".format(
315                     events, type(events)))
316             if isinstance(events, list):
317                 for event in events:
318                     if not isinstance(event, str):
319                         raise SdlTypeError(r"Wrong event type: {}={}. Must be: str".format(
320                             events, type(events)))
321                     if self.event_separator in event:
322                         raise SdlTypeError(r"Events {} contains illegal substring (\"{}\")".format(
323                             events, self.event_separator))
324             else:
325                 if self.event_separator in events:
326                     raise SdlTypeError(r"Events {} contains illegal substring (\"{}\")".format(
327                         events, self.event_separator))
328
329     @classmethod
330     def _validate_callback(cls, cb):
331         param_len = len(inspect.signature(cb).parameters)
332         if param_len != 2:
333             raise SdlTypeError(
334                 f"Callback function should take 2 positional argument but {param_len} were given")