Implement sentinel based DB capacity scaling
[ric-plt/sdlpy.git] / ricsdl-package / tests / backend / test_redis.py
index c5e1b03..1d79752 100755 (executable)
@@ -19,7 +19,7 @@
 #
 
 
-from unittest.mock import patch, Mock
+from unittest.mock import patch, Mock, MagicMock, call, ANY
 import pytest
 from redis import exceptions as redis_exceptions
 import ricsdl.backend
@@ -29,8 +29,32 @@ from ricsdl.configuration import DbBackendType
 import ricsdl.exceptions
 
 
+def get_test_sdl_standby_config():
+    return _Configuration.Params(db_host='service-ricplt-dbaas-tcp-cluster-0.ricplt',
+                                 db_port=6379,
+                                 db_sentinel_port=None,
+                                 db_sentinel_master_name=None,
+                                 db_cluster_addr_list=None,
+                                 db_type=DbBackendType.REDIS)
+
+def get_test_sdl_sentinel_config():
+    return _Configuration.Params(db_host='service-ricplt-dbaas-tcp-cluster-0.ricplt',
+                                 db_port=6379,
+                                 db_sentinel_port=26379,
+                                 db_sentinel_master_name='dbaasmaster',
+                                 db_cluster_addr_list=None,
+                                 db_type=DbBackendType.REDIS)
+
+def get_test_sdl_sentinel_cluster_config():
+    return _Configuration.Params(db_host='service-ricplt-dbaas-tcp-cluster-0.ricplt',
+                                 db_port=6379,
+                                 db_sentinel_port=26379,
+                                 db_sentinel_master_name='dbaasmaster',
+                                 db_cluster_addr_list='service-ricplt-dbaas-tcp-cluster-0.ricplt,service-ricplt-dbaas-tcp-cluster-1.ricplt',
+                                 db_type=DbBackendType.REDIS)
+
 @pytest.fixture()
-def redis_backend_fixture(request):
+def redis_backend_common_fixture(request):
     request.cls.ns = 'some-ns'
     request.cls.dl_redis = [b'1', b'2']
     request.cls.dm = {'a': b'1', 'b': b'2'}
@@ -56,22 +80,86 @@ def redis_backend_fixture(request):
     request.cls.groupmembers = set([b'm1', b'm2'])
     request.cls.groupmember = b'm1'
     request.cls.channels = ['abs', 'gma']
-    request.cls.channels_and_events = {'abs': ['cbn']}
-    request.cls.channels_and_events_redis = ['{some-ns},abs', 'cbn']
+    request.cls.channels_and_events = {'abs': ['cbn'], 'gma': ['jkl']}
+    request.cls.channels_and_events_redis = ['{some-ns},abs', 'cbn', '{some-ns},gma', 'jkl']
+
+    yield
 
+@pytest.fixture(params=['standalone', 'sentinel', 'sentinel_cluster'])
+def redis_backend_fixture(request, redis_backend_common_fixture):
     request.cls.configuration = Mock()
