request.cls.dl_redis = [b'1', b'2']
request.cls.dm = {'a': b'1', 'b': b'2'}
request.cls.dm_redis = {'{some-ns},a': b'1', '{some-ns},b': b'2'}
+ request.cls.dm_redis_flat = ['{some-ns},a', b'1', '{some-ns},b', b'2']
request.cls.key = 'a'
request.cls.key_redis = '{some-ns},a'
request.cls.keys = ['a', 'b']
request.cls.group_redis = '{some-ns},some-group'
request.cls.groupmembers = set([b'm1', b'm2'])
request.cls.groupmember = b'm1'
+ request.cls.channels = ['abs', 'gma']
+ request.cls.channels_and_events = {'abs': ['cbn']}
+ request.cls.channels_and_events_redis = ['{some-ns},abs', 'cbn']
request.cls.configuration = Mock()
mock_conf_params = _Configuration.Params(db_host=None,
db_sentinel_master_name=None,
db_type=DbBackendType.REDIS)
request.cls.configuration.get_params.return_value = mock_conf_params
- with patch('ricsdl.backend.redis.Redis') as mock_redis:
+ with patch('ricsdl.backend.redis.Redis') as mock_redis, patch(
+ 'ricsdl.backend.redis.PubSub') as mock_pubsub:
db = ricsdl.backend.get_backend_instance(request.cls.configuration)
request.cls.mock_redis = mock_redis.return_value
+ request.cls.mock_pubsub = mock_pubsub.return_value
request.cls.db = db
yield
with pytest.raises(ricsdl.exceptions.RejectedByBackend):
self.db.group_size(self.ns, self.group)
+ def test_set_and_publish_success(self):
+ self.db.set_and_publish(self.ns, self.channels_and_events, self.dm)
+ self.mock_redis.execute_command.assert_called_once_with('MSETMPUB', len(self.dm),
+ len(self.channels_and_events),
+ *self.dm_redis_flat,
+ *self.channels_and_events_redis)
+
+ def test_set_and_publish_can_map_redis_exception_to_sdl_exception(self):
+ self.mock_redis.execute_command.side_effect = redis_exceptions.ResponseError('redis error!')
+ with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+ self.db.set_and_publish(self.ns, self.channels_and_events, self.dm)
+
+ def test_set_if_and_publish_success(self):
+ self.mock_redis.execute_command.return_value = b"OK"
+ ret = self.db.set_if_and_publish(self.ns, self.channels_and_events, self.key, self.old_data,
+ self.new_data)
+ self.mock_redis.execute_command.assert_called_once_with('SETIEPUB', self.key_redis,
+ self.new_data, self.old_data,
+ *self.channels_and_events_redis)
+ assert ret is True
+
+ def test_set_if_and_publish_returns_false_if_existing_key_value_not_expected(self):
+ self.mock_redis.execute_command.return_value = None
+ ret = self.db.set_if_and_publish(self.ns, self.channels_and_events, self.key, self.old_data,
+ self.new_data)
+ self.mock_redis.execute_command.assert_called_once_with('SETIEPUB', self.key_redis,
+ self.new_data, self.old_data,
+ *self.channels_and_events_redis)
+ assert ret is False
+
+ def test_set_if_and_publish_can_map_redis_exception_to_sdl_exception(self):
+ self.mock_redis.execute_command.side_effect = redis_exceptions.ResponseError('redis error!')
+ with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+ self.db.set_if_and_publish(self.ns, self.channels_and_events, self.key, self.old_data,
+ self.new_data)
+
+ def test_set_if_not_exists_and_publish_success(self):
+ self.mock_redis.execute_command.return_value = b"OK"
+ ret = self.db.set_if_not_exists_and_publish(self.ns, self.channels_and_events, self.key,
+ self.new_data)
+ self.mock_redis.execute_command.assert_called_once_with('SETNXPUB', self.key_redis,
+ self.new_data,
+ *self.channels_and_events_redis)
+ assert ret is True
+
+ def test_set_if_not_exists_and_publish_returns_false_if_key_already_exists(self):
+ self.mock_redis.execute_command.return_value = None
+ ret = self.db.set_if_not_exists_and_publish(self.ns, self.channels_and_events, self.key,
+ self.new_data)
+ self.mock_redis.execute_command.assert_called_once_with('SETNXPUB', self.key_redis,
+ self.new_data,
+ *self.channels_and_events_redis)
+ assert ret is False
+
+ def set_if_not_exists_and_publish_can_map_redis_exception_to_sdl_exception(self):
+ self.mock_redis.execute_command.side_effect = redis_exceptions.ResponseError('redis error!')
+ with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+ self.db.set_if_not_exists_and_publish(self.ns, self.channels_and_events, self.key,
+ self.new_data)
+
+ def test_remove_and_publish_success(self):
+ self.db.remove_and_publish(self.ns, self.channels_and_events, self.key)
+ self.mock_redis.execute_command.assert_called_once_with('DELMPUB', len(self.key),
+ len(self.channels_and_events),
+ self.key_redis,
+ *self.channels_and_events_redis)
+
+ def test_remove_if_and_publish_success(self):
+ self.mock_redis.execute_command.return_value = 1
+ ret = self.db.remove_if_and_publish(self.ns, self.channels_and_events, self.key,
+ self.new_data)
+ self.mock_redis.execute_command.assert_called_once_with('DELIEPUB', self.key_redis,
+ self.new_data,
+ *self.channels_and_events_redis)
+ assert ret is True
+
+ def test_remove_if_and_publish_returns_false_if_data_does_not_match(self):
+ self.mock_redis.execute_command.return_value = 0
+ ret = self.db.remove_if_and_publish(self.ns, self.channels_and_events, self.key,
+ self.new_data)
+ self.mock_redis.execute_command.assert_called_once_with('DELIEPUB', self.key_redis,
+ self.new_data,
+ *self.channels_and_events_redis)
+ assert ret is False
+
+ def test_remove_if_and_publish_can_map_redis_exception_to_sdl_exception(self):
+ self.mock_redis.execute_command.side_effect = redis_exceptions.ResponseError('redis error!')
+ with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+ self.db.remove_if_and_publish(self.ns, self.channels_and_events, self.key,
+ self.new_data)
+
+ def test_remove_all_and_publish_success(self):
+ self.mock_redis.keys.return_value = ['{some-ns},a']
+ self.db.remove_all_and_publish(self.ns, self.channels_and_events)
+ self.mock_redis.keys.assert_called_once()
+ self.mock_redis.execute_command.assert_called_once_with('DELMPUB', len(self.key),
+ len(self.channels_and_events),
+ self.key_redis,
+ *self.channels_and_events_redis)
+
+ def test_remove_all_and_publish_can_map_redis_exception_to_sdl_exception(self):
+ self.mock_redis.execute_command.side_effect = redis_exceptions.ResponseError('redis error!')
+ with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+ self.db.remove_all_and_publish(self.ns, self.channels_and_events)
+
+ def test_subscribe_channel_success(self):
+ cb = Mock()
+ self.db.subscribe_channel(self.ns, cb, self.channels)
+ for channel in self.channels:
+ self.mock_pubsub.subscribe.assert_any_call(**{f'{{some-ns}},{channel}': cb})
+
+ def test_subscribe_channel_with_thread_success(self):
+ cb = Mock()
+ self.db.pubsub_thread.is_alive = Mock()
+ self.db.pubsub_thread.is_alive.return_value = False
+ self.db._run_in_thread = True
+ self.db.subscribe_channel(self.ns, cb, self.channels)
+ self.mock_pubsub.run_in_thread.assert_called_once_with(daemon=True, sleep_time=0.001)
+
+ def test_subscribe_can_map_redis_exception_to_sdl_exeception(self):
+ self.mock_pubsub.subscribe.side_effect = redis_exceptions.ResponseError('redis error!')
+ with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+ self.db.subscribe_channel(self.ns, Mock(), self.channels)
+
+ def test_unsubscribe_channel_success(self):
+ self.db.unsubscribe_channel(self.ns, [self.channels[0]])
+ self.mock_pubsub.unsubscribe.assert_called_with('{some-ns},abs')
+
+ def test_unsubscribe_channel_can_map_redis_exception_to_sdl_exeception(self):
+ self.mock_pubsub.unsubscribe.side_effect = redis_exceptions.ResponseError('redis error!')
+ with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+ self.db.unsubscribe_channel(self.ns, [self.channels[0]])
+
+ def test_start_event_listener_success(self):
+ self.db.start_event_listener()
+ assert self.db._run_in_thread
+
+ def test_start_event_listener_subscribe_first(self):
+ self.mock_pubsub.run_in_thread.return_value = Mock()
+ self.mock_redis.pubsub_channels.return_value = [b'{some-ns},abs']
+ self.db.subscribe_channel(self.ns, Mock(), self.channels)
+ self.db.start_event_listener()
+ self.mock_pubsub.run_in_thread.assert_called_once_with(daemon=True, sleep_time=0.001)
+
+ def test_start_event_listener_fail(self):
+ self.db.pubsub_thread.is_alive = Mock()
+ self.db.pubsub_thread.is_alive.return_value = True
+ with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+ self.db.start_event_listener()
+
+ def test_handle_events_success(self):
+ self.db.handle_events()
+ self.mock_pubsub.get_message.assert_called_once_with(ignore_subscribe_messages=True)
+
+ def test_handle_events_fail_already_started(self):
+ self.db.pubsub_thread.is_alive = Mock()
+ self.db.pubsub_thread.is_alive.return_value = True
+ with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+ self.db.handle_events()
+
+ def test_handle_events_fail_already_set(self):
+ self.db._run_in_thread = True
+ with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+ self.db.handle_events()
+
def test_get_redis_connection_function_success(self):
ret = self.db.get_redis_connection()
assert ret is self.mock_redis
with pytest.raises(SystemExit):
with _map_to_sdl_exception():
raise SystemExit('Fatal error')
+
+
+class TestRedisClient:
+ @classmethod
+ def setup_class(cls):
+ cls.pubsub = ricsdl.backend.redis.PubSub(Mock())
+ cls.pubsub.channels = {b'{some-ns},abs': Mock()}
+
+ def test_handle_pubsub_message(self):
+ assert self.pubsub.handle_message([b'message', b'{some-ns},abs', b'cbn']) == ('abs', 'cbn')
+ self.pubsub.channels.get(b'{some-ns},abs').assert_called_once_with('abs', 'cbn')