1 # Copyright (c) 2019 AT&T Intellectual Property.
2 # Copyright (c) 2018-2019 Nokia.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 # This source code is part of the near-RT RIC (RAN Intelligent Controller)
18 # platform project (RICP).
22 """The module provides fake implementation of Shared Data Layer (SDL) database backend interface."""
import fnmatch
import queue
import threading
from typing import (Callable, Dict, Set, List, Optional, Tuple, Union)

from ricsdl.configuration import _Configuration
from .dbbackend_abc import DbBackendAbc
from .dbbackend_abc import DbBackendLockAbc
33 class FakeDictBackend(DbBackendAbc):
35 A class providing fake implementation of database backend of Shared Data Layer (SDL).
36 This class does not provide working database solution, this class can be used in testing
37 purposes only. Implementation does not provide shared database resource, SDL client sees
38 only its local 'fake' database, which is a simple Python dictionary. Also keys are
39 stored in database under the same namespace.
42 configuration (_Configuration): SDL configuration, containing credentials to connect to
43 Redis database backend.
45 def __init__(self, configuration: _Configuration) -> None:
48 self._configuration = configuration
49 self._queue = queue.Queue(1)
50 self._channel_cbs = {}
51 self._listen_thread = threading.Thread(target=self._listen, daemon=True)
52 self._run_in_thread = False
61 def is_connected(self):
67 def set(self, ns: str, data_map: Dict[str, bytes]) -> None:
68 self._db.update(data_map.copy())
70 def set_if(self, ns: str, key: str, old_data: bytes, new_data: bytes) -> bool:
71 if key not in self._db:
73 db_data = self._db[key]
74 if db_data == old_data:
75 self._db[key] = new_data
79 def set_if_not_exists(self, ns: str, key: str, data: bytes) -> bool:
80 if key not in self._db:
85 def get(self, ns: str, keys: List[str]) -> Dict[str, bytes]:
92 def find_keys(self, ns: str, key_pattern: str) -> List[str]:
95 if fnmatch.fnmatch(k, key_pattern):
99 def find_and_get(self, ns: str, key_pattern: str) -> Dict[str, bytes]:
101 for key, val in self._db.items():
102 if fnmatch.fnmatch(key, key_pattern):
106 def remove(self, ns: str, keys: List[str]) -> None:
108 self._db.pop(key, None)
110 def remove_if(self, ns: str, key: str, data: bytes) -> bool:
112 db_data = self._db[key]
118 def add_member(self, ns: str, group: str, members: Set[bytes]) -> None:
119 if group in self._db:
120 self._db[group] = self._db[group] | members.copy()
122 self._db[group] = members.copy()
124 def remove_member(self, ns: str, group: str, members: Set[bytes]) -> None:
125 if group not in self._db:
127 for member in members:
128 self._db[group].discard(member)
    def remove_group(self, ns: str, group: str) -> None:
        """Delete the whole group key; a missing group is silently ignored."""
        self._db.pop(group, None)
    def get_members(self, ns: str, group: str) -> Set[bytes]:
        """Return the member set stored under 'group', or an empty set."""
        return self._db.get(group, set())
136 def is_member(self, ns: str, group: str, member: bytes) -> bool:
137 if group not in self._db:
139 if member in self._db[group]:
143 def group_size(self, ns: str, group: str) -> int:
144 if group not in self._db:
146 return len(self._db[group])
148 def set_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]],
149 data_map: Dict[str, bytes]) -> None:
150 self._db.update(data_map.copy())
151 for channel, events in channels_and_events.items():
153 self._queue.put((channel, event))
155 def set_if_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]], key: str,
156 old_data: bytes, new_data: bytes) -> bool:
157 if self.set_if(ns, key, old_data, new_data):
158 for channel, events in channels_and_events.items():
160 self._queue.put((channel, event))
164 def set_if_not_exists_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]],
165 key: str, data: bytes) -> bool:
166 if self.set_if_not_exists(ns, key, data):
167 for channel, events in channels_and_events.items():
169 self._queue.put((channel, event))
173 def remove_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]],
174 keys: List[str]) -> None:
176 self._db.pop(key, None)
177 for channel, events in channels_and_events.items():
179 self._queue.put((channel, event))
181 def remove_if_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]], key: str,
182 data: bytes) -> bool:
183 if self.remove_if(ns, key, data):
184 for channel, events in channels_and_events.items():
186 self._queue.put((channel, event))
190 def remove_all_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]]) -> None:
191 # Note: Since fake db has only one namespace, this deletes all keys
193 for channel, events in channels_and_events.items():
195 self._queue.put((channel, event))
197 def subscribe_channel(self, ns: str, cb: Callable[[str, str], None],
198 channels: List[str]) -> None:
199 for channel in channels:
200 self._channel_cbs[channel] = cb
201 if not self._listen_thread.is_alive() and self._run_in_thread:
202 self._listen_thread.start()
206 message = self._queue.get()
207 cb = self._channel_cbs.get(message[0], None)
209 cb(message[0], message[1])
212 def unsubscribe_channel(self, ns: str, channels: List[str]) -> None:
213 for channel in channels:
214 self._channel_cbs.pop(channel, None)
216 def start_event_listener(self) -> None:
217 if self._listen_thread.is_alive():
218 raise Exception("Event loop already started")
219 if len(self._channel_cbs) > 0:
220 self._listen_thread.start()
221 self._run_in_thread = True
223 def handle_events(self) -> Optional[Tuple[str, str]]:
224 if self._listen_thread.is_alive() or self._run_in_thread:
225 raise Exception("Event loop already started")
227 message = self._queue.get(block=False)
230 cb = self._channel_cbs.get(message[0], None)
232 cb(message[0], message[1])
233 return (message[0], message[1])
236 class FakeDictBackendLock(DbBackendLockAbc):
238 A class providing fake implementation of database backend lock of Shared Data Layer (SDL).
239 This class does not provide working database solution, this class can be used in testing
240 purposes only. Implementation does not provide shared database resource, SDL client sees
241 only its local 'fake' database, which is a simple Python dictionary. Also keys are
242 stored in database under the same namespace.
244 ns (str): Namespace under which this lock is targeted.
245 name (str): Lock name, identifies the lock key in a Redis database backend.
246 expiration (int, float): Lock expiration time after which the lock is removed if it hasn't
247 been released earlier by a 'release' method.
248 redis_backend (FakeDictBackend): Database backend object containing fake database connection.
251 def __init__(self, ns: str, name: str, expiration: Union[int, float],
252 redis_backend: FakeDictBackend) -> None:
253 super().__init__(ns, name)
256 self._lock_name = name
257 self._lock_expiration = expiration
258 self.redis_backend = redis_backend
263 "lock DB type": "FAKE DB",
264 "lock namespace": self._ns,
265 "lock name": self._lock_name,
266 "lock status": self._lock_status_to_string()
270 def acquire(self, retry_interval: Union[int, float] = 0.1,
271 retry_timeout: Union[int, float] = 10) -> bool:
277 def release(self) -> None:
280 def refresh(self) -> None:
    def get_validity_time(self) -> Union[int, float]:
        """Return the expiration time given at construction (not enforced here)."""
        return self._lock_expiration
286 def _lock_status_to_string(self) -> str: