92bb88a6f93bdcb3aab48d86c3fb1c256d503751
[ric-plt/sdlpy.git] / ricsdl-package / ricsdl / syncstorage.py
1 # Copyright (c) 2019 AT&T Intellectual Property.
2 # Copyright (c) 2018-2019 Nokia.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 #
17 # This source code is part of the near-RT RIC (RAN Intelligent Controller)
18 # platform project (RICP).
19 #
20
21 """The module provides implementation of the syncronous Shared Data Layer (SDL) interface."""
22 import builtins
23 from typing import (Dict, Set, List, Union)
24 from ricsdl.configuration import _Configuration
25 from ricsdl.syncstorage_abc import (SyncStorageAbc, SyncLockAbc)
26 import ricsdl.backend
27 from ricsdl.backend.dbbackend_abc import DbBackendAbc
28 from ricsdl.exceptions import SdlTypeError
29
30
31 def func_arg_checker(exception, start_arg_idx, **types):
32     """Decorator to validate function arguments."""
33     def _check(func):
34         if not __debug__:
35             return func
36
37         def _validate(*args, **kwds):
38             for idx, arg in enumerate(args[start_arg_idx:], start_arg_idx):
39                 if func.__code__.co_varnames[idx] in types and \
40                         not isinstance(arg, types[func.__code__.co_varnames[idx]]):
41                     raise exception(r"Wrong argument type: '{}'={}. Must be: {}".
42                                     format(func.__code__.co_varnames[idx], type(arg),
43                                            types[func.__code__.co_varnames[idx]]))
44             for kwdname, kwdval in kwds.items():
45                 if kwdname in types and not isinstance(kwdval, types[kwdname]):
46                     raise exception(r"Wrong argument type: '{}'={}. Must be: {}".
47                                     format(kwdname, type(kwdval), types[kwdname]))
48             return func(*args, **kwds)
49         _validate.__name__ = func.__name__
50         return _validate
51     return _check
52
53
54 class SyncLock(SyncLockAbc):
55     """
56     This class implements Shared Data Layer (SDL) abstract 'SyncLockAbc' class.
57
58     A lock instance is created per namespace and it is identified by its `name` within a namespace.
59
60     Args:
61         ns (str): Namespace under which this lock is targeted.
62         name (str): Lock name, identifies the lock key in SDL storage.
63         expiration (int, float): Lock expiration time after which the lock is removed if it hasn't
64                                  been released earlier by a 'release' method.
65         storage (SyncStorage): Database backend object containing connection to a database.
66     """
67     @func_arg_checker(SdlTypeError, 1, ns=str, name=str, expiration=(int, float))
68     def __init__(self, ns: str, name: str, expiration: Union[int, float],
69                  storage: 'SyncStorage') -> None:
70
71         super().__init__(ns, name, expiration)
72         self.__dbbackendlock = ricsdl.backend.get_backend_lock_instance(ns, name, expiration,
73                                                                         storage.get_backend())
74
75     def __str__(self):
76         return str(
77             {
78                 "namespace": self._ns,
79                 "name": self._name,
80                 "expiration": self._expiration,
81                 "backend lock": str(self.__dbbackendlock)
82             }
83         )
84
85     @func_arg_checker(SdlTypeError, 1, retry_interval=(int, float),
86                       retry_timeout=(int, float))
87     def acquire(self, retry_interval: Union[int, float] = 0.1,
88                 retry_timeout: Union[int, float] = 10) -> bool:
89         return self.__dbbackendlock.acquire(retry_interval, retry_timeout)
90
91     def release(self) -> None:
92         self.__dbbackendlock.release()
93
94     def refresh(self) -> None:
95         self.__dbbackendlock.refresh()
96
97     def get_validity_time(self) -> Union[int, float]:
98         return self.__dbbackendlock.get_validity_time()
99
100
101 class SyncStorage(SyncStorageAbc):
102     """
103     This class implements Shared Data Layer (SDL) abstract 'SyncStorageAbc' class.
104
105     This class provides synchronous access to all the namespaces in SDL storage.
106     Data can be written, read and removed based on keys known to clients. Keys are unique within
107     a namespace, namespace identifier is passed as a parameter to all the operations.
108
109     Args:
110         None
111     """
112     def __init__(self) -> None:
113         super().__init__()
114         self.__configuration = _Configuration()
115         self.__dbbackend = ricsdl.backend.get_backend_instance(self.__configuration)
116
117     def __del__(self):
118         self.close()
119
120     def __str__(self):
121         return str(
122             {
123                 "configuration": str(self.__configuration),
124                 "backend": str(self.__dbbackend)
125             }
126         )
127
128     def close(self):
129         self.__dbbackend.close()
130
131     @func_arg_checker(SdlTypeError, 1, ns=str, data_map=dict)
132     def set(self, ns: str, data_map: Dict[str, bytes]) -> None:
133         self.__dbbackend.set(ns, data_map)
134
135     @func_arg_checker(SdlTypeError, 1, ns=str, key=str, old_data=bytes, new_data=bytes)
136     def set_if(self, ns: str, key: str, old_data: bytes, new_data: bytes) -> bool:
137         return self.__dbbackend.set_if(ns, key, old_data, new_data)
138
139     @func_arg_checker(SdlTypeError, 1, ns=str, key=str, data=bytes)
140     def set_if_not_exists(self, ns: str, key: str, data: bytes) -> bool:
141         return self.__dbbackend.set_if_not_exists(ns, key, data)
142
143     @func_arg_checker(SdlTypeError, 1, ns=str, keys=(str, builtins.set))
144     def get(self, ns: str, keys: Union[str, Set[str]]) -> Dict[str, bytes]:
145         return self.__dbbackend.get(ns, list(keys))
146
147     @func_arg_checker(SdlTypeError, 1, ns=str, key_prefix=str)
148     def find_keys(self, ns: str, key_prefix: str) -> List[str]:
149         return self.__dbbackend.find_keys(ns, key_prefix)
150
151     @func_arg_checker(SdlTypeError, 1, ns=str, key_prefix=str, atomic=bool)
152     def find_and_get(self, ns: str, key_prefix: str, atomic: bool) -> Dict[str, bytes]:
153         return self.__dbbackend.find_and_get(ns, key_prefix, atomic)
154
155     @func_arg_checker(SdlTypeError, 1, ns=str, keys=(str, builtins.set))
156     def remove(self, ns: str, keys: Union[str, Set[str]]) -> None:
157         self.__dbbackend.remove(ns, list(keys))
158
159     @func_arg_checker(SdlTypeError, 1, ns=str, key=str, data=bytes)
160     def remove_if(self, ns: str, key: str, data: bytes) -> bool:
161         return self.__dbbackend.remove_if(ns, key, data)
162
163     @func_arg_checker(SdlTypeError, 1, ns=str)
164     def remove_all(self, ns: str) -> None:
165         keys = self.__dbbackend.find_keys(ns, '')
166         if keys:
167             self.__dbbackend.remove(ns, keys)
168
169     @func_arg_checker(SdlTypeError, 1, ns=str, group=str, members=(bytes, builtins.set))
170     def add_member(self, ns: str, group: str, members: Union[bytes, Set[bytes]]) -> None:
171         self.__dbbackend.add_member(ns, group, members)
172
173     @func_arg_checker(SdlTypeError, 1, ns=str, group=str, members=(bytes, builtins.set))
174     def remove_member(self, ns: str, group: str, members: Union[bytes, Set[bytes]]) -> None:
175         self.__dbbackend.remove_member(ns, group, members)
176
177     @func_arg_checker(SdlTypeError, 1, ns=str, group=str)
178     def remove_group(self, ns: str, group: str) -> None:
179         self.__dbbackend.remove_group(ns, group)
180
181     @func_arg_checker(SdlTypeError, 1, ns=str, group=str)
182     def get_members(self, ns: str, group: str) -> Set[bytes]:
183         return self.__dbbackend.get_members(ns, group)
184
185     @func_arg_checker(SdlTypeError, 1, ns=str, group=str, member=bytes)
186     def is_member(self, ns: str, group: str, member: bytes) -> bool:
187         return self.__dbbackend.is_member(ns, group, member)
188
189     @func_arg_checker(SdlTypeError, 1, ns=str, group=str)
190     def group_size(self, ns: str, group: str) -> int:
191         return self.__dbbackend.group_size(ns, group)
192
193     @func_arg_checker(SdlTypeError, 1, ns=str, resource=str, expiration=(int, float))
194     def get_lock_resource(self, ns: str, resource: str, expiration: Union[int, float]) -> SyncLock:
195         return SyncLock(ns, resource, expiration, self)
196
197     def get_backend(self) -> DbBackendAbc:
198         """Return backend instance."""
199         return self.__dbbackend