Pack notifications to be compatible with SDL golang
[ric-plt/sdlpy.git] / ricsdl-package / ricsdl / backend / fake_dict_db.py
1 # Copyright (c) 2019 AT&T Intellectual Property.
2 # Copyright (c) 2018-2019 Nokia.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 #
17 # This source code is part of the near-RT RIC (RAN Intelligent Controller)
18 # platform project (RICP).
19 #
20
21
22 """The module provides fake implementation of Shared Data Layer (SDL) database backend interface."""
23 import fnmatch
24 from typing import (Callable, Dict, Set, List, Optional, Tuple, Union)
25 import queue
26 import threading
27 import time
28 from ricsdl.configuration import _Configuration
29 from .dbbackend_abc import DbBackendAbc
30 from .dbbackend_abc import DbBackendLockAbc
31
32
33 class FakeDictBackend(DbBackendAbc):
34     """
35     A class providing fake implementation of database backend of Shared Data Layer (SDL).
36     This class does not provide working database solution, this class can be used in testing
37     purposes only. Implementation does not provide shared database resource, SDL client sees
38     only its local local 'fake' database, which is a simple Python dictionary. Also keys are
39     stored in database under the same namespace.
40
41     Args:
42         configuration (_Configuration): SDL configuration, containing credentials to connect to
43                                         Redis database backend.
44     """
45     def __init__(self, configuration: _Configuration) -> None:
46         super().__init__()
47         self._db = {}
48         self._configuration = configuration
49         self._queue = queue.Queue(1)
50         self._channel_cbs = {}
51         self._listen_thread = threading.Thread(target=self._listen, daemon=True)
52         self._run_in_thread = False
53
54     def __str__(self):
55         return str(
56             {
57                 "DB type": "FAKE DB",
58             }
59         )
60
61     def is_connected(self):
62         return True
63
64     def close(self):
65         pass
66
67     def set(self, ns: str, data_map: Dict[str, bytes]) -> None:
68         self._db.update(data_map.copy())
69
70     def set_if(self, ns: str, key: str, old_data: bytes, new_data: bytes) -> bool:
71         if key not in self._db:
72             return False
73         db_data = self._db[key]
74         if db_data == old_data:
75             self._db[key] = new_data
76             return True
77         return False
78
79     def set_if_not_exists(self, ns: str, key: str, data: bytes) -> bool:
80         if key not in self._db:
81             self._db[key] = data
82             return True
83         return False
84
85     def get(self, ns: str, keys: List[str]) -> Dict[str, bytes]:
86         ret = {}
87         for k in keys:
88             if k in self._db:
89                 ret[k] = self._db[k]
90         return ret
91
92     def find_keys(self, ns: str, key_pattern: str) -> List[str]:
93         ret = []
94         for k in self._db:
95             if fnmatch.fnmatch(k, key_pattern):
96                 ret.append(k)
97         return ret
98
99     def find_and_get(self, ns: str, key_pattern: str) -> Dict[str, bytes]:
100         ret = {}
101         for key, val in self._db.items():
102             if fnmatch.fnmatch(key, key_pattern):
103                 ret[key] = val
104         return ret
105
106     def remove(self, ns: str, keys: List[str]) -> None:
107         for key in keys:
108             self._db.pop(key, None)
109
110     def remove_if(self, ns: str, key: str, data: bytes) -> bool:
111         if key in self._db:
112             db_data = self._db[key]
113             if db_data == data:
114                 self._db.pop(key)
115                 return True
116         return False
117
118     def add_member(self, ns: str, group: str, members: Set[bytes]) -> None:
119         if group in self._db:
120             self._db[group] = self._db[group] | members.copy()
121         else:
122             self._db[group] = members.copy()
123
124     def remove_member(self, ns: str, group: str, members: Set[bytes]) -> None:
125         if group not in self._db:
126             return
127         for member in members:
128             self._db[group].discard(member)
129
130     def remove_group(self, ns: str, group: str) -> None:
131         self._db.pop(group, None)
132
133     def get_members(self, ns: str, group: str) -> Set[bytes]:
134         return self._db.get(group, set())
135
136     def is_member(self, ns: str, group: str, member: bytes) -> bool:
137         if group not in self._db:
138             return False
139         if member in self._db[group]:
140             return True
141         return False
142
143     def group_size(self, ns: str, group: str) -> int:
144         if group not in self._db:
145             return 0
146         return len(self._db[group])
147
148     def set_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]],
149                         data_map: Dict[str, bytes]) -> None:
150         self._db.update(data_map.copy())
151         for channel, events in channels_and_events.items():
152             self._queue.put((channel, events))
153
154     def set_if_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]], key: str,
155                            old_data: bytes, new_data: bytes) -> bool:
156         if self.set_if(ns, key, old_data, new_data):
157             for channel, events in channels_and_events.items():
158                 self._queue.put((channel, events))
159             return True
160         return False
161
162     def set_if_not_exists_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]],
163                                       key: str, data: bytes) -> bool:
164         if self.set_if_not_exists(ns, key, data):
165             for channel, events in channels_and_events.items():
166                 self._queue.put((channel, events))
167             return True
168         return False
169
170     def remove_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]],
171                            keys: List[str]) -> None:
172         for key in keys:
173             self._db.pop(key, None)
174         for channel, events in channels_and_events.items():
175             self._queue.put((channel, events))
176
177     def remove_if_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]], key: str,
178                               data: bytes) -> bool:
179         if self.remove_if(ns, key, data):
180             for channel, events in channels_and_events.items():
181                 self._queue.put((channel, events))
182             return True
183         return False
184
185     def remove_all_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]]) -> None:
186         # Note: Since fake db has only one namespace, this deletes all keys
187         self._db.clear()
188         for channel, events in channels_and_events.items():
189             self._queue.put((channel, events))
190
191     def subscribe_channel(self, ns: str,
192                           cb: Union[Callable[[str, str], None], Callable[[str, List[str]], None]],
193                           channels: List[str]) -> None:
194         for channel in channels:
195             self._channel_cbs[channel] = cb
196             if not self._listen_thread.is_alive() and self._run_in_thread:
197                 self._listen_thread.start()
198
199     def _listen(self):
200         while True:
201             message = self._queue.get()
202             cb = self._channel_cbs.get(message[0], None)
203             if cb:
204                 cb(message[0], message[1][0] if (isinstance(message[1], list) and len(message[1]) == 1) else message[1])
205             time.sleep(0.001)
206
207     def unsubscribe_channel(self, ns: str, channels: List[str]) -> None:
208         for channel in channels:
209             self._channel_cbs.pop(channel, None)
210
211     def start_event_listener(self) -> None:
212         if self._listen_thread.is_alive():
213             raise Exception("Event loop already started")
214         if len(self._channel_cbs) > 0:
215             self._listen_thread.start()
216         self._run_in_thread = True
217
218     def handle_events(self) -> Optional[Union[Tuple[str, str], Tuple[str, List[str]]]]:
219         if self._listen_thread.is_alive() or self._run_in_thread:
220             raise Exception("Event loop already started")
221         try:
222             message = self._queue.get(block=False)
223         except queue.Empty:
224             return None
225         cb = self._channel_cbs.get(message[0], None)
226         notifications = message[1][0] if (isinstance(message[1], list) and len(message[1]) == 1) else message[1]
227         if cb:
228             cb(message[0], notifications)
229         return (message[0], notifications)
230
231
232 class FakeDictBackendLock(DbBackendLockAbc):
233     """
234     A class providing fake implementation of database backend lock of Shared Data Layer (SDL).
235     This class does not provide working database solution, this class can be used in testing
236     purposes only. Implementation does not provide shared database resource, SDL client sees
237     only its local local 'fake' database, which is a simple Python dictionary. Also keys are
238     stored in database under the same namespace.
239     Args:
240         ns (str): Namespace under which this lock is targeted.
241         name (str): Lock name, identifies the lock key in a Redis database backend.
242         expiration (int, float): Lock expiration time after which the lock is removed if it hasn't
243                                  been released earlier by a 'release' method.
244         redis_backend (FakeBackend): Database backend object containing fake databese connection.
245     """
246
247     def __init__(self, ns: str, name: str, expiration: Union[int, float],
248                  redis_backend: FakeDictBackend) -> None:
249         super().__init__(ns, name)
250         self._locked = False
251         self._ns = ns
252         self._lock_name = name
253         self._lock_expiration = expiration
254         self.redis_backend = redis_backend
255
256     def __str__(self):
257         return str(
258             {
259                 "lock DB type": "FAKE DB",
260                 "lock namespace": self._ns,
261                 "lock name": self._lock_name,
262                 "lock status": self._lock_status_to_string()
263             }
264         )
265
266     def acquire(self, retry_interval: Union[int, float] = 0.1,
267                 retry_timeout: Union[int, float] = 10) -> bool:
268         if self._locked:
269             return False
270         self._locked = True
271         return self._locked
272
273     def release(self) -> None:
274         self._locked = False
275
276     def refresh(self) -> None:
277         pass
278
279     def get_validity_time(self) -> Union[int, float]:
280         return self._lock_expiration
281
282     def _lock_status_to_string(self) -> str:
283         if self._locked:
284             return 'locked'
285         return 'unlocked'