+ @func_arg_checker(SdlTypeError, 1, ns=str, channels_and_events=dict, data_map=dict)
+ def set_and_publish(self, ns: str, channels_and_events: Dict[str, Union[str, List[str]]],
+ data_map: Dict[str, bytes]) -> None:
+ self._validate_key_value_dict(data_map)
+ self._validate_channels_events(channels_and_events)
+ for channel, events in channels_and_events.items():
+ channels_and_events[channel] = [events] if isinstance(events, str) else events
+ self.__dbbackend.set_and_publish(ns, channels_and_events, data_map)
+
+ @func_arg_checker(SdlTypeError,
+ 1,
+ ns=str,
+ channels_and_events=dict,
+ key=str,
+ old_data=bytes,
+ new_data=bytes)
+ def set_if_and_publish(self, ns: str, channels_and_events: Dict[str, Union[str, List[str]]],
+ key: str, old_data: bytes, new_data: bytes) -> bool:
+ self._validate_channels_events(channels_and_events)
+ for channel, events in channels_and_events.items():
+ channels_and_events[channel] = [events] if isinstance(events, str) else events
+ return self.__dbbackend.set_if_and_publish(ns, channels_and_events, key, old_data, new_data)
+
+ @func_arg_checker(SdlTypeError, 1, ns=str, channels_and_events=dict, key=str, data=bytes)
+ def set_if_not_exists_and_publish(self, ns: str,
+ channels_and_events: Dict[str, Union[str, List[str]]],
+ key: str, data: bytes) -> bool:
+ self._validate_channels_events(channels_and_events)
+ for channel, events in channels_and_events.items():
+ channels_and_events[channel] = [events] if isinstance(events, str) else events
+ return self.__dbbackend.set_if_not_exists_and_publish(ns, channels_and_events, key, data)
+
+ @func_arg_checker(SdlTypeError, 1, ns=str, channels_and_events=dict, keys=(str, builtins.set))
+ def remove_and_publish(self, ns: str, channels_and_events: Dict[str, Union[str, List[str]]],
+ keys: Union[str, Set[str]]) -> None:
+ self._validate_channels_events(channels_and_events)
+ for channel, events in channels_and_events.items():
+ channels_and_events[channel] = [events] if isinstance(events, str) else events
+ keys = [keys] if isinstance(keys, str) else list(keys)
+ self.__dbbackend.remove_and_publish(ns, channels_and_events, keys)
+
+ @func_arg_checker(SdlTypeError, 1, ns=str, channels_and_events=dict, key=str, data=bytes)
+ def remove_if_and_publish(self, ns: str, channels_and_events: Dict[str, Union[str, List[str]]],
+ key: str, data: bytes) -> bool:
+ self._validate_channels_events(channels_and_events)
+ for channel, events in channels_and_events.items():
+ channels_and_events[channel] = [events] if isinstance(events, str) else events
+ return self.__dbbackend.remove_if_and_publish(ns, channels_and_events, key, data)
+
+ @func_arg_checker(SdlTypeError, 1, ns=str, channels_and_events=dict)
+ def remove_all_and_publish(self, ns: str,
+ channels_and_events: Dict[str, Union[str, List[str]]]) -> None:
+ self._validate_channels_events(channels_and_events)
+ for channel, events in channels_and_events.items():
+ channels_and_events[channel] = [events] if isinstance(events, str) else events
+ self.__dbbackend.remove_all_and_publish(ns, channels_and_events)
+
+ @func_arg_checker(SdlTypeError, 1, ns=str, cb=Callable, channels=(str, builtins.set))
+ def subscribe_channel(self, ns: str, cb: Callable[[str, List[str]], None],
+ channels: Union[str, Set[str]]) -> None:
+ self._validate_callback(cb)
+ channels = [channels] if isinstance(channels, str) else list(channels)
+ self.__dbbackend.subscribe_channel(ns, cb, channels)
+
+ @func_arg_checker(SdlTypeError, 1, ns=str, channels=(str, builtins.set))
+ def unsubscribe_channel(self, ns: str, channels: Union[str, Set[str]]) -> None:
+ channels = [channels] if isinstance(channels, str) else list(channels)
+ self.__dbbackend.unsubscribe_channel(ns, channels)
+
+ def start_event_listener(self) -> None:
+ self.__dbbackend.start_event_listener()
+
+ def handle_events(self) -> Optional[Tuple[str, List[str]]]:
+ return self.__dbbackend.handle_events()
+