-    mock_conf_params = _Configuration.Params(db_host=None,
-                                             db_port=None,
-                                             db_sentinel_port=None,
-                                             db_sentinel_master_name=None,
-                                             db_type=DbBackendType.REDIS)
-    request.cls.configuration.get_params.return_value = mock_conf_params
-    with patch('ricsdl.backend.redis.Redis') as mock_redis, patch(
-            'ricsdl.backend.redis.PubSub') as mock_pubsub:
-        db = ricsdl.backend.get_backend_instance(request.cls.configuration)
-        request.cls.mock_redis = mock_redis.return_value
-        request.cls.mock_pubsub = mock_pubsub.return_value
-    request.cls.db = db
+    request.cls.test_backend_type = request.param
+    if request.param == 'standalone':
+        cfg = get_test_sdl_standby_config()
+        request.cls.configuration.get_params.return_value = cfg
+        with patch('ricsdl.backend.redis.Redis') as mock_redis, patch(
+                   'ricsdl.backend.redis.PubSub') as mock_pubsub, patch(
+                   'threading.Thread') as mock_thread:
+            db = ricsdl.backend.get_backend_instance(request.cls.configuration)
+            request.cls.mock_redis = mock_redis.return_value
+            request.cls.mock_pubsub = mock_pubsub.return_value
+            request.cls.mock_pubsub_thread = mock_thread.return_value
+            request.cls.mock_pubsub_thread.is_alive.return_value = False
+        request.cls.db = db
+
+        mock_redis.assert_called_once_with(db=0, host=cfg.db_host, max_connections=20, port=cfg.db_port)
+        mock_pubsub.assert_called_once_with(request.cls.mock_redis.connection_pool, ignore_subscribe_messages=True)
+        assert request.cls.mock_redis.set_response_callback.call_count == 2
+        assert request.cls.mock_redis.set_response_callback.call_args_list == [call('SETIE', ANY), call('DELIE', ANY)]
+
+    elif request.param == 'sentinel':
+        cfg = get_test_sdl_sentinel_config()
+        request.cls.configuration.get_params.return_value = cfg
+        with patch('ricsdl.backend.redis.Sentinel') as mock_sentinel, patch(
+                   'ricsdl.backend.redis.PubSub') as mock_pubsub, patch(
+                   'threading.Thread') as mock_thread:
+            db = ricsdl.backend.get_backend_instance(request.cls.configuration)
+            request.cls.mock_redis = mock_sentinel.return_value.master_for.return_value
+            request.cls.mock_pubsub = mock_pubsub.return_value
+            request.cls.mock_pubsub_thread = mock_thread.return_value
+            request.cls.mock_pubsub_thread.is_alive.return_value = False
+        request.cls.db = db
+
+        mock_sentinel.assert_called_once_with([(cfg.db_host, cfg.db_sentinel_port)])
+        mock_sentinel.master_for.called_once_with(cfg.db_sentinel_master_name)
+        mock_pubsub.assert_called_once_with(request.cls.mock_redis.connection_pool, ignore_subscribe_messages=True)
+        assert request.cls.mock_redis.set_response_callback.call_count == 2
+        assert request.cls.mock_redis.set_response_callback.call_args_list == [call('SETIE', ANY), call('DELIE', ANY)]
+
+    elif request.param == 'sentinel_cluster':
+        cfg = get_test_sdl_sentinel_cluster_config()
+        request.cls.configuration.get_params.return_value = cfg
+        with patch('ricsdl.backend.redis.Sentinel') as mock_sentinel, patch(
+                   'ricsdl.backend.redis.PubSub') as mock_pubsub, patch(
+                   'threading.Thread') as mock_thread:
+            db = ricsdl.backend.get_backend_instance(request.cls.configuration)
+            request.cls.mock_redis = mock_sentinel.return_value.master_for.return_value
+            request.cls.mock_pubsub = mock_pubsub.return_value
+            request.cls.mock_pubsub_thread = mock_thread.return_value
+            request.cls.mock_pubsub_thread.is_alive.return_value = False
+        request.cls.db = db
+
+        assert mock_sentinel.call_count == 2
+        mock_sentinel.assert_has_calls([
+            call([('service-ricplt-dbaas-tcp-cluster-0.ricplt', cfg.db_sentinel_port)]),
+            call([('service-ricplt-dbaas-tcp-cluster-1.ricplt', cfg.db_sentinel_port)]),
+        ], any_order=True)
+        assert mock_sentinel.return_value.master_for.call_count == 2
+        mock_sentinel.return_value.master_for.assert_has_calls(
+            [call(cfg.db_sentinel_master_name), call(cfg.db_sentinel_master_name)], any_order=True,
+        )
+        assert mock_pubsub.call_count == 2
+        mock_pubsub.assert_has_calls([
+            call(request.cls.mock_redis.connection_pool, ignore_subscribe_messages=True),
+            call(request.cls.mock_redis.connection_pool, ignore_subscribe_messages=True),
+        ])
+        assert request.cls.mock_redis.set_response_callback.call_count == 4
+        assert request.cls.mock_redis.set_response_callback.call_args_list == [
+            call('SETIE', ANY), call('DELIE', ANY),
+            call('SETIE', ANY), call('DELIE', ANY),
+        ]
+    else:
+        raise NotImplementedError
 
     yield
 
@@ -81,7 +169,10 @@ class TestRedisBackend:
     def test_is_connected_function_success(self):
         self.mock_redis.ping.return_value = True
         ret = self.db.is_connected()
-        self.mock_redis.ping.assert_called_once()
+        if self.test_backend_type == 'sentinel_cluster':
+            assert self.mock_redis.ping.call_count == 2
+        else:
+            assert self.mock_redis.ping.call_count == 1
         assert ret is True
 
     def test_is_connected_function_returns_false_if_ping_fails(self):
@@ -437,9 +528,11 @@ class TestRedisBackend:
 
     def test_subscribe_channel_with_thread_success(self):
         cb = Mock()
