+ def test_set_and_publish_success(self):
+ self.db.set_and_publish(self.ns, self.channels_and_events, self.dm)
+ self.mock_redis.execute_command.assert_called_once_with('MSETMPUB', len(self.dm),
+ len(self.channels_and_events),
+ *self.dm_redis_flat,
+ *self.channels_and_events_redis)
+
+ def test_set_and_publish_can_map_redis_exception_to_sdl_exception(self):
+ self.mock_redis.execute_command.side_effect = redis_exceptions.ResponseError('redis error!')
+ with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+ self.db.set_and_publish(self.ns, self.channels_and_events, self.dm)
+
+ def test_set_if_and_publish_success(self):
+ self.mock_redis.execute_command.return_value = b"OK"
+ ret = self.db.set_if_and_publish(self.ns, self.channels_and_events, self.key, self.old_data,
+ self.new_data)
+ self.mock_redis.execute_command.assert_called_once_with('SETIEMPUB', self.key_redis,
+ self.new_data, self.old_data,
+ *self.channels_and_events_redis)
+ assert ret is True
+
+ def test_set_if_and_publish_returns_false_if_existing_key_value_not_expected(self):
+ self.mock_redis.execute_command.return_value = None
+ ret = self.db.set_if_and_publish(self.ns, self.channels_and_events, self.key, self.old_data,
+ self.new_data)
+ self.mock_redis.execute_command.assert_called_once_with('SETIEMPUB', self.key_redis,
+ self.new_data, self.old_data,
+ *self.channels_and_events_redis)
+ assert ret is False
+
+ def test_set_if_and_publish_can_map_redis_exception_to_sdl_exception(self):
+ self.mock_redis.execute_command.side_effect = redis_exceptions.ResponseError('redis error!')
+ with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+ self.db.set_if_and_publish(self.ns, self.channels_and_events, self.key, self.old_data,
+ self.new_data)
+
+ def test_set_if_not_exists_and_publish_success(self):
+ self.mock_redis.execute_command.return_value = b"OK"
+ ret = self.db.set_if_not_exists_and_publish(self.ns, self.channels_and_events, self.key,
+ self.new_data)
+ self.mock_redis.execute_command.assert_called_once_with('SETNXMPUB', self.key_redis,
+ self.new_data,
+ *self.channels_and_events_redis)
+ assert ret is True
+
+ def test_set_if_not_exists_and_publish_returns_false_if_key_already_exists(self):
+ self.mock_redis.execute_command.return_value = None
+ ret = self.db.set_if_not_exists_and_publish(self.ns, self.channels_and_events, self.key,
+ self.new_data)
+ self.mock_redis.execute_command.assert_called_once_with('SETNXMPUB', self.key_redis,
+ self.new_data,
+ *self.channels_and_events_redis)
+ assert ret is False
+
+ def set_if_not_exists_and_publish_can_map_redis_exception_to_sdl_exception(self):
+ self.mock_redis.execute_command.side_effect = redis_exceptions.ResponseError('redis error!')
+ with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+ self.db.set_if_not_exists_and_publish(self.ns, self.channels_and_events, self.key,
+ self.new_data)
+
+ def test_remove_and_publish_success(self):
+ self.db.remove_and_publish(self.ns, self.channels_and_events, self.key)
+ self.mock_redis.execute_command.assert_called_once_with('DELMPUB', len(self.key),
+ len(self.channels_and_events),
+ self.key_redis,
+ *self.channels_and_events_redis)
+
+ def test_remove_if_and_publish_success(self):
+ self.mock_redis.execute_command.return_value = 1
+ ret = self.db.remove_if_and_publish(self.ns, self.channels_and_events, self.key,
+ self.new_data)
+ self.mock_redis.execute_command.assert_called_once_with('DELIEMPUB', self.key_redis,
+ self.new_data,
+ *self.channels_and_events_redis)
+ assert ret is True
+
+ def test_remove_if_and_publish_returns_false_if_data_does_not_match(self):
+ self.mock_redis.execute_command.return_value = 0
+ ret = self.db.remove_if_and_publish(self.ns, self.channels_and_events, self.key,
+ self.new_data)
+ self.mock_redis.execute_command.assert_called_once_with('DELIEMPUB', self.key_redis,
+ self.new_data,
+ *self.channels_and_events_redis)
+ assert ret is False
+
+ def test_remove_if_and_publish_can_map_redis_exception_to_sdl_exception(self):
+ self.mock_redis.execute_command.side_effect = redis_exceptions.ResponseError('redis error!')
+ with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+ self.db.remove_if_and_publish(self.ns, self.channels_and_events, self.key,
+ self.new_data)
+
+ def test_remove_all_and_publish_success(self):
+ self.mock_redis.keys.return_value = ['{some-ns},a']
+ self.db.remove_all_and_publish(self.ns, self.channels_and_events)
+ self.mock_redis.keys.assert_called_once()
+ self.mock_redis.execute_command.assert_called_once_with('DELMPUB', len(self.key),
+ len(self.channels_and_events),
+ self.key_redis,
+ *self.channels_and_events_redis)
+
+ def test_remove_all_and_publish_can_map_redis_exception_to_sdl_exception(self):
+ self.mock_redis.execute_command.side_effect = redis_exceptions.ResponseError('redis error!')
+ with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+ self.db.remove_all_and_publish(self.ns, self.channels_and_events)
+
+ def test_subscribe_channel_success(self):
+ cb = Mock()
+ self.db.subscribe_channel(self.ns, cb, self.channels)
+ for channel in self.channels:
+ self.mock_pubsub.subscribe.assert_any_call(**{f'{{some-ns}},{channel}': cb})
+
+ def test_subscribe_channel_with_thread_success(self):
+ cb = Mock()
+ # Call first start_event_listener() to enable run_in_thread flag. When subscribe_channel()
+ # is called thread is started if the flag is enabled. In real-life scenario it's highly
+ # advisable at first to subscribe to some events by calling subscribe_channel() and only
+ # after it start threads by calling start_event_listener().
+ self.db.start_event_listener()
+ self.db.subscribe_channel(self.ns, cb, self.channels)
+ self.mock_pubsub.run_in_thread.assert_called_once_with(daemon=True, sleep_time=0.001)
+
+ def test_subscribe_can_map_redis_exception_to_sdl_exeception(self):
+ self.mock_pubsub.subscribe.side_effect = redis_exceptions.ResponseError('redis error!')
+ with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+ self.db.subscribe_channel(self.ns, Mock(), self.channels)
+
+ def test_unsubscribe_channel_success(self):
+ self.db.unsubscribe_channel(self.ns, [self.channels[0]])
+ self.mock_pubsub.unsubscribe.assert_called_with('{some-ns},ch1')
+
+ def test_unsubscribe_channel_can_map_redis_exception_to_sdl_exeception(self):
+ self.mock_pubsub.unsubscribe.side_effect = redis_exceptions.ResponseError('redis error!')
+ with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+ self.db.unsubscribe_channel(self.ns, [self.channels[0]])
+
+ def test_subscribe_and_start_event_listener(self):
+ self.mock_redis.pubsub_channels.return_value = self.channels_and_events_redis
+ self.db.subscribe_channel(self.ns, Mock(), self.channels)
+ self.db.start_event_listener()
+
+ if self.test_backend_type == 'sentinel_cluster':
+ assert self.mock_redis.pubsub_channels.call_count == 2
+ assert self.mock_pubsub.run_in_thread.call_count == 2
+ self.mock_pubsub.run_in_thread.assert_has_calls([
+ call(daemon=True, sleep_time=0.001),
+ call(daemon=True, sleep_time=0.001),
+ ])
+ else:
+ self.mock_redis.pubsub_channels.assert_called_once()
+ self.mock_pubsub.run_in_thread.assert_called_once_with(daemon=True, sleep_time=0.001)
+
+ def test_start_event_listener_fail(self):
+ self.mock_pubsub_thread.is_alive.return_value = True
+ with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+ self.db.start_event_listener()
+
+ def test_handle_events_success(self):
+ self.db.handle_events()
+ self.db.handle_events()
+ self.db.handle_events()
+ self.db.handle_events()
+ assert self.mock_pubsub.get_message.call_count == 4
+
+ def test_handle_events_fail_if_subsub_thread_alive(self):
+ self.mock_pubsub_thread.is_alive.return_value = True
+ with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+ self.db.handle_events()
+
+ def test_handle_events_fail_if_event_listener_already_running(self):
+ self.db.start_event_listener()
+ with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+ self.db.handle_events()
+
+ def test_handle_events_ignores_message_handling_redis_runtime_exception(self):
+ self.mock_pubsub.get_message.side_effect = RuntimeError()
+ self.db.handle_events()
+ self.mock_pubsub.get_message.assert_called_once()
+