+ def test_set_and_publish_success(self):
+ self.db.set_and_publish(self.ns, self.channels_and_events, self.dm)
+ self.mock_redis.execute_command.assert_called_once_with('MSETMPUB', len(self.dm),
+ len(self.channels_and_events),
+ *self.dm_redis_flat,
+ *self.channels_and_events_redis)
+
+ def test_set_and_publish_can_map_redis_exception_to_sdl_exception(self):
+ self.mock_redis.execute_command.side_effect = redis_exceptions.ResponseError('redis error!')
+ with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+ self.db.set_and_publish(self.ns, self.channels_and_events, self.dm)
+
+ def test_set_if_and_publish_success(self):
+ self.mock_redis.execute_command.return_value = b"OK"
+ ret = self.db.set_if_and_publish(self.ns, self.channels_and_events, self.key, self.old_data,
+ self.new_data)
+ self.mock_redis.execute_command.assert_called_once_with('SETIEMPUB', self.key_redis,
+ self.new_data, self.old_data,
+ *self.channels_and_events_redis)
+ assert ret is True
+
+ def test_set_if_and_publish_returns_false_if_existing_key_value_not_expected(self):
+ self.mock_redis.execute_command.return_value = None
+ ret = self.db.set_if_and_publish(self.ns, self.channels_and_events, self.key, self.old_data,
+ self.new_data)
+ self.mock_redis.execute_command.assert_called_once_with('SETIEMPUB', self.key_redis,
+ self.new_data, self.old_data,
+ *self.channels_and_events_redis)
+ assert ret is False
+
+ def test_set_if_and_publish_can_map_redis_exception_to_sdl_exception(self):
+ self.mock_redis.execute_command.side_effect = redis_exceptions.ResponseError('redis error!')
+ with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+ self.db.set_if_and_publish(self.ns, self.channels_and_events, self.key, self.old_data,
+ self.new_data)
+
+ def test_set_if_not_exists_and_publish_success(self):
+ self.mock_redis.execute_command.return_value = b"OK"
+ ret = self.db.set_if_not_exists_and_publish(self.ns, self.channels_and_events, self.key,
+ self.new_data)
+ self.mock_redis.execute_command.assert_called_once_with('SETNXMPUB', self.key_redis,
+ self.new_data,
+ *self.channels_and_events_redis)
+ assert ret is True
+
+ def test_set_if_not_exists_and_publish_returns_false_if_key_already_exists(self):
+ self.mock_redis.execute_command.return_value = None
+ ret = self.db.set_if_not_exists_and_publish(self.ns, self.channels_and_events, self.key,
+ self.new_data)
+ self.mock_redis.execute_command.assert_called_once_with('SETNXMPUB', self.key_redis,
+ self.new_data,
+ *self.channels_and_events_redis)
+ assert ret is False
+
+ def set_if_not_exists_and_publish_can_map_redis_exception_to_sdl_exception(self):
+ self.mock_redis.execute_command.side_effect = redis_exceptions.ResponseError('redis error!')
+ with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+ self.db.set_if_not_exists_and_publish(self.ns, self.channels_and_events, self.key,
+ self.new_data)
+
+ def test_remove_and_publish_success(self):
+ self.db.remove_and_publish(self.ns, self.channels_and_events, self.key)
+ self.mock_redis.execute_command.assert_called_once_with('DELMPUB', len(self.key),
+ len(self.channels_and_events),
+ self.key_redis,
+ *self.channels_and_events_redis)
+
+ def test_remove_if_and_publish_success(self):
+ self.mock_redis.execute_command.return_value = 1
+ ret = self.db.remove_if_and_publish(self.ns, self.channels_and_events, self.key,
+ self.new_data)
+ self.mock_redis.execute_command.assert_called_once_with('DELIEMPUB', self.key_redis,
+ self.new_data,
+ *self.channels_and_events_redis)
+ assert ret is True
+
+ def test_remove_if_and_publish_returns_false_if_data_does_not_match(self):
+ self.mock_redis.execute_command.return_value = 0
+ ret = self.db.remove_if_and_publish(self.ns, self.channels_and_events, self.key,
+ self.new_data)
+ self.mock_redis.execute_command.assert_called_once_with('DELIEMPUB', self.key_redis,
+ self.new_data,
+ *self.channels_and_events_redis)
+ assert ret is False
+
+ def test_remove_if_and_publish_can_map_redis_exception_to_sdl_exception(self):
+ self.mock_redis.execute_command.side_effect = redis_exceptions.ResponseError('redis error!')
+ with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+ self.db.remove_if_and_publish(self.ns, self.channels_and_events, self.key,
+ self.new_data)
+
+ def test_remove_all_and_publish_success(self):
+ self.mock_redis.keys.return_value = ['{some-ns},a']
+ self.db.remove_all_and_publish(self.ns, self.channels_and_events)
+ self.mock_redis.keys.assert_called_once()
+ self.mock_redis.execute_command.assert_called_once_with('DELMPUB', len(self.key),
+ len(self.channels_and_events),
+ self.key_redis,
+ *self.channels_and_events_redis)
+
+ def test_remove_all_and_publish_can_map_redis_exception_to_sdl_exception(self):
+ self.mock_redis.execute_command.side_effect = redis_exceptions.ResponseError('redis error!')
+ with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+ self.db.remove_all_and_publish(self.ns, self.channels_and_events)
+
+ def test_subscribe_channel_success(self):
+ cb = Mock()
+ self.db.subscribe_channel(self.ns, cb, self.channels)
+ for channel in self.channels:
+ self.mock_pubsub.subscribe.assert_any_call(**{f'{{some-ns}},{channel}': cb})
+
+ def test_subscribe_channel_with_thread_success(self):
+ cb = Mock()
+ self.db.pubsub_thread.is_alive = Mock()
+ self.db.pubsub_thread.is_alive.return_value = False
+ self.db._run_in_thread = True
+ self.db.subscribe_channel(self.ns, cb, self.channels)
+ self.mock_pubsub.run_in_thread.assert_called_once_with(daemon=True, sleep_time=0.001)
+
+ def test_subscribe_can_map_redis_exception_to_sdl_exeception(self):
+ self.mock_pubsub.subscribe.side_effect = redis_exceptions.ResponseError('redis error!')
+ with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+ self.db.subscribe_channel(self.ns, Mock(), self.channels)
+
+ def test_unsubscribe_channel_success(self):
+ self.db.unsubscribe_channel(self.ns, [self.channels[0]])
+ self.mock_pubsub.unsubscribe.assert_called_with('{some-ns},abs')
+
+ def test_unsubscribe_channel_can_map_redis_exception_to_sdl_exeception(self):
+ self.mock_pubsub.unsubscribe.side_effect = redis_exceptions.ResponseError('redis error!')
+ with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+ self.db.unsubscribe_channel(self.ns, [self.channels[0]])
+
+ def test_start_event_listener_success(self):
+ self.db.start_event_listener()
+ assert self.db._run_in_thread
+
+ def test_start_event_listener_subscribe_first(self):
+ self.mock_pubsub.run_in_thread.return_value = Mock()
+ self.mock_redis.pubsub_channels.return_value = [b'{some-ns},abs']
+ self.db.subscribe_channel(self.ns, Mock(), self.channels)
+ self.db.start_event_listener()
+ self.mock_pubsub.run_in_thread.assert_called_once_with(daemon=True, sleep_time=0.001)
+
+ def test_start_event_listener_fail(self):
+ self.db.pubsub_thread.is_alive = Mock()
+ self.db.pubsub_thread.is_alive.return_value = True
+ with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+ self.db.start_event_listener()
+
+ def test_handle_events_success(self):
+ self.db.handle_events()
+ self.mock_pubsub.get_message.assert_called_once_with(ignore_subscribe_messages=True)
+
+ def test_handle_events_fail_already_started(self):
+ self.db.pubsub_thread.is_alive = Mock()
+ self.db.pubsub_thread.is_alive.return_value = True
+ with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+ self.db.handle_events()
+
+ def test_handle_events_fail_already_set(self):
+ self.db._run_in_thread = True
+ with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+ self.db.handle_events()
+