-        self.db.pubsub_thread.is_alive = Mock()
-        self.db.pubsub_thread.is_alive.return_value = False
-        self.db._run_in_thread = True
+        # Call first start_event_listener() to enable run_in_thread flag. When subscribe_channel()
+        # is called thread is started if the flag is enabled. In real-life scenario it's highly
+        # advisable at first to subscribe to some events by calling subscribe_channel() and only
+        # after it start threads by calling start_event_listener().
+        self.db.start_event_listener()
         self.db.subscribe_channel(self.ns, cb, self.channels)
         self.mock_pubsub.run_in_thread.assert_called_once_with(daemon=True, sleep_time=0.001)
 
@@ -457,46 +550,137 @@ class TestRedisBackend:
         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
             self.db.unsubscribe_channel(self.ns, [self.channels[0]])
 
-    def test_start_event_listener_success(self):
-        self.db.start_event_listener()
-        assert self.db._run_in_thread
-
-    def test_start_event_listener_subscribe_first(self):
-        self.mock_pubsub.run_in_thread.return_value = Mock()
-        self.mock_redis.pubsub_channels.return_value = [b'{some-ns},abs']
+    def test_subscribe_and_start_event_listener(self):
+        self.mock_redis.pubsub_channels.return_value = self.channels_and_events_redis
         self.db.subscribe_channel(self.ns, Mock(), self.channels)
         self.db.start_event_listener()
-        self.mock_pubsub.run_in_thread.assert_called_once_with(daemon=True, sleep_time=0.001)
+
+        if self.test_backend_type == 'sentinel_cluster':
+            assert self.mock_redis.pubsub_channels.call_count == 2
+            assert self.mock_pubsub.run_in_thread.call_count == 2
+            self.mock_pubsub.run_in_thread.assert_has_calls([
+                call(daemon=True, sleep_time=0.001),
+                call(daemon=True, sleep_time=0.001),
+            ])
+        else:
+            self.mock_redis.pubsub_channels.assert_called_once()
+            self.mock_pubsub.run_in_thread.assert_called_once_with(daemon=True, sleep_time=0.001)
 
     def test_start_event_listener_fail(self):
-        self.db.pubsub_thread.is_alive = Mock()
-        self.db.pubsub_thread.is_alive.return_value = True
+        self.mock_pubsub_thread.is_alive.return_value = True
         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
             self.db.start_event_listener()
 
     def test_handle_events_success(self):
         self.db.handle_events()
-        self.mock_pubsub.get_message.assert_called_once_with(ignore_subscribe_messages=True)
+        self.db.handle_events()
+        self.db.handle_events()
+        self.db.handle_events()
+        assert self.mock_pubsub.get_message.call_count == 4
 
-    def test_handle_events_fail_already_started(self):
-        self.db.pubsub_thread.is_alive = Mock()
-        self.db.pubsub_thread.is_alive.return_value = True
+    def test_handle_events_fail_if_subsub_thread_alive(self):
+        self.mock_pubsub_thread.is_alive.return_value = True
         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
             self.db.handle_events()
 
-    def test_handle_events_fail_already_set(self):
-        self.db._run_in_thread = True
+    def test_handle_events_fail_if_event_listener_already_running(self):
+        self.db.start_event_listener()
         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
             self.db.handle_events()
 
+    def test_handle_events_ignores_message_handling_redis_runtime_exception(self):
+         self.mock_pubsub.get_message.side_effect = RuntimeError()
+         self.db.handle_events()
+         self.mock_pubsub.get_message.assert_called_once()
+
     def test_get_redis_connection_function_success(self):
-        ret = self.db.get_redis_connection()
+        ret = self.db.get_redis_connection(self.ns)
         assert ret is self.mock_redis
 
     def test_redis_backend_object_string_representation(self):
         str_out = str(self.db)
         assert str_out is not None
 
