# Copyright (c) 2019 AT&T Intellectual Property.
# Copyright (c) 2018-2019 Nokia.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This source code is part of the near-RT RIC (RAN Intelligent Controller)
# platform project (RICP).
"""The module provides fake implementation of Shared Data Layer (SDL) database backend interface."""
import fnmatch
import queue
import threading
from typing import (Callable, Dict, Set, List, Optional, Tuple, Union)

from ricsdl.configuration import _Configuration
from .dbbackend_abc import DbBackendAbc
from .dbbackend_abc import DbBackendLockAbc
class FakeDictBackend(DbBackendAbc):
    """
    A class providing fake implementation of database backend of Shared Data Layer (SDL).
    This class does not provide working database solution, this class can be used in testing
    purposes only. Implementation does not provide shared database resource, SDL client sees
    only its local 'fake' database, which is a simple Python dictionary. Also keys are
    stored in database under the same namespace.

    Args:
        configuration (_Configuration): SDL configuration, containing credentials to connect to
                                        Redis database backend.
    """
    def __init__(self, configuration: _Configuration) -> None:
        super().__init__()
        # Single flat dictionary acts as the whole database. Note that the 'ns'
        # (namespace) argument of the public methods is deliberately ignored:
        # all keys live in one shared fake namespace.
        self._db = {}
        self._configuration = configuration
        # Capacity-1 queue buffering (channel, events) notifications published
        # by the *_and_publish methods until a subscriber consumes them.
        self._queue = queue.Queue(1)
        # Mapping from channel name to the subscriber callback.
        self._channel_cbs = {}
        self._listen_thread = threading.Thread(target=self._listen, daemon=True)
        self._run_in_thread = False

    def __str__(self):
        return str({"DB type": "FAKE DB"})

    def is_connected(self):
        # Fake in-memory backend is always "connected".
        return True

    def close(self):
        # Nothing to release for the in-memory backend.
        pass

    def set(self, ns: str, data_map: Dict[str, bytes]) -> None:
        """Write all key-value pairs of 'data_map' into the fake database."""
        self._db.update(data_map.copy())

    def set_if(self, ns: str, key: str, old_data: bytes, new_data: bytes) -> bool:
        """Write 'new_data' under 'key' only if the current value equals 'old_data'."""
        if key not in self._db:
            return False
        db_data = self._db[key]
        if db_data == old_data:
            self._db[key] = new_data
            return True
        return False

    def set_if_not_exists(self, ns: str, key: str, data: bytes) -> bool:
        """Write 'data' under 'key' only if the key does not yet exist."""
        if key not in self._db:
            self._db[key] = data
            return True
        return False

    def get(self, ns: str, keys: List[str]) -> Dict[str, bytes]:
        """Return a dict of the requested keys that exist in the database."""
        ret = {}
        for key in keys:
            if key in self._db:
                ret[key] = self._db[key]
        return ret

    def find_keys(self, ns: str, key_pattern: str) -> List[str]:
        """Return keys matching the glob-style 'key_pattern' (fnmatch semantics)."""
        ret = []
        for key in self._db:
            if fnmatch.fnmatch(key, key_pattern):
                ret.append(key)
        return ret

    def find_and_get(self, ns: str, key_pattern: str) -> Dict[str, bytes]:
        """Return key-value pairs for keys matching the glob-style 'key_pattern'."""
        ret = {}
        for key, val in self._db.items():
            if fnmatch.fnmatch(key, key_pattern):
                ret[key] = val
        return ret

    def remove(self, ns: str, keys: List[str]) -> None:
        """Remove the given keys; missing keys are silently ignored."""
        for key in keys:
            self._db.pop(key, None)

    def remove_if(self, ns: str, key: str, data: bytes) -> bool:
        """Remove 'key' only if its current value equals 'data'."""
        if key in self._db:
            db_data = self._db[key]
            if db_data == data:
                self._db.pop(key)
                return True
        return False

    def add_member(self, ns: str, group: str, members: Set[bytes]) -> None:
        """Add 'members' to the set stored under 'group', creating it if needed."""
        if group in self._db:
            self._db[group] = self._db[group] | members.copy()
        else:
            self._db[group] = members.copy()

    def remove_member(self, ns: str, group: str, members: Set[bytes]) -> None:
        """Discard 'members' from 'group'; a missing group or member is a no-op."""
        if group not in self._db:
            return
        for member in members:
            self._db[group].discard(member)

    def remove_group(self, ns: str, group: str) -> None:
        """Remove the whole group; a missing group is a no-op."""
        self._db.pop(group, None)

    def get_members(self, ns: str, group: str) -> Set[bytes]:
        """Return the members of 'group', or an empty set if it does not exist."""
        return self._db.get(group, set())

    def is_member(self, ns: str, group: str, member: bytes) -> bool:
        """Return True if 'member' belongs to 'group'."""
        if group not in self._db:
            return False
        return member in self._db[group]

    def group_size(self, ns: str, group: str) -> int:
        """Return the number of members in 'group', 0 if it does not exist."""
        if group not in self._db:
            return 0
        return len(self._db[group])

    def set_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]],
                        data_map: Dict[str, bytes]) -> None:
        """Write 'data_map' and queue the given channel event notifications."""
        self._db.update(data_map.copy())
        for channel, events in channels_and_events.items():
            self._queue.put((channel, events))

    def set_if_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]], key: str,
                           old_data: bytes, new_data: bytes) -> bool:
        """Conditionally write (see set_if) and publish events only on success."""
        if self.set_if(ns, key, old_data, new_data):
            for channel, events in channels_and_events.items():
                self._queue.put((channel, events))
            return True
        return False

    def set_if_not_exists_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]],
                                      key: str, data: bytes) -> bool:
        """Conditionally write (see set_if_not_exists) and publish events only on success."""
        if self.set_if_not_exists(ns, key, data):
            for channel, events in channels_and_events.items():
                self._queue.put((channel, events))
            return True
        return False

    def remove_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]],
                           keys: List[str]) -> None:
        """Remove 'keys' and queue the given channel event notifications."""
        for key in keys:
            self._db.pop(key, None)
        for channel, events in channels_and_events.items():
            self._queue.put((channel, events))

    def remove_if_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]], key: str,
                              data: bytes) -> bool:
        """Conditionally remove (see remove_if) and publish events only on success."""
        if self.remove_if(ns, key, data):
            for channel, events in channels_and_events.items():
                self._queue.put((channel, events))
            return True
        return False

    def remove_all_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]]) -> None:
        # Note: Since fake db has only one namespace, this deletes all keys
        self._db.clear()
        for channel, events in channels_and_events.items():
            self._queue.put((channel, events))

    def subscribe_channel(self, ns: str,
                          cb: Union[Callable[[str, str], None], Callable[[str, List[str]], None]],
                          channels: List[str]) -> None:
        """Register callback 'cb' for the given channels and start the listener
        thread if event listening has already been enabled."""
        for channel in channels:
            self._channel_cbs[channel] = cb
        if not self._listen_thread.is_alive() and self._run_in_thread:
            self._listen_thread.start()

    def _listen(self):
        # Background loop: deliver queued notifications to channel callbacks.
        while True:
            message = self._queue.get()
            cb = self._channel_cbs.get(message[0], None)
            if cb:
                # A single-event list is unwrapped to a plain string for the callback.
                cb(message[0], message[1][0] if (isinstance(message[1], list) and len(message[1]) == 1) else message[1])

    def unsubscribe_channel(self, ns: str, channels: List[str]) -> None:
        """Remove callbacks registered for the given channels."""
        for channel in channels:
            self._channel_cbs.pop(channel, None)

    def start_event_listener(self) -> None:
        """Start background event delivery; exclusive with handle_events()."""
        if self._listen_thread.is_alive():
            raise Exception("Event loop already started")
        if len(self._channel_cbs) > 0:
            self._listen_thread.start()
        self._run_in_thread = True

    def handle_events(self) -> Optional[Union[Tuple[str, str], Tuple[str, List[str]]]]:
        """Deliver one queued event synchronously.

        Returns the (channel, event(s)) tuple that was handled, or None when
        no event was queued. Must not be used together with the background
        listener started by start_event_listener().
        """
        if self._listen_thread.is_alive() or self._run_in_thread:
            raise Exception("Event loop already started")
        try:
            message = self._queue.get(block=False)
        except queue.Empty:
            return None
        cb = self._channel_cbs.get(message[0], None)
        # A single-event list is unwrapped to a plain string.
        notifications = message[1][0] if (isinstance(message[1], list) and len(message[1]) == 1) else message[1]
        if cb:
            cb(message[0], notifications)
        return (message[0], notifications)
