Pack notifications to be compatible with SDL golang 61/6261/1
authorTimo Tietavainen <timo.tietavainen@nokia.com>
Wed, 9 Jun 2021 02:56:54 +0000 (05:56 +0300)
committerTimo Tietavainen <timo.tietavainen@nokia.com>
Wed, 9 Jun 2021 03:51:54 +0000 (06:51 +0300)
In SDL Golang 'publish' API implementation packs all the events to single DB
notification, where events are separated by '___' characters. In SDL Golang
subscriber receives this packed DB notification and it splits the DB
notification by '___' characters and calls application notification handler
callback function with list of received events.
Current implementation of SDL Python is however different, is does not do any
packing for events and hence it calls application notification callback many
times, once for each event it has received. Also if SDL Python application is
used as event subscriber and SDL Golang application as event published,
Python application won't be able to handle those notifications what are packed
to one db notification by SDL Golang application.
With this commit implement notification packing to SDL Python.

Issue-ID: RIC-795

Signed-off-by: Timo Tietavainen <timo.tietavainen@nokia.com>
Change-Id: Ie494430cf46756ea476b98343a8c651a2fa1dbcd

12 files changed:
docs/release-notes.rst
ricsdl-package/examples/notify.py
ricsdl-package/ricsdl/__init__.py
ricsdl-package/ricsdl/backend/dbbackend_abc.py
ricsdl-package/ricsdl/backend/fake_dict_db.py
ricsdl-package/ricsdl/backend/redis.py
ricsdl-package/ricsdl/configuration.py
ricsdl-package/ricsdl/syncstorage.py
ricsdl-package/ricsdl/syncstorage_abc.py
ricsdl-package/tests/backend/test_redis.py
ricsdl-package/tests/test_configuration.py
ricsdl-package/tests/test_syncstorage.py

index 8177824..12beb5b 100644 (file)
@@ -33,6 +33,10 @@ This document provides the release notes of the ricsdl library.
 Version history
 ---------------
 
+[2.3.0] - 2021-06-09
+
+* Pack all the events in a channel to one DB notification to be in line with SDL Golang.
+
 [2.2.0] - 2021-05-11
 
 * Add DB backend instance selection based on namespace value to balance DB load.
index 473dd73..7495bfe 100755 (executable)
@@ -57,13 +57,14 @@ import time
 
 from ricsdl.syncstorage import SyncStorage
 from ricsdl.exceptions import RejectedByBackend, NotConnected, BackendError
+from typing import (Union, List)
 
 # There are two available methods for applications to handle notifications:
 #   - EVENT_LISTENER (true):
 #     - User calls sdl.start_event_listener() which will create an SDL managed
 #       event loop for handling messages.
 #   - EVENT_LISTENER (false):
-#     - User need to call sdl.handle_messages() which will return the message
+#     - User need to call sdl.handle_events() which will return the message
 #
 # Note: In both cases, the given callback function will be executed.
 EVENT_LISTENER = True
@@ -114,8 +115,9 @@ last_cb_message = ""
 stop_thread = False
 
 
-def cb(channel: str, message: str):
-    """An example of function that will be called when an event is received.
+def cb(channel: str, message: Union[str, List[str]]):
+    """An example of function that will be called when a single event or list of
+    events are received.
 
     This function sets last_cb_channel and last_cb_message as channel and
     message respectively. This also unlocks the global lock variable.
@@ -160,6 +162,13 @@ _try_func_callback_return(
     lambda: mysdl.set_and_publish(MY_NS, {MY_CHANNEL: "SET PUBLISH"}, {'my_key': b'my_value'}))
 assert last_cb_channel == MY_CHANNEL and last_cb_message == "SET PUBLISH"
 
