data_map: Dict[str, bytes]) -> None:
self._db.update(data_map.copy())
for channel, events in channels_and_events.items():
- for event in events:
- self._queue.put((channel, event))
+ self._queue.put((channel, events))
def set_if_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]], key: str,
old_data: bytes, new_data: bytes) -> bool:
if self.set_if(ns, key, old_data, new_data):
for channel, events in channels_and_events.items():
- for event in events:
- self._queue.put((channel, event))
+ self._queue.put((channel, events))
return True
return False
key: str, data: bytes) -> bool:
if self.set_if_not_exists(ns, key, data):
for channel, events in channels_and_events.items():
- for event in events:
- self._queue.put((channel, event))
+ self._queue.put((channel, events))
return True
return False
for key in keys:
self._db.pop(key, None)
for channel, events in channels_and_events.items():
- for event in events:
- self._queue.put((channel, event))
+ self._queue.put((channel, events))
def remove_if_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]], key: str,
data: bytes) -> bool:
if self.remove_if(ns, key, data):
for channel, events in channels_and_events.items():
- for event in events:
- self._queue.put((channel, event))
+ self._queue.put((channel, events))
return True
return False
# Note: Since fake db has only one namespace, this deletes all keys
self._db.clear()
for channel, events in channels_and_events.items():
- for event in events:
- self._queue.put((channel, event))
+ self._queue.put((channel, events))
- def subscribe_channel(self, ns: str, cb: Callable[[str, str], None],
+ def subscribe_channel(self, ns: str,
+ cb: Union[Callable[[str, str], None], Callable[[str, List[str]], None]],
channels: List[str]) -> None:
for channel in channels:
self._channel_cbs[channel] = cb
message = self._queue.get()
cb = self._channel_cbs.get(message[0], None)
if cb:
- cb(message[0], message[1])
+ cb(message[0], message[1][0] if (isinstance(message[1], list) and len(message[1]) == 1) else message[1])
time.sleep(0.001)
def unsubscribe_channel(self, ns: str, channels: List[str]) -> None:
self._listen_thread.start()
self._run_in_thread = True
- def handle_events(self) -> Optional[Tuple[str, str]]:
+ def handle_events(self) -> Optional[Union[Tuple[str, str], Tuple[str, List[str]]]]:
if self._listen_thread.is_alive() or self._run_in_thread:
raise Exception("Event loop already started")
try:
except queue.Empty:
return None
cb = self._channel_cbs.get(message[0], None)
+ notifications = message[1][0] if (isinstance(message[1], list) and len(message[1]) == 1) else message[1]
if cb:
- cb(message[0], message[1])
- return (message[0], message[1])
+ cb(message[0], notifications)
+ return (message[0], notifications)
class FakeDictBackendLock(DbBackendLockAbc):