Add support for notifications
[ric-plt/sdlpy.git] / ricsdl-package / tests / backend / test_redis.py
old mode 100644 (file)
new mode 100755 (executable)
index 4f46205..29cb582
@@ -35,6 +35,7 @@ def redis_backend_fixture(request):
     request.cls.dl_redis = [b'1', b'2']
     request.cls.dm = {'a': b'1', 'b': b'2'}
     request.cls.dm_redis = {'{some-ns},a': b'1', '{some-ns},b': b'2'}
+    request.cls.dm_redis_flat = ['{some-ns},a', b'1', '{some-ns},b', b'2']
     request.cls.key = 'a'
     request.cls.key_redis = '{some-ns},a'
     request.cls.keys = ['a', 'b']
@@ -54,6 +55,9 @@ def redis_backend_fixture(request):
     request.cls.group_redis = '{some-ns},some-group'
     request.cls.groupmembers = set([b'm1', b'm2'])
     request.cls.groupmember = b'm1'
+    request.cls.channels = ['abs', 'gma']
+    request.cls.channels_and_events = {'abs': ['cbn']}
+    request.cls.channels_and_events_redis = ['{some-ns},abs', 'cbn']
 
     request.cls.configuration = Mock()
     mock_conf_params = _Configuration.Params(db_host=None,
@@ -62,9 +66,11 @@ def redis_backend_fixture(request):
                                              db_sentinel_master_name=None,
                                              db_type=DbBackendType.REDIS)
     request.cls.configuration.get_params.return_value = mock_conf_params
-    with patch('ricsdl.backend.redis.Redis') as mock_redis:
+    with patch('ricsdl.backend.redis.Redis') as mock_redis, patch(
+            'ricsdl.backend.redis.PubSub') as mock_pubsub:
         db = ricsdl.backend.get_backend_instance(request.cls.configuration)
         request.cls.mock_redis = mock_redis.return_value
+        request.cls.mock_pubsub = mock_pubsub.return_value
     request.cls.db = db
 
     yield
@@ -318,6 +324,171 @@ class TestRedisBackend:
         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
             self.db.group_size(self.ns, self.group)
 