class FakeDictBackendLock(DbBackendLockAbc):
    """
    A class providing fake implementation of database backend lock of Shared Data Layer (SDL).
    This class does not provide working database solution, this class can be used in testing
    purposes only. Implementation does not provide shared database resource, SDL client sees
    only its local 'fake' database, which is a simple Python dictionary. Also keys are
    stored in database under the same namespace.

    Args:
        ns (str): Namespace under which this lock is targeted.
        name (str): Lock name, identifies the lock key in a Redis database backend.
        expiration (int, float): Lock expiration time after which the lock is removed if it hasn't
                                 been released earlier by a 'release' method.
        redis_backend (FakeDictBackend): Database backend object containing fake database
                                         connection.
    """
    def __init__(self, ns: str, name: str, expiration: Union[int, float],
                 redis_backend: FakeDictBackend) -> None:
        super().__init__(ns, name)
        # Plain boolean stands in for a real distributed lock.
        self._lock_status = False
        self._lock_name = name
        self._lock_expiration = expiration
        self.redis_backend = redis_backend

    def __str__(self):
        return str({
            "lock DB type": "FAKE DB",
            "lock namespace": self._ns,
            "lock name": self._lock_name,
            "lock status": self._lock_status_to_string()
        })

    def acquire(self, retry_interval: Union[int, float] = 0.1,
                retry_timeout: Union[int, float] = 10) -> bool:
        """Take the lock; returns False immediately if already held.

        The retry arguments are accepted for interface compatibility but the
        fake lock never retries or blocks.
        """
        if self._lock_status:
            return False
        self._lock_status = True
        return True

    def release(self) -> None:
        """Release the lock."""
        self._lock_status = False

    def refresh(self) -> None:
        # Fake lock never expires, so there is nothing to refresh.
        pass

    def get_validity_time(self) -> Union[int, float]:
        # Fake lock simply reports the configured expiration as remaining validity.
        return self._lock_expiration
282 def _lock_status_to_string(self) -> str: