Add support for notifications
[ric-plt/sdlpy.git] / ricsdl-package / ricsdl / backend / fake_dict_db.py
1 # Copyright (c) 2019 AT&T Intellectual Property.
2 # Copyright (c) 2018-2019 Nokia.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 #
17 # This source code is part of the near-RT RIC (RAN Intelligent Controller)
18 # platform project (RICP).
19 #
20
21
22 """The module provides fake implementation of Shared Data Layer (SDL) database backend interface."""
23 import fnmatch
24 from typing import (Callable, Dict, Set, List, Optional, Tuple, Union)
25 import queue
26 import threading
27 import time
28 from ricsdl.configuration import _Configuration
29 from .dbbackend_abc import DbBackendAbc
30 from .dbbackend_abc import DbBackendLockAbc
31
32
33 class FakeDictBackend(DbBackendAbc):
34     """
35     A class providing fake implementation of database backend of Shared Data Layer (SDL).
36     This class does not provide working database solution, this class can be used in testing
37     purposes only. Implementation does not provide shared database resource, SDL client sees
38     only its local local 'fake' database, which is a simple Python dictionary. Also keys are
39     stored in database under the same namespace.
40
41     Args:
42         configuration (_Configuration): SDL configuration, containing credentials to connect to
43                                         Redis database backend.
44     """
45     def __init__(self, configuration: _Configuration) -> None:
46         super().__init__()
47         self._db = {}
48         self._configuration = configuration
49         self._queue = queue.Queue(1)
50         self._channel_cbs = {}
51         self._listen_thread = threading.Thread(target=self._listen, daemon=True)
52         self._run_in_thread = False
53
54     def __str__(self):
55         return str(
56             {
57                 "DB type": "FAKE DB",
58             }
59         )
60
61     def is_connected(self):
62         return True
63
64     def close(self):
65         pass
66
67     def set(self, ns: str, data_map: Dict[str, bytes]) -> None:
68         self._db.update(data_map.copy())
69
70     def set_if(self, ns: str, key: str, old_data: bytes, new_data: bytes) -> bool:
71         if key not in self._db:
72             return False
73         db_data = self._db[key]
74         if db_data == old_data:
75             self._db[key] = new_data
76             return True
77         return False
78
79     def set_if_not_exists(self, ns: str, key: str, data: bytes) -> bool:
80         if key not in self._db:
81             self._db[key] = data
82             return True
83         return False
84
85     def get(self, ns: str, keys: List[str]) -> Dict[str, bytes]:
86         ret = {}
87         for k in keys:
88             if k in self._db:
89                 ret[k] = self._db[k]
90         return ret
91
92     def find_keys(self, ns: str, key_pattern: str) -> List[str]:
93         ret = []
94         for k in self._db:
95             if fnmatch.fnmatch(k, key_pattern):
96                 ret.append(k)
97         return ret
98
99     def find_and_get(self, ns: str, key_pattern: str) -> Dict[str, bytes]:
100         ret = {}
101         for key, val in self._db.items():
102             if fnmatch.fnmatch(key, key_pattern):
103                 ret[key] = val
104         return ret
105
106     def remove(self, ns: str, keys: List[str]) -> None:
107         for key in keys:
108             self._db.pop(key, None)
109
110     def remove_if(self, ns: str, key: str, data: bytes) -> bool:
111         if key in self._db:
112             db_data = self._db[key]
113             if db_data == data:
114                 self._db.pop(key)
115                 return True
116         return False
117
118     def add_member(self, ns: str, group: str, members: Set[bytes]) -> None:
119         if group in self._db:
120             self._db[group] = self._db[group] | members.copy()
121         else:
122             self._db[group] = members.copy()
123
124     def remove_member(self, ns: str, group: str, members: Set[bytes]) -> None:
125         if group not in self._db:
126             return
127         for member in members:
128             self._db[group].discard(member)
129
130     def remove_group(self, ns: str, group: str) -> None:
131         self._db.pop(group, None)
132
133     def get_members(self, ns: str, group: str) -> Set[bytes]:
134         return self._db.get(group, set())
135
136     def is_member(self, ns: str, group: str, member: bytes) -> bool:
137         if group not in self._db:
138             return False
139         if member in self._db[group]:
140             return True
141         return False
142
143     def group_size(self, ns: str, group: str) -> int:
144         if group not in self._db:
145             return 0
146         return len(self._db[group])
147
148     def set_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]],
149                         data_map: Dict[str, bytes]) -> None:
150         self._db.update(data_map.copy())
151         for channel, events in channels_and_events.items():
152             for event in events:
153                 self._queue.put((channel, event))
154
155     def set_if_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]], key: str,
156                            old_data: bytes, new_data: bytes) -> bool:
157         if self.set_if(ns, key, old_data, new_data):
158             for channel, events in channels_and_events.items():
159                 for event in events:
160                     self._queue.put((channel, event))
161             return True
162         return False
163
164     def set_if_not_exists_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]],
165                                       key: str, data: bytes) -> bool:
166         if self.set_if_not_exists(ns, key, data):
167             for channel, events in channels_and_events.items():
168                 for event in events:
169                     self._queue.put((channel, event))
170             return True
171         return False
172
173     def remove_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]],
174                            keys: List[str]) -> None:
175         for key in keys:
176             self._db.pop(key, None)
177         for channel, events in channels_and_events.items():
178             for event in events:
179                 self._queue.put((channel, event))
180
181     def remove_if_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]], key: str,
182                               data: bytes) -> bool:
183         if self.remove_if(ns, key, data):
184             for channel, events in channels_and_events.items():
185                 for event in events:
186                     self._queue.put((channel, event))
187             return True
188         return False
189
190     def remove_all_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]]) -> None:
191         # Note: Since fake db has only one namespace, this deletes all keys
192         self._db.clear()
193         for channel, events in channels_and_events.items():
194             for event in events:
195                 self._queue.put((channel, event))
196
197     def subscribe_channel(self, ns: str, cb: Callable[[str, str], None],
198                           channels: List[str]) -> None:
199         for channel in channels:
200             self._channel_cbs[channel] = cb
201             if not self._listen_thread.is_alive() and self._run_in_thread:
202                 self._listen_thread.start()
203
204     def _listen(self):
205         while True:
206             message = self._queue.get()
207             cb = self._channel_cbs.get(message[0], None)
208             if cb:
209                 cb(message[0], message[1])
210             time.sleep(0.001)
211
212     def unsubscribe_channel(self, ns: str, channels: List[str]) -> None:
213         for channel in channels:
214             self._channel_cbs.pop(channel, None)
215
216     def start_event_listener(self) -> None:
217         if self._listen_thread.is_alive():
218             raise Exception("Event loop already started")
219         if len(self._channel_cbs) > 0:
220             self._listen_thread.start()
221         self._run_in_thread = True
222
223     def handle_events(self) -> Optional[Tuple[str, str]]:
224         if self._listen_thread.is_alive() or self._run_in_thread:
225             raise Exception("Event loop already started")
226         try:
227             message = self._queue.get(block=False)
228         except queue.Empty:
229             return None
230         cb = self._channel_cbs.get(message[0], None)
231         if cb:
232             cb(message[0], message[1])
233         return (message[0], message[1])
234
235
236 class FakeDictBackendLock(DbBackendLockAbc):
237     """
238     A class providing fake implementation of database backend lock of Shared Data Layer (SDL).
239     This class does not provide working database solution, this class can be used in testing
240     purposes only. Implementation does not provide shared database resource, SDL client sees
241     only its local local 'fake' database, which is a simple Python dictionary. Also keys are
242     stored in database under the same namespace.
243     Args:
244         ns (str): Namespace under which this lock is targeted.
245         name (str): Lock name, identifies the lock key in a Redis database backend.
246         expiration (int, float): Lock expiration time after which the lock is removed if it hasn't
247                                  been released earlier by a 'release' method.
248         redis_backend (FakeBackend): Database backend object containing fake databese connection.
249     """
250
251     def __init__(self, ns: str, name: str, expiration: Union[int, float],
252                  redis_backend: FakeDictBackend) -> None:
253         super().__init__(ns, name)
254         self._locked = False
255         self._ns = ns
256         self._lock_name = name
257         self._lock_expiration = expiration
258         self.redis_backend = redis_backend
259
260     def __str__(self):
261         return str(
262             {
263                 "lock DB type": "FAKE DB",
264                 "lock namespace": self._ns,
265                 "lock name": self._lock_name,
266                 "lock status": self._lock_status_to_string()
267             }
268         )
269
270     def acquire(self, retry_interval: Union[int, float] = 0.1,
271                 retry_timeout: Union[int, float] = 10) -> bool:
272         if self._locked:
273             return False
274         self._locked = True
275         return self._locked
276
277     def release(self) -> None:
278         self._locked = False
279
280     def refresh(self) -> None:
281         pass
282
283     def get_validity_time(self) -> Union[int, float]:
284         return self._lock_expiration
285
286     def _lock_status_to_string(self) -> str:
287         if self._locked:
288             return 'locked'
289         return 'unlocked'