+    def test_set_and_publish_success(self):
+        self.db.set_and_publish(self.ns, self.channels_and_events, self.dm)
+        self.mock_redis.execute_command.assert_called_once_with('MSETMPUB', len(self.dm),
+                                                                len(self.channels_and_events),
+                                                                *self.dm_redis_flat,
+                                                                *self.channels_and_events_redis)
+
+    def test_set_and_publish_can_map_redis_exception_to_sdl_exception(self):
+        self.mock_redis.execute_command.side_effect = redis_exceptions.ResponseError('redis error!')
+        with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+            self.db.set_and_publish(self.ns, self.channels_and_events, self.dm)
+
+    def test_set_if_and_publish_success(self):
+        self.mock_redis.execute_command.return_value = b"OK"
+        ret = self.db.set_if_and_publish(self.ns, self.channels_and_events, self.key, self.old_data,
+                                         self.new_data)
+        self.mock_redis.execute_command.assert_called_once_with('SETIEPUB', self.key_redis,
+                                                                self.new_data, self.old_data,
+                                                                *self.channels_and_events_redis)
+        assert ret is True
+
+    def test_set_if_and_publish_returns_false_if_existing_key_value_not_expected(self):
+        self.mock_redis.execute_command.return_value = None
+        ret = self.db.set_if_and_publish(self.ns, self.channels_and_events, self.key, self.old_data,
+                                         self.new_data)
+        self.mock_redis.execute_command.assert_called_once_with('SETIEPUB', self.key_redis,
+                                                                self.new_data, self.old_data,
+                                                                *self.channels_and_events_redis)
+        assert ret is False
+
+    def test_set_if_and_publish_can_map_redis_exception_to_sdl_exception(self):
+        self.mock_redis.execute_command.side_effect = redis_exceptions.ResponseError('redis error!')
+        with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+            self.db.set_if_and_publish(self.ns, self.channels_and_events, self.key, self.old_data,
+                                       self.new_data)
+
+    def test_set_if_not_exists_and_publish_success(self):
+        self.mock_redis.execute_command.return_value = b"OK"
+        ret = self.db.set_if_not_exists_and_publish(self.ns, self.channels_and_events, self.key,
+                                                    self.new_data)
+        self.mock_redis.execute_command.assert_called_once_with('SETNXPUB', self.key_redis,
+                                                                self.new_data,
+                                                                *self.channels_and_events_redis)
+        assert ret is True
+
+    def test_set_if_not_exists_and_publish_returns_false_if_key_already_exists(self):
+        self.mock_redis.execute_command.return_value = None
+        ret = self.db.set_if_not_exists_and_publish(self.ns, self.channels_and_events, self.key,
+                                                    self.new_data)
+        self.mock_redis.execute_command.assert_called_once_with('SETNXPUB', self.key_redis,
+                                                                self.new_data,
+                                                                *self.channels_and_events_redis)
+        assert ret is False
+
+    def set_if_not_exists_and_publish_can_map_redis_exception_to_sdl_exception(self):
+        self.mock_redis.execute_command.side_effect = redis_exceptions.ResponseError('redis error!')
+        with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+            self.db.set_if_not_exists_and_publish(self.ns, self.channels_and_events, self.key,
+                                                  self.new_data)
+
+    def test_remove_and_publish_success(self):
+        self.db.remove_and_publish(self.ns, self.channels_and_events, self.key)
+        self.mock_redis.execute_command.assert_called_once_with('DELMPUB', len(self.key),
+                                                                len(self.channels_and_events),
+                                                                self.key_redis,
+                                                                *self.channels_and_events_redis)
+
+    def test_remove_if_and_publish_success(self):
+        self.mock_redis.execute_command.return_value = 1
+        ret = self.db.remove_if_and_publish(self.ns, self.channels_and_events, self.key,
+                                            self.new_data)
+        self.mock_redis.execute_command.assert_called_once_with('DELIEPUB', self.key_redis,
+                                                                self.new_data,
+                                                                *self.channels_and_events_redis)
+        assert ret is True
+
+    def test_remove_if_and_publish_returns_false_if_data_does_not_match(self):
+        self.mock_redis.execute_command.return_value = 0
+        ret = self.db.remove_if_and_publish(self.ns, self.channels_and_events, self.key,
+                                            self.new_data)
+        self.mock_redis.execute_command.assert_called_once_with('DELIEPUB', self.key_redis,
+                                                                self.new_data,
+                                                                *self.channels_and_events_redis)
+        assert ret is False
+
+    def test_remove_if_and_publish_can_map_redis_exception_to_sdl_exception(self):
+        self.mock_redis.execute_command.side_effect = redis_exceptions.ResponseError('redis error!')
+        with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+            self.db.remove_if_and_publish(self.ns, self.channels_and_events, self.key,
+                                          self.new_data)
+
+    def test_remove_all_and_publish_success(self):
+        self.mock_redis.keys.return_value = ['{some-ns},a']
+        self.db.remove_all_and_publish(self.ns, self.channels_and_events)
+        self.mock_redis.keys.assert_called_once()
+        self.mock_redis.execute_command.assert_called_once_with('DELMPUB', len(self.key),
+                                                                len(self.channels_and_events),
+                                                                self.key_redis,
+                                                                *self.channels_and_events_redis)
+
+    def test_remove_all_and_publish_can_map_redis_exception_to_sdl_exception(self):
+        self.mock_redis.execute_command.side_effect = redis_exceptions.ResponseError('redis error!')
+        with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+            self.db.remove_all_and_publish(self.ns, self.channels_and_events)
+
+    def test_subscribe_channel_success(self):
+        cb = Mock()
+        self.db.subscribe_channel(self.ns, cb, self.channels)
+        for channel in self.channels:
+            self.mock_pubsub.subscribe.assert_any_call(**{f'{{some-ns}},{channel}': cb})
+
+    def test_subscribe_channel_with_thread_success(self):
+        cb = Mock()
+        self.db.pubsub_thread.is_alive = Mock()
+        self.db.pubsub_thread.is_alive.return_value = False
+        self.db._run_in_thread = True
+        self.db.subscribe_channel(self.ns, cb, self.channels)
+        self.mock_pubsub.run_in_thread.assert_called_once_with(daemon=True, sleep_time=0.001)
+
+    def test_subscribe_can_map_redis_exception_to_sdl_exeception(self):
+        self.mock_pubsub.subscribe.side_effect = redis_exceptions.ResponseError('redis error!')
+        with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+            self.db.subscribe_channel(self.ns, Mock(), self.channels)
+
+    def test_unsubscribe_channel_success(self):
+        self.db.unsubscribe_channel(self.ns, [self.channels[0]])
+        self.mock_pubsub.unsubscribe.assert_called_with('{some-ns},abs')
+
+    def test_unsubscribe_channel_can_map_redis_exception_to_sdl_exeception(self):
+        self.mock_pubsub.unsubscribe.side_effect = redis_exceptions.ResponseError('redis error!')
+        with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+            self.db.unsubscribe_channel(self.ns, [self.channels[0]])
+
+    def test_start_event_listener_success(self):
+        self.db.start_event_listener()
+        assert self.db._run_in_thread
+
+    def test_start_event_listener_subscribe_first(self):
+        self.mock_pubsub.run_in_thread.return_value = Mock()
+        self.mock_redis.pubsub_channels.return_value = [b'{some-ns},abs']
+        self.db.subscribe_channel(self.ns, Mock(), self.channels)
+        self.db.start_event_listener()
+        self.mock_pubsub.run_in_thread.assert_called_once_with(daemon=True, sleep_time=0.001)
+
+    def test_start_event_listener_fail(self):
+        self.db.pubsub_thread.is_alive = Mock()
+        self.db.pubsub_thread.is_alive.return_value = True
+        with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+            self.db.start_event_listener()
+
+    def test_handle_events_success(self):
+        self.db.handle_events()
+        self.mock_pubsub.get_message.assert_called_once_with(ignore_subscribe_messages=True)
+
+    def test_handle_events_fail_already_started(self):
+        self.db.pubsub_thread.is_alive = Mock()
+        self.db.pubsub_thread.is_alive.return_value = True
+        with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+            self.db.handle_events()
+
+    def test_handle_events_fail_already_set(self):
+        self.db._run_in_thread = True
+        with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+            self.db.handle_events()
+
     def test_get_redis_connection_function_success(self):
         ret = self.db.get_redis_connection()
         assert ret is self.mock_redis
@@ -498,3 +669,14 @@ def test_system_error_exceptions_are_not_mapped_to_any_sdl_exception():
     with pytest.raises(SystemExit):
         with _map_to_sdl_exception():
             raise SystemExit('Fatal error')
+
+
+class TestRedisClient:
+    @classmethod
+    def setup_class(cls):
+        cls.pubsub = ricsdl.backend.redis.PubSub(Mock())
+        cls.pubsub.channels = {b'{some-ns},abs': Mock()}
+
+    def test_handle_pubsub_message(self):
+        assert self.pubsub.handle_message([b'message', b'{some-ns},abs', b'cbn']) == ('abs', 'cbn')
+        self.pubsub.channels.get(b'{some-ns},abs').assert_called_once_with('abs', 'cbn')