+    def test_namespace_hash_algorithm_stays_unaltered(self):
+        ret_hash = self.db._RedisBackend__get_hash('sdltoolns')
+        assert ret_hash == 2897969051
+
+def test_standalone_redis_init_exception_is_mapped_to_sdl_exeception():
+    mock_cfg = Mock()
+    cfg_params = get_test_sdl_standby_config()
+    mock_cfg.get_params.return_value = cfg_params
+    with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+        with patch('ricsdl.backend.redis.Redis') as mock_redis:
+            mock_redis.side_effect = redis_exceptions.ResponseError('redis error!')
+            ricsdl.backend.get_backend_instance(mock_cfg)
+
+def test_standalone_pubsub_init_exception_is_mapped_to_sdl_exeception():
+    mock_cfg = Mock()
+    cfg_params = get_test_sdl_standby_config()
+    mock_cfg.get_params.return_value = cfg_params
+    with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+        with patch('ricsdl.backend.redis.Redis') as mock_redis, patch(
+                   'ricsdl.backend.redis.PubSub') as mock_pubsub:
+            mock_pubsub.side_effect = redis_exceptions.ResponseError('redis error!')
+            ricsdl.backend.get_backend_instance(mock_cfg)
+
+def test_sentinel_redis_init_exception_is_mapped_to_sdl_exeception():
+    mock_cfg = Mock()
+    cfg_params = get_test_sdl_sentinel_config()
+    mock_cfg.get_params.return_value = cfg_params
+    with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+        with patch('ricsdl.backend.redis.Sentinel') as mock_sentinel:
+            mock_sentinel.side_effect = redis_exceptions.ResponseError('redis error!')
+            ricsdl.backend.get_backend_instance(mock_cfg)
+
+def test_sentinel_pubsub_init_exception_is_mapped_to_sdl_exeception():
+    mock_cfg = Mock()
+    cfg_params = get_test_sdl_sentinel_config()
+    mock_cfg.get_params.return_value = cfg_params
+    with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+        with patch('ricsdl.backend.redis.Sentinel') as mock_sentinel, patch(
+                   'ricsdl.backend.redis.PubSub') as mock_pubsub:
+            mock_pubsub.side_effect = redis_exceptions.ResponseError('redis error!')
+            ricsdl.backend.get_backend_instance(mock_cfg)
+
+def test_sentinel_master_for_exception_is_mapped_to_sdl_exeception():
+    mock_cfg = Mock()
+    cfg_params = get_test_sdl_sentinel_config()
+    mock_cfg.get_params.return_value = cfg_params
+    with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+        with patch('ricsdl.backend.redis.Sentinel') as mock_sentinel, patch(
+                   'ricsdl.backend.redis.PubSub') as mock_pubsub:
+            mock_sentinel.return_value.master_for.side_effect = redis_exceptions.ResponseError('redis error!')
+            ricsdl.backend.get_backend_instance(mock_cfg)
+
+def test_sentinel_cluster_redis_init_exception_is_mapped_to_sdl_exeception():
+    mock_cfg = Mock()
+    cfg_params = get_test_sdl_sentinel_cluster_config()
+    mock_cfg.get_params.return_value = cfg_params
+    with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+        with patch('ricsdl.backend.redis.Sentinel') as mock_sentinel:
+            mock_sentinel.side_effect = redis_exceptions.ResponseError('redis error!')
+            ricsdl.backend.get_backend_instance(mock_cfg)
+
+def test_sentinel_cluster_pubsub_init_exception_is_mapped_to_sdl_exeception():
+    mock_cfg = Mock()
+    cfg_params = get_test_sdl_sentinel_cluster_config()
+    mock_cfg.get_params.return_value = cfg_params
+    with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+        with patch('ricsdl.backend.redis.Sentinel') as mock_sentinel, patch(
+                   'ricsdl.backend.redis.PubSub') as mock_pubsub:
+            mock_pubsub.side_effect = redis_exceptions.ResponseError('redis error!')
+            ricsdl.backend.get_backend_instance(mock_cfg)
+
+def test_sentinel_cluster_master_for_exception_is_mapped_to_sdl_exeception():
+    mock_cfg = Mock()
+    cfg_params = get_test_sdl_sentinel_cluster_config()
+    mock_cfg.get_params.return_value = cfg_params
+    with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+        with patch('ricsdl.backend.redis.Sentinel') as mock_sentinel, patch(
+                   'ricsdl.backend.redis.PubSub') as mock_pubsub:
+            mock_sentinel.return_value.master_for.side_effect = redis_exceptions.ResponseError('redis error!')
+            ricsdl.backend.get_backend_instance(mock_cfg)
 
 class MockRedisLock:
     def __init__(self, redis, name, timeout=None, sleep=0.1,
@@ -542,6 +726,7 @@ def redis_backend_lock_fixture(request, mock_redis_lock):
                                              db_port=None,
                                              db_sentinel_port=None,
                                              db_sentinel_master_name=None,
+                                             db_cluster_addr_list=None,
                                              db_type=DbBackendType.REDIS)
     request.cls.configuration.get_params.return_value = mock_conf_params