+# Sets a value 'my_value' for a key 'my_key' under given namespace. Note that value
+# type must be bytes and multiple key values can be set in one set function call.
+# Function publishes two events into one channel.
+_try_func_callback_return(
+    lambda: mysdl.set_and_publish(MY_NS, {MY_CHANNEL: ["SET PUBLISH1", "SET PUBLISH2"]}, {'my_key': b'my_value'}))
+assert last_cb_channel == MY_CHANNEL and last_cb_message == ["SET PUBLISH1", "SET PUBLISH2"]
+
 # Sets a value 'my_value2' for a key 'my_key' under given namespace only if the old value is
 # 'my_value'.
 was_set = _try_func_callback_return(lambda: mysdl.set_if_and_publish(
index d969bec..81f7e72 100644 (file)
@@ -31,7 +31,7 @@ from .exceptions import (
 )
 
 
-__version__ = '2.2.0'
+__version__ = '2.3.0'
 
 
 __all__ = [
index cdf0311..49944b5 100755 (executable)
@@ -161,7 +161,8 @@ class DbBackendAbc(ABC):
         pass
 
     @abstractmethod
-    def subscribe_channel(self, ns: str, cb: Callable[[str, str], None],
+    def subscribe_channel(self, ns: str,
+                          cb: Union[Callable[[str, str], None], Callable[[str, List[str]], None]],
                           channels: List[str]) -> None:
         """
         This takes a callback function and one or many channels to be subscribed.
@@ -184,10 +185,10 @@ class DbBackendAbc(ABC):
         pass
 
     @abstractmethod
-    def handle_events(self) -> Optional[Tuple[str, str]]:
+    def handle_events(self) -> Optional[Union[Tuple[str, str], Tuple[str, List[str]]]]:
         """
         handle_events is a non-blocking function that returns a tuple containing channel
-        name and message received from notification.
+        name and message(s) received from notification.
         """
         pass
 
index 5a49f10..1a63ebd 100755 (executable)
@@ -149,15 +149,13 @@ class FakeDictBackend(DbBackendAbc):
                         data_map: Dict[str, bytes]) -> None:
         self._db.update(data_map.copy())
         for channel, events in channels_and_events.items():
-            for event in events:
-                self._queue.put((channel, event))
+            self._queue.put((channel, events))
 
     def set_if_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]], key: str,
                            old_data: bytes, new_data: bytes) -> bool:
         if self.set_if(ns, key, old_data, new_data):
             for channel, events in channels_and_events.items():
-                for event in events:
-                    self._queue.put((channel, event))
+                self._queue.put((channel, events))
             return True
         return False
 
@@ -165,8 +163,7 @@ class FakeDictBackend(DbBackendAbc):
                                       key: str, data: bytes) -> bool:
         if self.set_if_not_exists(ns, key, data):
             for channel, events in channels_and_events.items():
-                for event in events:
-                    self._queue.put((channel, event))
+                self._queue.put((channel, events))
             return True
         return False
 
@@ -175,15 +172,13 @@ class FakeDictBackend(DbBackendAbc):
         for key in keys:
             self._db.pop(key, None)
         for channel, events in channels_and_events.items():
-            for event in events:
-                self._queue.put((channel, event))
+            self._queue.put((channel, events))
 
     def remove_if_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]], key: str,
                               data: bytes) -> bool:
         if self.remove_if(ns, key, data):
             for channel, events in channels_and_events.items():
-                for event in events:
-                    self._queue.put((channel, event))
+                self._queue.put((channel, events))
             return True
         return False
 
@@ -191,10 +186,10 @@ class FakeDictBackend(DbBackendAbc):
         # Note: Since fake db has only one namespace, this deletes all keys
         self._db.clear()
         for channel, events in channels_and_events.items():
-            for event in events:
-                self._queue.put((channel, event))
+            self._queue.put((channel, events))
 
-    def subscribe_channel(self, ns: str, cb: Callable[[str, str], None],
+    def subscribe_channel(self, ns: str,
+                          cb: Union[Callable[[str, str], None], Callable[[str, List[str]], None]],
                           channels: List[str]) -> None:
         for channel in channels:
             self._channel_cbs[channel] = cb
@@ -206,7 +201,7 @@ class FakeDictBackend(DbBackendAbc):
             message = self._queue.get()
             cb = self._channel_cbs.get(message[0], None)
             if cb:
-                cb(message[0], message[1])
+                cb(message[0], message[1][0] if (isinstance(message[1], list) and len(message[1]) == 1) else message[1])
             time.sleep(0.001)
 
     def unsubscribe_channel(self, ns: str, channels: List[str]) -> None:
@@ -220,7 +215,7 @@ class FakeDictBackend(DbBackendAbc):
             self._listen_thread.start()
         self._run_in_thread = True
 
-    def handle_events(self) -> Optional[Tuple[str, str]]:
+    def handle_events(self) -> Optional[Union[Tuple[str, str], Tuple[str, List[str]]]]:
         if self._listen_thread.is_alive() or self._run_in_thread:
             raise Exception("Event loop already started")
         try:
@@ -228,9 +223,10 @@ class FakeDictBackend(DbBackendAbc):
         except queue.Empty:
             return None
         cb = self._channel_cbs.get(message[0], None)
+        notifications = message[1][0] if (isinstance(message[1], list) and len(message[1]) == 1) else message[1]
         if cb:
-            cb(message[0], message[1])
-        return (message[0], message[1])
+            cb(message[0], notifications)
+        return (message[0], notifications)
 
 
 class FakeDictBackendLock(DbBackendLockAbc):
index 12726a3..bc4b43b 100755 (executable)
@@ -57,6 +57,10 @@ def _map_to_sdl_exception():
 
 
 class PubSub(redis.client.PubSub):
+    def __init__(self, event_separator, connection_pool, ignore_subscribe_messages=False):
+        super().__init__(connection_pool, shard_hint=None, ignore_subscribe_messages=ignore_subscribe_messages)
+        self.event_separator = event_separator
+
     def handle_message(self, response, ignore_subscribe_messages=False):
         """
         Parses a pub/sub message. If the channel or pattern was subscribed to
@@ -112,8 +116,10 @@ class PubSub(redis.client.PubSub):
                 # message
                 message_channel = self._strip_ns_from_bin_key('', message['channel'])
                 message_data = message['data'].decode('utf-8')
-                handler(message_channel, message_data)
-                return message_channel, message_data
+                messages = message_data.split(self.event_separator)
+                notification = messages[0] if len(messages) == 1 else messages
+                handler(message_channel, notification)
+                return message_channel, notification
         elif message_type != 'pong':
             # this is a subscribe/unsubscribe message. ignore if we don't
             # want them
@@ -148,6 +154,7 @@ class RedisBackend(DbBackendAbc):
     def __init__(self, configuration: _Configuration) -> None:
         super().__init__()
         self.next_client_event = 0
+        self.event_separator = configuration.get_event_separator()
         self.clients = list()
         with _map_to_sdl_exception():
             self.clients = self.__create_redis_clients(configuration)
@@ -323,7 +330,8 @@ class RedisBackend(DbBackendAbc):
                 *channels_and_events_prepared,
             )
 
-    def subscribe_channel(self, ns: str, cb: Callable[[str, str], None],
+    def subscribe_channel(self, ns: str,
+                          cb: Union[Callable[[str, str], None], Callable[[str, List[str]], None]],
                           channels: List[str]) -> None:
         channels = self.__add_keys_ns_prefix(ns, channels)
         for channel in channels:
@@ -349,7 +357,7 @@ class RedisBackend(DbBackendAbc):
                 redis_ctx.pubsub_thread = redis_ctx.redis_pubsub.run_in_thread(sleep_time=0.001, daemon=True)
             redis_ctx.run_in_thread = True
 
-    def handle_events(self) -> Optional[Tuple[str, str]]:
+    def handle_events(self) -> Optional[Union[Tuple[str, str], Tuple[str, List[str]]]]:
         if self.next_client_event >= len(self.clients):
             self.next_client_event = 0
         redis_ctx = self.clients[self.next_client_event]
@@ -389,7 +397,7 @@ class RedisBackend(DbBackendAbc):
         new_redis.set_response_callback('SETIE', lambda r: r and nativestr(r) == 'OK' or False)
         new_redis.set_response_callback('DELIE', lambda r: r and int(r) == 1 or False)
 
-        redis_pubsub = PubSub(new_redis.connection_pool, ignore_subscribe_messages=True)
+        redis_pubsub = PubSub(self.event_separator, new_redis.connection_pool, ignore_subscribe_messages=True)
         pubsub_thread = threading.Thread(target=None)
         run_in_thread = False
 
@@ -448,17 +456,20 @@ class RedisBackend(DbBackendAbc):
             ret_keys.append(nskey[1])
         return ret_keys
 
-    @classmethod
-    def _prepare_channels(cls, ns: str, channels_and_events: Dict[str,
-                                                                  List[str]]) -> Tuple[List, int]:
+    def _prepare_channels(self, ns: str,
+                          channels_and_events: Dict[str, List[str]]) -> Tuple[List, int]:
         channels_and_events_prepared = []
-        total_events = 0
         for channel, events in channels_and_events.items():
+            one_channel_join_events = None
             for event in events:
-                channels_and_events_prepared.append(cls.__add_key_ns_prefix(ns, channel))
-                channels_and_events_prepared.append(event)
-                total_events += 1
-        return channels_and_events_prepared, total_events
+                if one_channel_join_events is None:
+                    channels_and_events_prepared.append(self.__add_key_ns_prefix(ns, channel))
+                    one_channel_join_events = event
+                else:
+                    one_channel_join_events = one_channel_join_events + self.event_separator + event
+            channels_and_events_prepared.append(one_channel_join_events)
+        pairs_cnt = int(len(channels_and_events_prepared) / 2)
+        return channels_and_events_prepared, pairs_cnt
 
     def get_redis_connection(self, ns: str):
         """Return existing Redis database connection valid for the namespace."""
index b1521da..e99e68e 100644 (file)
@@ -76,3 +76,7 @@ class _Configuration():
                                      db_sentinel_master_name=os.getenv('DBAAS_MASTER_NAME'),
                                      db_cluster_addr_list=os.getenv('DBAAS_CLUSTER_ADDR_LIST'),
                                      db_type=backend_type)
+
+    @classmethod
+    def get_event_separator(cls):
+        return "___"
index 48b5e3d..55063e4 100755 (executable)
@@ -120,6 +120,7 @@ class SyncStorage(SyncStorageAbc):
     def __init__(self, fake_db_backend=None) -> None:
         super().__init__()
         self.__configuration = _Configuration(fake_db_backend)
+        self.event_separator = self.__configuration.get_event_separator()
         self.__dbbackend = ricsdl.backend.get_backend_instance(self.__configuration)
 
     def __del__(self):
@@ -265,7 +266,8 @@ class SyncStorage(SyncStorageAbc):
         self.__dbbackend.remove_all_and_publish(ns, channels_and_events)
 
     @func_arg_checker(SdlTypeError, 1, ns=str, cb=Callable, channels=(str, builtins.set))
-    def subscribe_channel(self, ns: str, cb: Callable[[str, str], None],
+    def subscribe_channel(self, ns: str,
+                          cb: Union[Callable[[str, str], None], Callable[[str, List[str]], None]],
                           channels: Union[str, Set[str]]) -> None:
         self._validate_callback(cb)
         channels = [channels] if isinstance(channels, str) else list(channels)
@@ -279,7 +281,7 @@ class SyncStorage(SyncStorageAbc):
     def start_event_listener(self) -> None:
         self.__dbbackend.start_event_listener()
 
-    def handle_events(self) -> Optional[Tuple[str, str]]:
+    def handle_events(self) -> Optional[Union[Tuple[str, str], Tuple[str, List[str]]]]:
         return self.__dbbackend.handle_events()
 
     @func_arg_checker(SdlTypeError, 1, ns=str, resource=str, expiration=(int, float))
@@ -302,8 +304,7 @@ class SyncStorage(SyncStorageAbc):
             if not isinstance(v, bytes):
                 raise SdlTypeError(r"Wrong dict value type: {}={}. Must be: bytes".format(v, type(v)))
 
-    @classmethod
-    def _validate_channels_events(cls, channels_and_events: Dict[Any, Any]):
+    def _validate_channels_events(self, channels_and_events: Dict[Any, Any]):
         for channel, events in channels_and_events.items():
             if not isinstance(channel, str):
                 raise SdlTypeError(r"Wrong channel type: {}={}. Must be: str".format(
@@ -316,6 +317,13 @@ class SyncStorage(SyncStorageAbc):
                     if not isinstance(event, str):
                         raise SdlTypeError(r"Wrong event type: {}={}. Must be: str".format(
                             events, type(events)))
+                    if self.event_separator in event:
+                        raise SdlTypeError(r"Events {} contains illegal substring (\"{}\")".format(
+                            events, self.event_separator))
+            else:
+                if self.event_separator in events:
+                    raise SdlTypeError(r"Events {} contains illegal substring (\"{}\")".format(
+                        events, self.event_separator))
 
     @classmethod
     def _validate_callback(cls, cb):
index 61dd67f..1151979 100755 (executable)
@@ -829,7 +829,8 @@ class SyncStorageAbc(ABC):
         pass
 
     @abstractmethod
-    def subscribe_channel(self, ns: str, cb: Callable[[str, str], None],
+    def subscribe_channel(self, ns: str,
+                          cb: Union[Callable[[str, str], None], Callable[[str, List[str]], None]],
                           channels: Union[str, Set[str]]) -> None:
         """
         Subscribes the client to the specified channels.
@@ -840,10 +841,10 @@ class SyncStorageAbc(ABC):
         the events.
 
         When subscribing for a channel, a callback function is given as a parameter.
-        Whenever a notification is received from a channel, this callback is called
-        with channel and notifications as parameter. A call to subscribe_channel function
-        returns immediately, callbacks will be called synchronously from a dedicated
-        thread.
+        Whenever single notification or many notifications are received from a channel,
+        this callback is called with channel and notifications as parameter. A call to
+        subscribe_channel function returns immediately, callbacks will be called
+        synchronously from a dedicated thread.
 
         It is possible to subscribe to different channels using different callbacks. In
         this case simply use subscribe_channel function separately for each channel.
@@ -855,7 +856,7 @@ class SyncStorageAbc(ABC):
 
         Args:
             ns: Namespace under which this operation is targeted.
-            cb: A function that is called when an event on channel is received.
+            cb: A function that is called when events on channel are received.
             channels: One channel or multiple channels to be subscribed.
 
         Returns:
@@ -909,10 +910,10 @@ class SyncStorageAbc(ABC):
         pass
 
     @abstractmethod
-    def handle_events(self) -> Optional[Tuple[str, str]]:
+    def handle_events(self) -> Optional[Union[Tuple[str, str], Tuple[str, List[str]]]]:
         """
         handle_events is a non-blocking function that returns a tuple containing channel
-        name and message received from an event. The registered callback function will
+        name and message(s) received from an event. The registered callback function will
         still be called when an event is received.
 
         This function is called if SDL user decides to handle notifications in its own
@@ -924,7 +925,8 @@ class SyncStorageAbc(ABC):
         events handling starts.
 
         Returns:
-            Tuple: (channel: str, message: str)
+            Tuple: (channel: str, message: str) or
+            Tuple: (channel: str, messages: list of str)
 
         Raises:
             SdlTypeError: If function's argument is of an inappropriate type.
index 1d79752..7d2928b 100755 (executable)
@@ -28,6 +28,7 @@ from ricsdl.configuration import _Configuration
 from ricsdl.configuration import DbBackendType
 import ricsdl.exceptions
 
+EVENT_SEPARATOR = "___"
 
 def get_test_sdl_standby_config():
     return _Configuration.Params(db_host='service-ricplt-dbaas-tcp-cluster-0.ricplt',
@@ -79,15 +80,17 @@ def redis_backend_common_fixture(request):
     request.cls.group_redis = '{some-ns},some-group'
     request.cls.groupmembers = set([b'm1', b'm2'])
     request.cls.groupmember = b'm1'
-    request.cls.channels = ['abs', 'gma']
-    request.cls.channels_and_events = {'abs': ['cbn'], 'gma': ['jkl']}
-    request.cls.channels_and_events_redis = ['{some-ns},abs', 'cbn', '{some-ns},gma', 'jkl']
+    request.cls.channels = ['ch1', 'ch2']
+    request.cls.channels_and_events = {'ch1': ['ev1'], 'ch2': ['ev2', 'ev3']}
+    request.cls.channels_and_events_redis = ['{some-ns},ch1', 'ev1',
+                                             '{some-ns},ch2', 'ev2' + EVENT_SEPARATOR + 'ev3']
 
     yield
 
 @pytest.fixture(params=['standalone', 'sentinel', 'sentinel_cluster'])
 def redis_backend_fixture(request, redis_backend_common_fixture):
     request.cls.configuration = Mock()
+    request.cls.configuration.get_event_separator.return_value = EVENT_SEPARATOR
     request.cls.test_backend_type = request.param
     if request.param == 'standalone':
         cfg = get_test_sdl_standby_config()
@@ -103,7 +106,8 @@ def redis_backend_fixture(request, redis_backend_common_fixture):
         request.cls.db = db
 
         mock_redis.assert_called_once_with(db=0, host=cfg.db_host, max_connections=20, port=cfg.db_port)
-        mock_pubsub.assert_called_once_with(request.cls.mock_redis.connection_pool, ignore_subscribe_messages=True)
+        mock_pubsub.assert_called_once_with(EVENT_SEPARATOR, request.cls.mock_redis.connection_pool,
+                                            ignore_subscribe_messages=True)
         assert request.cls.mock_redis.set_response_callback.call_count == 2
         assert request.cls.mock_redis.set_response_callback.call_args_list == [call('SETIE', ANY), call('DELIE', ANY)]
 
@@ -122,7 +126,8 @@ def redis_backend_fixture(request, redis_backend_common_fixture):
 
         mock_sentinel.assert_called_once_with([(cfg.db_host, cfg.db_sentinel_port)])
         mock_sentinel.master_for.called_once_with(cfg.db_sentinel_master_name)
-        mock_pubsub.assert_called_once_with(request.cls.mock_redis.connection_pool, ignore_subscribe_messages=True)
+        mock_pubsub.assert_called_once_with(EVENT_SEPARATOR, request.cls.mock_redis.connection_pool,
+                                            ignore_subscribe_messages=True)
         assert request.cls.mock_redis.set_response_callback.call_count == 2
         assert request.cls.mock_redis.set_response_callback.call_args_list == [call('SETIE', ANY), call('DELIE', ANY)]
 
@@ -150,8 +155,8 @@ def redis_backend_fixture(request, redis_backend_common_fixture):
         )
         assert mock_pubsub.call_count == 2
         mock_pubsub.assert_has_calls([
-            call(request.cls.mock_redis.connection_pool, ignore_subscribe_messages=True),
-            call(request.cls.mock_redis.connection_pool, ignore_subscribe_messages=True),
+            call(EVENT_SEPARATOR, request.cls.mock_redis.connection_pool, ignore_subscribe_messages=True),
+            call(EVENT_SEPARATOR, request.cls.mock_redis.connection_pool, ignore_subscribe_messages=True),
         ])
         assert request.cls.mock_redis.set_response_callback.call_count == 4
         assert request.cls.mock_redis.set_response_callback.call_args_list == [
@@ -543,7 +548,7 @@ class TestRedisBackend:
 
     def test_unsubscribe_channel_success(self):
         self.db.unsubscribe_channel(self.ns, [self.channels[0]])
-        self.mock_pubsub.unsubscribe.assert_called_with('{some-ns},abs')
+        self.mock_pubsub.unsubscribe.assert_called_with('{some-ns},ch1')
 
     def test_unsubscribe_channel_can_map_redis_exception_to_sdl_exeception(self):
         self.mock_pubsub.unsubscribe.side_effect = redis_exceptions.ResponseError('redis error!')
@@ -859,9 +864,9 @@ def test_system_error_exceptions_are_not_mapped_to_any_sdl_exception():
 class TestRedisClient:
     @classmethod
     def setup_class(cls):
-        cls.pubsub = ricsdl.backend.redis.PubSub(Mock())
-        cls.pubsub.channels = {b'{some-ns},abs': Mock()}
+        cls.pubsub = ricsdl.backend.redis.PubSub(EVENT_SEPARATOR, Mock())
+        cls.pubsub.channels = {b'{some-ns},ch1': Mock()}
 
     def test_handle_pubsub_message(self):
-        assert self.pubsub.handle_message([b'message', b'{some-ns},abs', b'cbn']) == ('abs', 'cbn')
-        self.pubsub.channels.get(b'{some-ns},abs').assert_called_once_with('abs', 'cbn')
+        assert self.pubsub.handle_message([b'message', b'{some-ns},ch1', b'cbn']) == ('ch1', 'cbn')
+        self.pubsub.channels.get(b'{some-ns},ch1').assert_called_once_with('ch1', 'cbn')
index a9d24ba..1c89663 100644 (file)
@@ -62,6 +62,9 @@ class TestConfiguration:
                                                 db_type=DbBackendType.FAKE_DICT)
         assert expected_config == self.config.get_params()
 
+    def test_get_event_separator_function_return_expected_separator(self, config_fixture):
+        assert "___" == _Configuration.get_event_separator()
+
     def test_get_params_function_can_raise_exception_if_wrong_fake_db_type(self):
         with pytest.raises(ValueError, match=r"Configuration error"):
             _Configuration(fake_db_backend='bad value')
index 332a2fa..839d159 100755 (executable)
@@ -26,6 +26,7 @@ from ricsdl.syncstorage import SyncLock
 from ricsdl.syncstorage import func_arg_checker
 from ricsdl.exceptions import (SdlTypeError, NotConnected)
 
+EVENT_SEPARATOR = "___"
 
 @pytest.fixture()
 def sync_storage_fixture(request):
@@ -44,7 +45,8 @@ def sync_storage_fixture(request):
     request.cls.lock_int_expiration = 10
     request.cls.lock_float_expiration = 1.1
     request.cls.channels = {'abs', 'cbn'}
-    request.cls.channels_and_events = {'abs': 'cbn'}
+    request.cls.channels_and_events = {'ch1': 'ev1', 'ch2': ['ev1', 'ev2', 'ev3']}
+    request.cls.ill_event = "illegal" + EVENT_SEPARATOR + "ev"
 
     with patch('ricsdl.backend.get_backend_instance') as mock_db_backend:
         storage = SyncStorage()
@@ -322,13 +324,25 @@ class TestSyncStorage:
 
     def test_set_and_publish_can_raise_exception_for_wrong_argument(self):
         with pytest.raises(SdlTypeError):
-            self.storage.set_and_publish(123, self.channels_and_events, {'a': b'v1'})
+            self.storage.set_and_publish(123, self.channels_and_events, self.dm)
         with pytest.raises(SdlTypeError):
-            self.storage.set_and_publish('ns', self.channels_and_events, [1, 2])
+            self.storage.set_and_publish(self.ns, None, self.dm)
         with pytest.raises(SdlTypeError):
-            self.storage.set_and_publish('ns', self.channels_and_events, {0xbad: b'v1'})
+            self.storage.set_and_publish(self.ns, {0xbad: "ev1"}, self.dm)
         with pytest.raises(SdlTypeError):
-            self.storage.set_and_publish('ns', self.channels_and_events, {'a': 0xbad})
+            self.storage.set_and_publish(self.ns, {"ch1": 0xbad}, self.dm)
+        with pytest.raises(SdlTypeError):
+            self.storage.set_and_publish(self.ns, {"ch1": self.ill_event}, self.dm)
+        with pytest.raises(SdlTypeError):
+            self.storage.set_and_publish(self.ns, {"ch1": ["ev1", 0xbad]}, self.dm)
+        with pytest.raises(SdlTypeError):
+            self.storage.set_and_publish(self.ns, {"ch1": ["ev1", self.ill_event]}, self.dm)
+        with pytest.raises(SdlTypeError):
+            self.storage.set_and_publish(self.ns, self.channels_and_events, [1, 2])
+        with pytest.raises(SdlTypeError):
+            self.storage.set_and_publish(self.ns, self.channels_and_events, {0xbad: b'v1'})
+        with pytest.raises(SdlTypeError):
+            self.storage.set_and_publish(self.ns, self.channels_and_events, {'a': 0xbad})
 
     def test_set_if_and_publish_success(self):
         self.mock_db_backend.set_if_and_publish.return_value = True
@@ -348,13 +362,35 @@ class TestSyncStorage:
 
     def test_set_if_and_publish_can_raise_exception_for_wrong_argument(self):
         with pytest.raises(SdlTypeError):
-            self.storage.set_if_and_publish(0xbad, self.channels_and_events, 'key', b'v1', b'v2')
+            self.storage.set_if_and_publish(0xbad, self.channels_and_events, self.key,
+                                            self.old_data, self.new_data)
+        with pytest.raises(SdlTypeError):
+            self.storage.set_if_and_publish(self.ns, None, self.key, self.old_data,
+                                            self.new_data)
+        with pytest.raises(SdlTypeError):
+            self.storage.set_if_and_publish(self.ns, {0xbad: "ev1"}, self.key,
+                                            self.old_data, self.new_data)
+        with pytest.raises(SdlTypeError):
+            self.storage.set_if_and_publish(self.ns, {"ch1": 0xbad}, self.key,
+                                            self.old_data, self.new_data)
+        with pytest.raises(SdlTypeError):
+            self.storage.set_if_and_publish(self.ns, {"ch1": self.ill_event}, self.key,
+                                            self.old_data, self.new_data)
+        with pytest.raises(SdlTypeError):
+            self.storage.set_if_and_publish(self.ns, {"ch1": ["ev1", 0xbad]}, self.key,
+                                            self.old_data, self.new_data)
         with pytest.raises(SdlTypeError):
-            self.storage.set_if_and_publish('ns', self.channels_and_events, 0xbad, b'v1', b'v2')
+            self.storage.set_if_and_publish(self.ns, {"ch1": ["ev1", self.ill_event]}, self.key,
+                                            self.old_data, self.new_data)
         with pytest.raises(SdlTypeError):
-            self.storage.set_if_and_publish('ns', self.channels_and_events, 'key', 0xbad, b'v2')
+            self.storage.set_if_and_publish(self.ns, self.channels_and_events, 0xbad,
+                                            self.old_data, self.new_data)
         with pytest.raises(SdlTypeError):
-            self.storage.set_if_and_publish('ns', self.channels_and_events, 'key', b'v1', 0xbad)
+            self.storage.set_if_and_publish(self.ns, self.channels_and_events, self.key,
+                                            0xbad, self.new_data)
+        with pytest.raises(SdlTypeError):
+            self.storage.set_if_and_publish(self.ns, self.channels_and_events, self.key,
+                                            self.old_data, 0xbad)
 
     def test_set_if_not_exists_and_publish_success(self):
         self.mock_db_backend.set_if_not_exists_and_publish.return_value = True
@@ -374,12 +410,32 @@ class TestSyncStorage:
 
     def test_set_if_not_exists_and_publish_can_raise_exception_for_wrong_argument(self):
         with pytest.raises(SdlTypeError):
-            self.storage.set_if_not_exists_and_publish(0xbad, self.channels_and_events, 'key',
-                                                       b'v1')
+            self.storage.set_if_not_exists_and_publish(0xbad, self.channels_and_events,
+                                                       self.key, self.new_data)
+        with pytest.raises(SdlTypeError):
+            self.storage.set_if_not_exists_and_publish(self.ns, None, self.key,
+                                                       self.new_data)
+        with pytest.raises(SdlTypeError):
+            self.storage.set_if_not_exists_and_publish(self.ns, {0xbad: "ev1"},
+                                                       self.key, self.new_data)
+        with pytest.raises(SdlTypeError):
+            self.storage.set_if_not_exists_and_publish(self.ns, {"ch1": 0xbad},
+                                                       self.key, self.new_data)
+        with pytest.raises(SdlTypeError):
+            self.storage.set_if_not_exists_and_publish(self.ns, {"ch1": self.ill_event},
+                                                       self.key, self.new_data)
         with pytest.raises(SdlTypeError):
-            self.storage.set_if_not_exists_and_publish('ns', self.channels_and_events, 0xbad, b'v1')
+            self.storage.set_if_not_exists_and_publish(self.ns, {"ch1": ["ev1", 0xbad]},
+                                                       self.key, self.new_data)
         with pytest.raises(SdlTypeError):
-            self.storage.set_if_not_exists_and_publish('ns', self.channels_and_events, 'key', 0xbad)
+            self.storage.set_if_not_exists_and_publish(self.ns, {"ch1": ["ev1", self.ill_event]},
+                                                       self.key, self.new_data)
+        with pytest.raises(SdlTypeError):
+            self.storage.set_if_not_exists_and_publish(self.ns, self.channels_and_events,
+                                                       0xbad, b'v1')
+        with pytest.raises(SdlTypeError):
+            self.storage.set_if_not_exists_and_publish(self.ns, self.channels_and_events,
+                                                       self.key, 0xbad)
 
     def test_remove_and_publish_function_success(self):
         self.storage.remove_and_publish(self.ns, self.channels_and_events, self.keys)
@@ -390,7 +446,19 @@ class TestSyncStorage:
         with pytest.raises(SdlTypeError):
             self.storage.remove_and_publish(0xbad, self.channels_and_events, self.keys)
         with pytest.raises(SdlTypeError):
-            self.storage.remove(self.ns, self.channels_and_events, 0xbad)
+            self.storage.remove_and_publish(self.ns, None, self.keys)
+        with pytest.raises(SdlTypeError):
+            self.storage.remove_and_publish(self.ns, {0xbad: "ev1"}, self.keys)
+        with pytest.raises(SdlTypeError):
+            self.storage.remove_and_publish(self.ns, {"ch1": 0xbad}, self.keys)
+        with pytest.raises(SdlTypeError):
+            self.storage.remove_and_publish(self.ns, {"ch1": self.ill_event}, self.keys)
+        with pytest.raises(SdlTypeError):
+            self.storage.remove_and_publish(self.ns, {"ch1": ["ev1", 0xbad]}, self.keys)
+        with pytest.raises(SdlTypeError):
+            self.storage.remove_and_publish(self.ns, {"ch1": ["ev1", self.ill_event]}, self.keys)
+        with pytest.raises(SdlTypeError):
+            self.storage.remove_and_publish(self.ns, self.channels_and_events, 0xbad)
 
     def test_remove_if_and_publish_success(self):
         self.mock_db_backend.remove_if_and_publish.return_value = True
@@ -412,6 +480,23 @@ class TestSyncStorage:
         with pytest.raises(SdlTypeError):
             self.storage.remove_if_and_publish(0xbad, self.channels_and_events, self.keys,
                                                self.old_data)
+        with pytest.raises(SdlTypeError):
+            self.storage.remove_if_and_publish(self.ns, None, self.keys, self.old_data)
+        with pytest.raises(SdlTypeError):
+            self.storage.remove_if_and_publish(self.ns, {0xbad: "ev1"}, self.keys,
+                                               self.old_data)
+        with pytest.raises(SdlTypeError):
+            self.storage.remove_if_and_publish(self.ns, {"ch1": 0xbad}, self.keys,
+                                               self.old_data)
+        with pytest.raises(SdlTypeError):
+            self.storage.remove_if_and_publish(self.ns, {"ch1": self.ill_event}, self.keys,
+                                               self.old_data)
+        with pytest.raises(SdlTypeError):
+            self.storage.remove_if_and_publish(self.ns, {"ch1": ["ev1", 0xbad]}, self.keys,
+                                               self.old_data)
+        with pytest.raises(SdlTypeError):
+            self.storage.remove_if_and_publish(self.ns, {"ch1": ["ev1", self.ill_event]}, self.keys,
+                                               self.old_data)
         with pytest.raises(SdlTypeError):
             self.storage.remove_if_and_publish(self.ns, self.channels_and_events, 0xbad,
                                                self.old_data)
@@ -426,6 +511,18 @@ class TestSyncStorage:
     def test_remove_all_and_publish_can_raise_exception_for_wrong_argument(self):
         with pytest.raises(SdlTypeError):
             self.storage.remove_all_and_publish(0xbad, self.channels_and_events)
+        with pytest.raises(SdlTypeError):
+            self.storage.remove_all_and_publish(self.ns, None)
+        with pytest.raises(SdlTypeError):
+            self.storage.remove_all_and_publish(self.ns, {0xbad: "ev1"})
+        with pytest.raises(SdlTypeError):
+            self.storage.remove_all_and_publish(self.ns, {"ch1": 0xbad})
+        with pytest.raises(SdlTypeError):
+            self.storage.remove_all_and_publish(self.ns, {"ch1": self.ill_event})
+        with pytest.raises(SdlTypeError):
+            self.storage.remove_all_and_publish(self.ns, {"ch1": ["ev1", 0xbad]})
+        with pytest.raises(SdlTypeError):
+            self.storage.remove_all_and_publish(self.ns, {"ch1": ["ev1", self.ill_event]})
 
     def test_subscribe_function_success(self):
         def cb(channel, message):