for channel, events in channels_and_events.items():
self._queue.put((channel, events))
- def subscribe_channel(self, ns: str,
- cb: Union[Callable[[str, str], None], Callable[[str, List[str]], None]],
+ def subscribe_channel(self, ns: str, cb: Callable[[str, List[str]], None],
channels: List[str]) -> None:
for channel in channels:
self._channel_cbs[channel] = cb
message = self._queue.get()
cb = self._channel_cbs.get(message[0], None)
if cb:
- cb(message[0], message[1][0] if (isinstance(message[1], list) and len(message[1]) == 1) else message[1])
+ cb(message[0], message[1])
time.sleep(0.001)
def unsubscribe_channel(self, ns: str, channels: List[str]) -> None:
self._listen_thread.start()
self._run_in_thread = True
- def handle_events(self) -> Optional[Union[Tuple[str, str], Tuple[str, List[str]]]]:
+ def handle_events(self) -> Optional[Tuple[str, List[str]]]:
if self._listen_thread.is_alive() or self._run_in_thread:
raise Exception("Event loop already started")
try:
except queue.Empty:
return None
cb = self._channel_cbs.get(message[0], None)
- notifications = message[1][0] if (isinstance(message[1], list) and len(message[1]) == 1) else message[1]
if cb:
- cb(message[0], notifications)
- return (message[0], notifications)
+ cb(message[0], message[1])
+ return (message[0], message[1])
class FakeDictBackendLock(DbBackendLockAbc):