+ return self.__getClient(ns).scard(db_key)
+
+ def set_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]],
+ data_map: Dict[str, bytes]) -> None:
+ db_data_map = self.__add_data_map_ns_prefix(ns, data_map)
+ channels_and_events_prepared = []
+ total_events = 0
+ channels_and_events_prepared, total_events = self._prepare_channels(ns, channels_and_events)
+ with _map_to_sdl_exception():
+ return self.__getClient(ns).execute_command(
+ "MSETMPUB",
+ len(db_data_map),
+ total_events,
+ *[val for data in db_data_map.items() for val in data],
+ *channels_and_events_prepared,
+ )
+
+ def set_if_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]], key: str,
+ old_data: bytes, new_data: bytes) -> bool:
+ db_key = self.__add_key_ns_prefix(ns, key)
+ channels_and_events_prepared = []
+ channels_and_events_prepared, _ = self._prepare_channels(ns, channels_and_events)
+ with _map_to_sdl_exception():
+ ret = self.__getClient(ns).execute_command("SETIEMPUB", db_key, new_data, old_data,
+ *channels_and_events_prepared)
+ return ret == b"OK"
+
+ def set_if_not_exists_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]],
+ key: str, data: bytes) -> bool:
+ db_key = self.__add_key_ns_prefix(ns, key)
+ channels_and_events_prepared, _ = self._prepare_channels(ns, channels_and_events)
+ with _map_to_sdl_exception():
+ ret = self.__getClient(ns).execute_command("SETNXMPUB", db_key, data,
+ *channels_and_events_prepared)
+ return ret == b"OK"
+
+ def remove_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]],
+ keys: List[str]) -> None:
+ db_keys = self.__add_keys_ns_prefix(ns, keys)
+ channels_and_events_prepared, total_events = self._prepare_channels(ns, channels_and_events)
+ with _map_to_sdl_exception():
+ return self.__getClient(ns).execute_command(
+ "DELMPUB",
+ len(db_keys),
+ total_events,
+ *db_keys,
+ *channels_and_events_prepared,
+ )
+
+ def remove_if_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]], key: str,
+ data: bytes) -> bool:
+ db_key = self.__add_key_ns_prefix(ns, key)
+ channels_and_events_prepared, _ = self._prepare_channels(ns, channels_and_events)
+ with _map_to_sdl_exception():
+ ret = self.__getClient(ns).execute_command("DELIEMPUB", db_key, data,
+ *channels_and_events_prepared)
+ return bool(ret)
+
+ def remove_all_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]]) -> None:
+ keys = self.__getClient(ns).keys(self.__add_key_ns_prefix(ns, "*"))
+ channels_and_events_prepared, total_events = self._prepare_channels(ns, channels_and_events)
+ with _map_to_sdl_exception():
+ return self.__getClient(ns).execute_command(
+ "DELMPUB",
+ len(keys),
+ total_events,
+ *keys,
+ *channels_and_events_prepared,
+ )
+
+ def subscribe_channel(self, ns: str, cb: Callable[[str, List[str]], None],
+ channels: List[str]) -> None:
+ channels = self.__add_keys_ns_prefix(ns, channels)
+ for channel in channels:
+ with _map_to_sdl_exception():
+ redis_ctx = self.__getClientConn(ns)
+ redis_ctx.redis_pubsub.subscribe(**{channel: cb})
+ if not redis_ctx.pubsub_thread.is_alive() and redis_ctx.run_in_thread:
+ redis_ctx.pubsub_thread = redis_ctx.redis_pubsub.run_in_thread(sleep_time=0.001,
+ daemon=True)
+
+ def unsubscribe_channel(self, ns: str, channels: List[str]) -> None:
+ channels = self.__add_keys_ns_prefix(ns, channels)
+ for channel in channels:
+ with _map_to_sdl_exception():
+ self.__getClientConn(ns).redis_pubsub.unsubscribe(channel)
+
+ def start_event_listener(self) -> None:
+ redis_ctxs = self.__getClientConns()
+ for redis_ctx in redis_ctxs:
+ if redis_ctx.pubsub_thread.is_alive():
+ raise RejectedByBackend("Event loop already started")
+ if redis_ctx.redis_pubsub.subscribed and len(redis_ctx.redis_client.pubsub_channels()) > 0:
+ redis_ctx.pubsub_thread = redis_ctx.redis_pubsub.run_in_thread(sleep_time=0.001, daemon=True)
+ redis_ctx.run_in_thread = True
+
+ def handle_events(self) -> Optional[Tuple[str, List[str]]]:
+ if self.next_client_event >= len(self.clients):
+ self.next_client_event = 0
+ redis_ctx = self.clients[self.next_client_event]
+ self.next_client_event += 1
+ if redis_ctx.pubsub_thread.is_alive() or redis_ctx.run_in_thread:
+ raise RejectedByBackend("Event loop already started")
+ try:
+ return redis_ctx.redis_pubsub.get_message(ignore_subscribe_messages=True)
+ except RuntimeError:
+ return None
+
+ def __create_redis_clients(self, config):
+ clients = list()
+ cfg_params = config.get_params()
+ for i, addr in enumerate(cfg_params.db_cluster_addrs):
+ port = cfg_params.db_ports[i] if i < len(cfg_params.db_ports) else ""
+ sport = cfg_params.db_sentinel_ports[i] if i < len(cfg_params.db_sentinel_ports) else ""
+ name = cfg_params.db_sentinel_master_names[i] if i < len(cfg_params.db_sentinel_master_names) else ""
+
+ client = self.__create_redis_client(addr, port, sport, name)
+ clients.append(client)
+ return clients
+
+ def __create_redis_client(self, addr, port, sentinel_port, master_name):
+ new_sentinel = None
+ new_redis = None
+ if len(sentinel_port) == 0:
+ new_redis = Redis(host=addr, port=port, db=0, max_connections=20)
+ else:
+ sentinel_node = (addr, sentinel_port)
+ new_sentinel = Sentinel([sentinel_node])
+ new_redis = new_sentinel.master_for(master_name)
+
+ new_redis.set_response_callback('SETIE', lambda r: r and str_if_bytes(r) == 'OK' or False)
+ new_redis.set_response_callback('DELIE', lambda r: r and int(r) == 1 or False)
+
+ redis_pubsub = PubSub(self.event_separator, new_redis.connection_pool, ignore_subscribe_messages=True)
+ pubsub_thread = threading.Thread(target=None)
+ run_in_thread = False
+
+ return _RedisConn(new_redis, redis_pubsub, pubsub_thread, run_in_thread)
+
    def __getClientConns(self):
        """Return all backend connection contexts (one per configured client)."""
        return self.clients
+
+ def __getClientConn(self, ns):
+ clients_cnt = len(self.clients)
+ client_id = self.__get_hash(ns) % clients_cnt
+ return self.clients[client_id]
+
+ def __getClient(self, ns):
+ clients_cnt = len(self.clients)
+ client_id = 0
+ if clients_cnt > 1:
+ client_id = self.__get_hash(ns) % clients_cnt
+ return self.clients[client_id].redis_client
+
+ @classmethod
+ def __get_hash(cls, str):
+ return zlib.crc32(str.encode())