#
-from unittest.mock import patch, Mock
+from unittest.mock import patch, Mock, MagicMock, call, ANY
import pytest
from redis import exceptions as redis_exceptions
import ricsdl.backend
import ricsdl.exceptions
+def get_test_sdl_standby_config():
+ return _Configuration.Params(db_host='service-ricplt-dbaas-tcp-cluster-0.ricplt',
+ db_port=6379,
+ db_sentinel_port=None,
+ db_sentinel_master_name=None,
+ db_cluster_addr_list=None,
+ db_type=DbBackendType.REDIS)
+
+def get_test_sdl_sentinel_config():
+ return _Configuration.Params(db_host='service-ricplt-dbaas-tcp-cluster-0.ricplt',
+ db_port=6379,
+ db_sentinel_port=26379,
+ db_sentinel_master_name='dbaasmaster',
+ db_cluster_addr_list=None,
+ db_type=DbBackendType.REDIS)
+
+def get_test_sdl_sentinel_cluster_config():
+ return _Configuration.Params(db_host='service-ricplt-dbaas-tcp-cluster-0.ricplt',
+ db_port=6379,
+ db_sentinel_port=26379,
+ db_sentinel_master_name='dbaasmaster',
+ db_cluster_addr_list='service-ricplt-dbaas-tcp-cluster-0.ricplt,service-ricplt-dbaas-tcp-cluster-1.ricplt',
+ db_type=DbBackendType.REDIS)
+
@pytest.fixture()
-def redis_backend_fixture(request):
+def redis_backend_common_fixture(request):
request.cls.ns = 'some-ns'
request.cls.dl_redis = [b'1', b'2']
request.cls.dm = {'a': b'1', 'b': b'2'}
request.cls.dm_redis = {'{some-ns},a': b'1', '{some-ns},b': b'2'}
+ request.cls.dm_redis_flat = ['{some-ns},a', b'1', '{some-ns},b', b'2']
request.cls.key = 'a'
request.cls.key_redis = '{some-ns},a'
request.cls.keys = ['a', 'b']
request.cls.group_redis = '{some-ns},some-group'
request.cls.groupmembers = set([b'm1', b'm2'])
request.cls.groupmember = b'm1'
+ request.cls.channels = ['abs', 'gma']
+ request.cls.channels_and_events = {'abs': ['cbn'], 'gma': ['jkl']}
+ request.cls.channels_and_events_redis = ['{some-ns},abs', 'cbn', '{some-ns},gma', 'jkl']
+
+ yield
+@pytest.fixture(params=['standalone', 'sentinel', 'sentinel_cluster'])
+def redis_backend_fixture(request, redis_backend_common_fixture):
request.cls.configuration = Mock()
- mock_conf_params = _Configuration.Params(db_host=None,
- db_port=None,
- db_sentinel_port=None,
- db_sentinel_master_name=None,
- db_type=DbBackendType.REDIS)
- request.cls.configuration.get_params.return_value = mock_conf_params
- with patch('ricsdl.backend.redis.Redis') as mock_redis:
- db = ricsdl.backend.get_backend_instance(request.cls.configuration)
- request.cls.mock_redis = mock_redis.return_value
- request.cls.db = db
+ request.cls.test_backend_type = request.param
+ if request.param == 'standalone':
+ cfg = get_test_sdl_standby_config()
+ request.cls.configuration.get_params.return_value = cfg
+ with patch('ricsdl.backend.redis.Redis') as mock_redis, patch(
+ 'ricsdl.backend.redis.PubSub') as mock_pubsub, patch(
+ 'threading.Thread') as mock_thread:
+ db = ricsdl.backend.get_backend_instance(request.cls.configuration)
+ request.cls.mock_redis = mock_redis.return_value
+ request.cls.mock_pubsub = mock_pubsub.return_value
+ request.cls.mock_pubsub_thread = mock_thread.return_value
+ request.cls.mock_pubsub_thread.is_alive.return_value = False
+ request.cls.db = db
+
+ mock_redis.assert_called_once_with(db=0, host=cfg.db_host, max_connections=20, port=cfg.db_port)
+ mock_pubsub.assert_called_once_with(request.cls.mock_redis.connection_pool, ignore_subscribe_messages=True)
+ assert request.cls.mock_redis.set_response_callback.call_count == 2
+ assert request.cls.mock_redis.set_response_callback.call_args_list == [call('SETIE', ANY), call('DELIE', ANY)]
+
+ elif request.param == 'sentinel':
+ cfg = get_test_sdl_sentinel_config()
+ request.cls.configuration.get_params.return_value = cfg
+ with patch('ricsdl.backend.redis.Sentinel') as mock_sentinel, patch(
+ 'ricsdl.backend.redis.PubSub') as mock_pubsub, patch(
+ 'threading.Thread') as mock_thread:
+ db = ricsdl.backend.get_backend_instance(request.cls.configuration)
+ request.cls.mock_redis = mock_sentinel.return_value.master_for.return_value
+ request.cls.mock_pubsub = mock_pubsub.return_value
+ request.cls.mock_pubsub_thread = mock_thread.return_value
+ request.cls.mock_pubsub_thread.is_alive.return_value = False
+ request.cls.db = db
+
+ mock_sentinel.assert_called_once_with([(cfg.db_host, cfg.db_sentinel_port)])
+ mock_sentinel.master_for.called_once_with(cfg.db_sentinel_master_name)
+ mock_pubsub.assert_called_once_with(request.cls.mock_redis.connection_pool, ignore_subscribe_messages=True)
+ assert request.cls.mock_redis.set_response_callback.call_count == 2
+ assert request.cls.mock_redis.set_response_callback.call_args_list == [call('SETIE', ANY), call('DELIE', ANY)]
+
+ elif request.param == 'sentinel_cluster':
+ cfg = get_test_sdl_sentinel_cluster_config()
+ request.cls.configuration.get_params.return_value = cfg
+ with patch('ricsdl.backend.redis.Sentinel') as mock_sentinel, patch(
+ 'ricsdl.backend.redis.PubSub') as mock_pubsub, patch(
+ 'threading.Thread') as mock_thread:
+ db = ricsdl.backend.get_backend_instance(request.cls.configuration)
+ request.cls.mock_redis = mock_sentinel.return_value.master_for.return_value
+ request.cls.mock_pubsub = mock_pubsub.return_value
+ request.cls.mock_pubsub_thread = mock_thread.return_value
+ request.cls.mock_pubsub_thread.is_alive.return_value = False
+ request.cls.db = db
+
+ assert mock_sentinel.call_count == 2
+ mock_sentinel.assert_has_calls([
+ call([('service-ricplt-dbaas-tcp-cluster-0.ricplt', cfg.db_sentinel_port)]),
+ call([('service-ricplt-dbaas-tcp-cluster-1.ricplt', cfg.db_sentinel_port)]),
+ ], any_order=True)
+ assert mock_sentinel.return_value.master_for.call_count == 2
+ mock_sentinel.return_value.master_for.assert_has_calls(
+ [call(cfg.db_sentinel_master_name), call(cfg.db_sentinel_master_name)], any_order=True,
+ )
+ assert mock_pubsub.call_count == 2
+ mock_pubsub.assert_has_calls([
+ call(request.cls.mock_redis.connection_pool, ignore_subscribe_messages=True),
+ call(request.cls.mock_redis.connection_pool, ignore_subscribe_messages=True),
+ ])
+ assert request.cls.mock_redis.set_response_callback.call_count == 4
+ assert request.cls.mock_redis.set_response_callback.call_args_list == [
+ call('SETIE', ANY), call('DELIE', ANY),
+ call('SETIE', ANY), call('DELIE', ANY),
+ ]
+ else:
+ raise NotImplementedError
yield
def test_is_connected_function_success(self):
self.mock_redis.ping.return_value = True
ret = self.db.is_connected()
- self.mock_redis.ping.assert_called_once()
+ if self.test_backend_type == 'sentinel_cluster':
+ assert self.mock_redis.ping.call_count == 2
+ else:
+ assert self.mock_redis.ping.call_count == 1
assert ret is True
def test_is_connected_function_returns_false_if_ping_fails(self):
with pytest.raises(ricsdl.exceptions.RejectedByBackend):
self.db.group_size(self.ns, self.group)
+ def test_set_and_publish_success(self):
+ self.db.set_and_publish(self.ns, self.channels_and_events, self.dm)
+ self.mock_redis.execute_command.assert_called_once_with('MSETMPUB', len(self.dm),
+ len(self.channels_and_events),
+ *self.dm_redis_flat,
+ *self.channels_and_events_redis)
+
+ def test_set_and_publish_can_map_redis_exception_to_sdl_exception(self):
+ self.mock_redis.execute_command.side_effect = redis_exceptions.ResponseError('redis error!')
+ with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+ self.db.set_and_publish(self.ns, self.channels_and_events, self.dm)
+
+ def test_set_if_and_publish_success(self):
+ self.mock_redis.execute_command.return_value = b"OK"
+ ret = self.db.set_if_and_publish(self.ns, self.channels_and_events, self.key, self.old_data,
+ self.new_data)
+ self.mock_redis.execute_command.assert_called_once_with('SETIEMPUB', self.key_redis,
+ self.new_data, self.old_data,
+ *self.channels_and_events_redis)
+ assert ret is True
+
+ def test_set_if_and_publish_returns_false_if_existing_key_value_not_expected(self):
+ self.mock_redis.execute_command.return_value = None
+ ret = self.db.set_if_and_publish(self.ns, self.channels_and_events, self.key, self.old_data,
+ self.new_data)
+ self.mock_redis.execute_command.assert_called_once_with('SETIEMPUB', self.key_redis,
+ self.new_data, self.old_data,
+ *self.channels_and_events_redis)
+ assert ret is False
+
+ def test_set_if_and_publish_can_map_redis_exception_to_sdl_exception(self):
+ self.mock_redis.execute_command.side_effect = redis_exceptions.ResponseError('redis error!')
+ with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+ self.db.set_if_and_publish(self.ns, self.channels_and_events, self.key, self.old_data,
+ self.new_data)
+
+ def test_set_if_not_exists_and_publish_success(self):
+ self.mock_redis.execute_command.return_value = b"OK"
+ ret = self.db.set_if_not_exists_and_publish(self.ns, self.channels_and_events, self.key,
+ self.new_data)
+ self.mock_redis.execute_command.assert_called_once_with('SETNXMPUB', self.key_redis,
+ self.new_data,
+ *self.channels_and_events_redis)
+ assert ret is True
+
+ def test_set_if_not_exists_and_publish_returns_false_if_key_already_exists(self):
+ self.mock_redis.execute_command.return_value = None
+ ret = self.db.set_if_not_exists_and_publish(self.ns, self.channels_and_events, self.key,
+ self.new_data)
+ self.mock_redis.execute_command.assert_called_once_with('SETNXMPUB', self.key_redis,
+ self.new_data,
+ *self.channels_and_events_redis)
+ assert ret is False
+
+ def set_if_not_exists_and_publish_can_map_redis_exception_to_sdl_exception(self):
+ self.mock_redis.execute_command.side_effect = redis_exceptions.ResponseError('redis error!')
+ with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+ self.db.set_if_not_exists_and_publish(self.ns, self.channels_and_events, self.key,
+ self.new_data)
+
+ def test_remove_and_publish_success(self):
+ self.db.remove_and_publish(self.ns, self.channels_and_events, self.key)
+ self.mock_redis.execute_command.assert_called_once_with('DELMPUB', len(self.key),
+ len(self.channels_and_events),
+ self.key_redis,
+ *self.channels_and_events_redis)
+
+ def test_remove_if_and_publish_success(self):
+ self.mock_redis.execute_command.return_value = 1
+ ret = self.db.remove_if_and_publish(self.ns, self.channels_and_events, self.key,
+ self.new_data)
+ self.mock_redis.execute_command.assert_called_once_with('DELIEMPUB', self.key_redis,
+ self.new_data,
+ *self.channels_and_events_redis)
+ assert ret is True
+
+ def test_remove_if_and_publish_returns_false_if_data_does_not_match(self):
+ self.mock_redis.execute_command.return_value = 0
+ ret = self.db.remove_if_and_publish(self.ns, self.channels_and_events, self.key,
+ self.new_data)
+ self.mock_redis.execute_command.assert_called_once_with('DELIEMPUB', self.key_redis,
+ self.new_data,
+ *self.channels_and_events_redis)
+ assert ret is False
+
+ def test_remove_if_and_publish_can_map_redis_exception_to_sdl_exception(self):
+ self.mock_redis.execute_command.side_effect = redis_exceptions.ResponseError('redis error!')
+ with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+ self.db.remove_if_and_publish(self.ns, self.channels_and_events, self.key,
+ self.new_data)
+
+ def test_remove_all_and_publish_success(self):
+ self.mock_redis.keys.return_value = ['{some-ns},a']
+ self.db.remove_all_and_publish(self.ns, self.channels_and_events)
+ self.mock_redis.keys.assert_called_once()
+ self.mock_redis.execute_command.assert_called_once_with('DELMPUB', len(self.key),
+ len(self.channels_and_events),
+ self.key_redis,
+ *self.channels_and_events_redis)
+
+ def test_remove_all_and_publish_can_map_redis_exception_to_sdl_exception(self):
+ self.mock_redis.execute_command.side_effect = redis_exceptions.ResponseError('redis error!')
+ with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+ self.db.remove_all_and_publish(self.ns, self.channels_and_events)
+
+ def test_subscribe_channel_success(self):
+ cb = Mock()
+ self.db.subscribe_channel(self.ns, cb, self.channels)
+ for channel in self.channels:
+ self.mock_pubsub.subscribe.assert_any_call(**{f'{{some-ns}},{channel}': cb})
+
+ def test_subscribe_channel_with_thread_success(self):
+ cb = Mock()
+ # Call first start_event_listener() to enable run_in_thread flag. When subscribe_channel()
+ # is called thread is started if the flag is enabled. In real-life scenario it's highly
+ # advisable at first to subscribe to some events by calling subscribe_channel() and only
+ # after it start threads by calling start_event_listener().
+ self.db.start_event_listener()
+ self.db.subscribe_channel(self.ns, cb, self.channels)
+ self.mock_pubsub.run_in_thread.assert_called_once_with(daemon=True, sleep_time=0.001)
+
+ def test_subscribe_can_map_redis_exception_to_sdl_exeception(self):
+ self.mock_pubsub.subscribe.side_effect = redis_exceptions.ResponseError('redis error!')
+ with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+ self.db.subscribe_channel(self.ns, Mock(), self.channels)
+
+ def test_unsubscribe_channel_success(self):
+ self.db.unsubscribe_channel(self.ns, [self.channels[0]])
+ self.mock_pubsub.unsubscribe.assert_called_with('{some-ns},abs')
+
+ def test_unsubscribe_channel_can_map_redis_exception_to_sdl_exeception(self):
+ self.mock_pubsub.unsubscribe.side_effect = redis_exceptions.ResponseError('redis error!')
+ with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+ self.db.unsubscribe_channel(self.ns, [self.channels[0]])
+
+ def test_subscribe_and_start_event_listener(self):
+ self.mock_redis.pubsub_channels.return_value = self.channels_and_events_redis
+ self.db.subscribe_channel(self.ns, Mock(), self.channels)
+ self.db.start_event_listener()
+
+ if self.test_backend_type == 'sentinel_cluster':
+ assert self.mock_redis.pubsub_channels.call_count == 2
+ assert self.mock_pubsub.run_in_thread.call_count == 2
+ self.mock_pubsub.run_in_thread.assert_has_calls([
+ call(daemon=True, sleep_time=0.001),
+ call(daemon=True, sleep_time=0.001),
+ ])
+ else:
+ self.mock_redis.pubsub_channels.assert_called_once()
+ self.mock_pubsub.run_in_thread.assert_called_once_with(daemon=True, sleep_time=0.001)
+
+ def test_start_event_listener_fail(self):
+ self.mock_pubsub_thread.is_alive.return_value = True
+ with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+ self.db.start_event_listener()
+
+ def test_handle_events_success(self):
+ self.db.handle_events()
+ self.db.handle_events()
+ self.db.handle_events()
+ self.db.handle_events()
+ assert self.mock_pubsub.get_message.call_count == 4
+
+ def test_handle_events_fail_if_subsub_thread_alive(self):
+ self.mock_pubsub_thread.is_alive.return_value = True
+ with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+ self.db.handle_events()
+
+ def test_handle_events_fail_if_event_listener_already_running(self):
+ self.db.start_event_listener()
+ with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+ self.db.handle_events()
+
+ def test_handle_events_ignores_message_handling_redis_runtime_exception(self):
+ self.mock_pubsub.get_message.side_effect = RuntimeError()
+ self.db.handle_events()
+ self.mock_pubsub.get_message.assert_called_once()
+
def test_get_redis_connection_function_success(self):
- ret = self.db.get_redis_connection()
+ ret = self.db.get_redis_connection(self.ns)
assert ret is self.mock_redis
def test_redis_backend_object_string_representation(self):
str_out = str(self.db)
assert str_out is not None
+ def test_namespace_hash_algorithm_stays_unaltered(self):
+ ret_hash = self.db._RedisBackend__get_hash('sdltoolns')
+ assert ret_hash == 2897969051
+
+def test_standalone_redis_init_exception_is_mapped_to_sdl_exeception():
+ mock_cfg = Mock()
+ cfg_params = get_test_sdl_standby_config()
+ mock_cfg.get_params.return_value = cfg_params
+ with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+ with patch('ricsdl.backend.redis.Redis') as mock_redis:
+ mock_redis.side_effect = redis_exceptions.ResponseError('redis error!')
+ ricsdl.backend.get_backend_instance(mock_cfg)
+
+def test_standalone_pubsub_init_exception_is_mapped_to_sdl_exeception():
+ mock_cfg = Mock()
+ cfg_params = get_test_sdl_standby_config()
+ mock_cfg.get_params.return_value = cfg_params
+ with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+ with patch('ricsdl.backend.redis.Redis') as mock_redis, patch(
+ 'ricsdl.backend.redis.PubSub') as mock_pubsub:
+ mock_pubsub.side_effect = redis_exceptions.ResponseError('redis error!')
+ ricsdl.backend.get_backend_instance(mock_cfg)
+
+def test_sentinel_redis_init_exception_is_mapped_to_sdl_exeception():
+ mock_cfg = Mock()
+ cfg_params = get_test_sdl_sentinel_config()
+ mock_cfg.get_params.return_value = cfg_params
+ with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+ with patch('ricsdl.backend.redis.Sentinel') as mock_sentinel:
+ mock_sentinel.side_effect = redis_exceptions.ResponseError('redis error!')
+ ricsdl.backend.get_backend_instance(mock_cfg)
+
+def test_sentinel_pubsub_init_exception_is_mapped_to_sdl_exeception():
+ mock_cfg = Mock()
+ cfg_params = get_test_sdl_sentinel_config()
+ mock_cfg.get_params.return_value = cfg_params
+ with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+ with patch('ricsdl.backend.redis.Sentinel') as mock_sentinel, patch(
+ 'ricsdl.backend.redis.PubSub') as mock_pubsub:
+ mock_pubsub.side_effect = redis_exceptions.ResponseError('redis error!')
+ ricsdl.backend.get_backend_instance(mock_cfg)
+
+def test_sentinel_master_for_exception_is_mapped_to_sdl_exeception():
+ mock_cfg = Mock()
+ cfg_params = get_test_sdl_sentinel_config()
+ mock_cfg.get_params.return_value = cfg_params
+ with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+ with patch('ricsdl.backend.redis.Sentinel') as mock_sentinel, patch(
+ 'ricsdl.backend.redis.PubSub') as mock_pubsub:
+ mock_sentinel.return_value.master_for.side_effect = redis_exceptions.ResponseError('redis error!')
+ ricsdl.backend.get_backend_instance(mock_cfg)
+
+def test_sentinel_cluster_redis_init_exception_is_mapped_to_sdl_exeception():
+ mock_cfg = Mock()
+ cfg_params = get_test_sdl_sentinel_cluster_config()
+ mock_cfg.get_params.return_value = cfg_params
+ with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+ with patch('ricsdl.backend.redis.Sentinel') as mock_sentinel:
+ mock_sentinel.side_effect = redis_exceptions.ResponseError('redis error!')
+ ricsdl.backend.get_backend_instance(mock_cfg)
+
+def test_sentinel_cluster_pubsub_init_exception_is_mapped_to_sdl_exeception():
+ mock_cfg = Mock()
+ cfg_params = get_test_sdl_sentinel_cluster_config()
+ mock_cfg.get_params.return_value = cfg_params
+ with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+ with patch('ricsdl.backend.redis.Sentinel') as mock_sentinel, patch(
+ 'ricsdl.backend.redis.PubSub') as mock_pubsub:
+ mock_pubsub.side_effect = redis_exceptions.ResponseError('redis error!')
+ ricsdl.backend.get_backend_instance(mock_cfg)
+
+def test_sentinel_cluster_master_for_exception_is_mapped_to_sdl_exeception():
+ mock_cfg = Mock()
+ cfg_params = get_test_sdl_sentinel_cluster_config()
+ mock_cfg.get_params.return_value = cfg_params
+ with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+ with patch('ricsdl.backend.redis.Sentinel') as mock_sentinel, patch(
+ 'ricsdl.backend.redis.PubSub') as mock_pubsub:
+ mock_sentinel.return_value.master_for.side_effect = redis_exceptions.ResponseError('redis error!')
+ ricsdl.backend.get_backend_instance(mock_cfg)
class MockRedisLock:
def __init__(self, redis, name, timeout=None, sleep=0.1,
db_port=None,
db_sentinel_port=None,
db_sentinel_master_name=None,
+ db_cluster_addr_list=None,
db_type=DbBackendType.REDIS)
request.cls.configuration.get_params.return_value = mock_conf_params
with pytest.raises(SystemExit):
with _map_to_sdl_exception():
raise SystemExit('Fatal error')
+
+
+class TestRedisClient:
+ @classmethod
+ def setup_class(cls):
+ cls.pubsub = ricsdl.backend.redis.PubSub(Mock())
+ cls.pubsub.channels = {b'{some-ns},abs': Mock()}
+
+ def test_handle_pubsub_message(self):
+ assert self.pubsub.handle_message([b'message', b'{some-ns},abs', b'cbn']) == ('abs', 'cbn')
+ self.pubsub.channels.get(b'{some-ns},abs').assert_called_once_with('abs', 'cbn')