from redis import Redis
from redis.sentinel import Sentinel
from redis.lock import Lock
-from redis._compat import nativestr
+from redis.utils import str_if_bytes
from redis import exceptions as redis_exceptions
from ricsdl.configuration import _Configuration
from ricsdl.exceptions import (
class PubSub(redis.client.PubSub):
+ def __init__(self, event_separator, connection_pool, ignore_subscribe_messages=False):
+ super().__init__(connection_pool, shard_hint=None, ignore_subscribe_messages=ignore_subscribe_messages)
+ self.event_separator = event_separator
+
def handle_message(self, response, ignore_subscribe_messages=False):
"""
Parses a pub/sub message. If the channel or pattern was subscribed to
Adapted from: https://github.com/andymccurdy/redis-py/blob/master/redis/client.py
"""
- message_type = nativestr(response[0])
+ message_type = str_if_bytes(response[0])
if message_type == 'pmessage':
message = {
'type': message_type,
# message
message_channel = self._strip_ns_from_bin_key('', message['channel'])
message_data = message['data'].decode('utf-8')
- handler(message_channel, message_data)
- return message_channel, message_data
+ messages = message_data.split(self.event_separator)
+ handler(message_channel, messages)
+ return message_channel, messages
elif message_type != 'pong':
# this is a subscribe/unsubscribe message. ignore if we don't
# want them
def __init__(self, configuration: _Configuration) -> None:
super().__init__()
self.next_client_event = 0
+ self.event_separator = configuration.get_event_separator()
self.clients = list()
with _map_to_sdl_exception():
self.clients = self.__create_redis_clients(configuration)
*channels_and_events_prepared,
)
- def subscribe_channel(self, ns: str, cb: Callable[[str, str], None],
+ def subscribe_channel(self, ns: str, cb: Callable[[str, List[str]], None],
channels: List[str]) -> None:
channels = self.__add_keys_ns_prefix(ns, channels)
for channel in channels:
redis_ctx.pubsub_thread = redis_ctx.redis_pubsub.run_in_thread(sleep_time=0.001, daemon=True)
redis_ctx.run_in_thread = True
- def handle_events(self) -> Optional[Tuple[str, str]]:
+ def handle_events(self) -> Optional[Tuple[str, List[str]]]:
if self.next_client_event >= len(self.clients):
self.next_client_event = 0
redis_ctx = self.clients[self.next_client_event]
new_sentinel = Sentinel([sentinel_node])
new_redis = new_sentinel.master_for(master_name)
- new_redis.set_response_callback('SETIE', lambda r: r and nativestr(r) == 'OK' or False)
+ new_redis.set_response_callback('SETIE', lambda r: r and str_if_bytes(r) == 'OK' or False)
new_redis.set_response_callback('DELIE', lambda r: r and int(r) == 1 or False)
- redis_pubsub = PubSub(new_redis.connection_pool, ignore_subscribe_messages=True)
+ redis_pubsub = PubSub(self.event_separator, new_redis.connection_pool, ignore_subscribe_messages=True)
pubsub_thread = threading.Thread(target=None)
run_in_thread = False
ret_keys.append(nskey[1])
return ret_keys
- @classmethod
- def _prepare_channels(cls, ns: str, channels_and_events: Dict[str,
- List[str]]) -> Tuple[List, int]:
+ def _prepare_channels(self, ns: str,
+ channels_and_events: Dict[str, List[str]]) -> Tuple[List, int]:
channels_and_events_prepared = []
- total_events = 0
for channel, events in channels_and_events.items():
+ one_channel_join_events = None
for event in events:
- channels_and_events_prepared.append(cls.__add_key_ns_prefix(ns, channel))
- channels_and_events_prepared.append(event)
- total_events += 1
- return channels_and_events_prepared, total_events
+ if one_channel_join_events is None:
+ channels_and_events_prepared.append(self.__add_key_ns_prefix(ns, channel))
+ one_channel_join_events = event
+ else:
+ one_channel_join_events = one_channel_join_events + self.event_separator + event
+ channels_and_events_prepared.append(one_channel_join_events)
+ pairs_cnt = int(len(channels_and_events_prepared) / 2)
+ return channels_and_events_prepared, pairs_cnt
def get_redis_connection(self, ns: str):
"""Return existing Redis database connection valid for the namespace."""