H release step 2 of 2
[ric-plt/sdlpy.git] / ricsdl-package / ricsdl / backend / fake_dict_db.py
1 # Copyright (c) 2019 AT&T Intellectual Property.
2 # Copyright (c) 2018-2019 Nokia.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 #
17 # This source code is part of the near-RT RIC (RAN Intelligent Controller)
18 # platform project (RICP).
19 #
20
21
22 """The module provides fake implementation of Shared Data Layer (SDL) database backend interface."""
23 import fnmatch
24 from typing import (Callable, Dict, Set, List, Optional, Tuple, Union)
25 import queue
26 import threading
27 import time
28 from ricsdl.configuration import _Configuration
29 from .dbbackend_abc import DbBackendAbc
30 from .dbbackend_abc import DbBackendLockAbc
31
32
33 class FakeDictBackend(DbBackendAbc):
34     """
35     A class providing fake implementation of database backend of Shared Data Layer (SDL).
36     This class does not provide working database solution, this class can be used in testing
37     purposes only. Implementation does not provide shared database resource, SDL client sees
38     only its local local 'fake' database, which is a simple Python dictionary. Also keys are
39     stored in database under the same namespace.
40
41     Args:
42         configuration (_Configuration): SDL configuration, containing credentials to connect to
43                                         Redis database backend.
44     """
45     def __init__(self, configuration: _Configuration) -> None:
46         super().__init__()
47         self._db = {}
48         self._configuration = configuration
49         self._queue = queue.Queue(1)
50         self._channel_cbs = {}
51         self._listen_thread = threading.Thread(target=self._listen, daemon=True)
52         self._run_in_thread = False
53
54     def __str__(self):
55         return str(
56             {
57                 "DB type": "FAKE DB",
58             }
59         )
60
61     def is_connected(self):
62         return True
63
64     def close(self):
65         pass
66
67     def set(self, ns: str, data_map: Dict[str, bytes]) -> None:
68         self._db.update(data_map.copy())
69
70     def set_if(self, ns: str, key: str, old_data: bytes, new_data: bytes) -> bool:
71         if key not in self._db:
72             return False
73         db_data = self._db[key]
74         if db_data == old_data:
75             self._db[key] = new_data
76             return True
77         return False
78
79     def set_if_not_exists(self, ns: str, key: str, data: bytes) -> bool:
80         if key not in self._db:
81             self._db[key] = data
82             return True
83         return False
84
85     def get(self, ns: str, keys: List[str]) -> Dict[str, bytes]:
86         ret = {}
87         for k in keys:
88             if k in self._db:
89                 ret[k] = self._db[k]
90         return ret
91
92     def find_keys(self, ns: str, key_pattern: str) -> List[str]:
93         ret = []
94         for k in self._db:
95             if fnmatch.fnmatch(k, key_pattern):
96                 ret.append(k)
97         return ret
98
99     def find_and_get(self, ns: str, key_pattern: str) -> Dict[str, bytes]:
100         ret = {}
101         for key, val in self._db.items():
102             if fnmatch.fnmatch(key, key_pattern):
103                 ret[key] = val
104         return ret
105
106     def remove(self, ns: str, keys: List[str]) -> None:
107         for key in keys:
108             self._db.pop(key, None)
109
110     def remove_if(self, ns: str, key: str, data: bytes) -> bool:
111         if key in self._db:
112             db_data = self._db[key]
113             if db_data == data:
114                 self._db.pop(key)
115                 return True
116         return False
117
118     def add_member(self, ns: str, group: str, members: Set[bytes]) -> None:
119         if group in self._db:
120             self._db[group] = self._db[group] | members.copy()
121         else:
122             self._db[group] = members.copy()
123
124     def remove_member(self, ns: str, group: str, members: Set[bytes]) -> None:
125         if group not in self._db:
126             return
127         for member in members:
128             self._db[group].discard(member)
129
130     def remove_group(self, ns: str, group: str) -> None:
131         self._db.pop(group, None)
132
133     def get_members(self, ns: str, group: str) -> Set[bytes]:
134         return self._db.get(group, set())
135
136     def is_member(self, ns: str, group: str, member: bytes) -> bool:
137         if group not in self._db:
138             return False
139         if member in self._db[group]:
140             return True
141         return False
142
143     def group_size(self, ns: str, group: str) -> int:
144         if group not in self._db:
145             return 0
146         return len(self._db[group])
147
148     def set_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]],
149                         data_map: Dict[str, bytes]) -> None:
150         self._db.update(data_map.copy())
151         for channel, events in channels_and_events.items():
152             self._queue.put((channel, events))
153
154     def set_if_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]], key: str,
155                            old_data: bytes, new_data: bytes) -> bool:
156         if self.set_if(ns, key, old_data, new_data):
157             for channel, events in channels_and_events.items():
158                 self._queue.put((channel, events))
159             return True
160         return False
161
162     def set_if_not_exists_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]],
163                                       key: str, data: bytes) -> bool:
164         if self.set_if_not_exists(ns, key, data):
165             for channel, events in channels_and_events.items():
166                 self._queue.put((channel, events))
167             return True
168         return False
169
170     def remove_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]],
171                            keys: List[str]) -> None:
172         for key in keys:
173             self._db.pop(key, None)
174         for channel, events in channels_and_events.items():
175             self._queue.put((channel, events))
176
177     def remove_if_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]], key: str,
178                               data: bytes) -> bool:
179         if self.remove_if(ns, key, data):
180             for channel, events in channels_and_events.items():
181                 self._queue.put((channel, events))
182             return True
183         return False
184
185     def remove_all_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]]) -> None:
186         # Note: Since fake db has only one namespace, this deletes all keys
187         self._db.clear()
188         for channel, events in channels_and_events.items():
189             self._queue.put((channel, events))
190
191     def subscribe_channel(self, ns: str, cb: Callable[[str, List[str]], None],
192                           channels: List[str]) -> None:
193         for channel in channels:
194             self._channel_cbs[channel] = cb
195             if not self._listen_thread.is_alive() and self._run_in_thread:
196                 self._listen_thread.start()
197
198     def _listen(self):
199         while True:
200             message = self._queue.get()
201             cb = self._channel_cbs.get(message[0], None)
202             if cb:
203                 cb(message[0], message[1])
204             time.sleep(0.001)
205
206     def unsubscribe_channel(self, ns: str, channels: List[str]) -> None:
207         for channel in channels:
208             self._channel_cbs.pop(channel, None)
209
210     def start_event_listener(self) -> None:
211         if self._listen_thread.is_alive():
212             raise Exception("Event loop already started")
213         if len(self._channel_cbs) > 0:
214             self._listen_thread.start()
215         self._run_in_thread = True
216
217     def handle_events(self) -> Optional[Tuple[str, List[str]]]:
218         if self._listen_thread.is_alive() or self._run_in_thread:
219             raise Exception("Event loop already started")
220         try:
221             message = self._queue.get(block=False)
222         except queue.Empty:
223             return None
224         cb = self._channel_cbs.get(message[0], None)
225         if cb:
226             cb(message[0], message[1])
227         return (message[0], message[1])
228
229
230 class FakeDictBackendLock(DbBackendLockAbc):
231     """
232     A class providing fake implementation of database backend lock of Shared Data Layer (SDL).
233     This class does not provide working database solution, this class can be used in testing
234     purposes only. Implementation does not provide shared database resource, SDL client sees
235     only its local local 'fake' database, which is a simple Python dictionary. Also keys are
236     stored in database under the same namespace.
237     Args:
238         ns (str): Namespace under which this lock is targeted.
239         name (str): Lock name, identifies the lock key in a Redis database backend.
240         expiration (int, float): Lock expiration time after which the lock is removed if it hasn't
241                                  been released earlier by a 'release' method.
242         redis_backend (FakeBackend): Database backend object containing fake databese connection.
243     """
244
245     def __init__(self, ns: str, name: str, expiration: Union[int, float],
246                  redis_backend: FakeDictBackend) -> None:
247         super().__init__(ns, name)
248         self._locked = False
249         self._ns = ns
250         self._lock_name = name
251         self._lock_expiration = expiration
252         self.redis_backend = redis_backend
253
254     def __str__(self):
255         return str(
256             {
257                 "lock DB type": "FAKE DB",
258                 "lock namespace": self._ns,
259                 "lock name": self._lock_name,
260                 "lock status": self._lock_status_to_string()
261             }
262         )
263
264     def acquire(self, retry_interval: Union[int, float] = 0.1,
265                 retry_timeout: Union[int, float] = 10) -> bool:
266         if self._locked:
267             return False
268         self._locked = True
269         return self._locked
270
271     def release(self) -> None:
272         self._locked = False
273
274     def refresh(self) -> None:
275         pass
276
277     def get_validity_time(self) -> Union[int, float]:
278         return self._lock_expiration
279
280     def _lock_status_to_string(self) -> str:
281         if self._locked:
282             return 'locked'
283         return 'unlocked'