Add support for notifications 34/4534/4
authorTimothy Ebido <tj.ebido@samsung.com>
Wed, 12 Aug 2020 01:49:26 +0000 (10:49 +0900)
committerTimothy Ebido <tj.ebido@samsung.com>
Wed, 26 Aug 2020 04:50:30 +0000 (13:50 +0900)
- set_and_publish
- set_if_and_publish
- remove_and_publish
- remove_if_and_publish
- remove_all_and_publish
- start_event_listener
- handle_events
- subscribe_channel
- unsubscribe_channel

Issue ID: RIC-372

Signed-off-by: Timothy Ebido <tj.ebido@samsung.com>
Change-Id: I55b32df5cdf6ed394a80fe70f8cd1e0d09dc4b3b

12 files changed:
docs/release-notes.rst
ricsdl-package/README.md [changed mode: 0644->0755]
ricsdl-package/examples/notify.py [new file with mode: 0755]
ricsdl-package/ricsdl/__init__.py
ricsdl-package/ricsdl/backend/dbbackend_abc.py [changed mode: 0644->0755]
ricsdl-package/ricsdl/backend/fake_dict_db.py [changed mode: 0644->0755]
ricsdl-package/ricsdl/backend/redis.py [changed mode: 0644->0755]
ricsdl-package/ricsdl/syncstorage.py [changed mode: 0644->0755]
ricsdl-package/ricsdl/syncstorage_abc.py [changed mode: 0644->0755]
ricsdl-package/tests/backend/test_fake_dict_db.py [changed mode: 0644->0755]
ricsdl-package/tests/backend/test_redis.py [changed mode: 0644->0755]
ricsdl-package/tests/test_syncstorage.py [changed mode: 0644->0755]

index bd884a6..2e04473 100644 (file)
@@ -33,6 +33,10 @@ This document provides the release notes of the ricsdl library.
 Version history
 ---------------
 
+[2.1.0] - 2020-08-26
+
+* Add support for notifications
+
 [2.0.4] - 2020-05-13
 
 * Enhance SDL API set() function argument validation to cover also dictionary items.
@@ -136,5 +140,3 @@ Workarounds
 
 References
 ----------
-
-
old mode 100644 (file)
new mode 100755 (executable)
index 39ccf75..d32c8b4
@@ -35,6 +35,9 @@ implementation from SDL API clients, and therefore backend data storage
 technology can be changed without affecting SDL API clients. Currently, Redis
 database is used as a backend data storage solution.
 
+Notifications
+
+Notifications functionality provide SDL clients the possibility to receive notifications about data changes in SDL namespaces. SDL client receiving notifications about data changes is referred to as "subscriber", while the SDL client modifying data and publishing notifications is referred to as "publisher".
 
 Install
 -------
diff --git a/ricsdl-package/examples/notify.py b/ricsdl-package/examples/notify.py
new file mode 100755 (executable)
index 0000000..7d2f203
--- /dev/null
@@ -0,0 +1,208 @@
+# Copyright (c) 2020 Samsung Electronics
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+#
+# This source code is part of the near-RT RIC (RAN Intelligent Controller)
+# platform project (RICP).
+#
+
+"""Examples on how to use Shared Data Layer (SDL) notification feature.
+
+Execution of  these examples requires:
+ * Following Redis extension commands have been installed to runtime environment:
+   - MSETPUB
+   - SETIE
+   - SETIEPUB
+   - SETNXPUB
+   - DELPUB
+   - DELIE
+   - DELIEPUB
+   Redis v4.0 or greater is required. Older versions do not support extension modules.
+   Implementation of above commands is produced by RIC DBaaS:
+   https://gerrit.o-ran-sc.org/r/admin/repos/ric-plt/dbaas
+   In official RIC deployments these commands are installed by `dbaas` service to Redis
+   container(s).
+   In development environment you may want install commands manually to pod/container, which is
+   running Redis.
+ * Following environment variables are needed to set to the pod/container where the application
+   utilizing SDL is going to be run.
+     DBAAS_SERVICE_HOST = [redis server address]
+     DBAAS_SERVICE_PORT= [redis server port]
+     DBAAS_MASTER_NAME = [master Redis sentinel name]. Needed to set only if sentinel is in use.
+     DBAAS_SERVICE_SENTINEL_PORT = [Redis sentinel port number]. Needed to set only if sentinel
+     is in use.
+"""
+
+import threading
+import time
+
+from ricsdl.syncstorage import SyncStorage
+from ricsdl.exceptions import RejectedByBackend, NotConnected, BackendError
+
+# There are two available methods for applications to handle notifications:
+#   - EVENT_LISTENER (true):
+#     - User calls sdl.start_event_listener() which will create an SDL managed
+#       event loop for handling messages.
+#   - EVENT_LISTENER (false):
+#     - User need to call sdl.handle_messages() which will return the message
+#
+# Note: In both cases, the given callback function will be executed.
+EVENT_LISTENER = True
+
+# Constants used in the examples below.
+MY_NS = 'my_ns'
+MY_CHANNEL = "my_channel"
+MY_LOCK = threading.Lock()
+
+
+def _try_func_return(func):
+    """
+    Generic wrapper function to call SDL API function and handle exceptions if they are raised.
+    """
+    try:
+        return func()
+    except RejectedByBackend as exp:
+        print(f'SDL function {func.__name__} failed: {str(exp)}')
+        # Permanent failure, just forward the exception
+        raise
+    except (NotConnected, BackendError) as exp:
+        print(f'SDL function {func.__name__} failed for a temporal error: {str(exp)}')
+        # Here we could have a retry logic
+
+
+def _try_func_callback_return(func):
+    """Generic wrapper function for testing SDL APIs with callback functions.
+
+    threading.Lock is unlocked in the callback function and threading.Lock is
+    only used to demonstrate that the callback function was called.
+    """
+    global MY_LOCK
+    MY_LOCK.acquire()
+    ret = _try_func_return(func)
+    while MY_LOCK.locked():
+        time.sleep(0.01)
+    return ret
+
+
+# Creates SDL instance. The call creates connection to the SDL database backend.
+mysdl = _try_func_return(SyncStorage)
+
+# Stores the last received channel and message
+last_cb_channel = ""
+last_cb_message = ""
+
+# Allows program to stop receive thread at the end of execution
+stop_thread = False
+
+
+def cb(channel: str, message: str):
+    """An example of function that will be called when an event is received.
+
+    This function sets last_cb_channel and last_cb_message as channel and
+    message respectively. This also unlocks the global lock variable.
+
+    Args:
+        channel: Channel where the message was received
+        message: Received message
+    """
+    global last_cb_channel, last_cb_message, MY_LOCK
+    last_cb_channel = channel
+    last_cb_message = message
+    if MY_LOCK.locked():
+        MY_LOCK.release()
+
+
+def listen_thread():
+    """An example of a listener thread that continuously calls sdl.handle_events()."""
+    global mysdl
+    global stop_thread
+    while not stop_thread:
+        message = mysdl.handle_events()
+        if message:
+            # You could process message here
+            pass
+        time.sleep(0.001)
+
+
+# As mentioned above, there are two available methods for applications to
+# handle notifications
+if EVENT_LISTENER:
+    _try_func_return(mysdl.start_event_listener)
+else:
+    thread = threading.Thread(target=listen_thread)
+    thread.start()
+
+# Subscribe to MY_CHANNEL. We expect that anytime we receive a message in the
+# channel, cb function will be called.
+_try_func_return(lambda: mysdl.subscribe_channel(MY_NS, cb, MY_CHANNEL))
+
+# Sets a value 'my_value' for a key 'my_key' under given namespace. Note that value
+# type must be bytes and multiple key values can be set in one set function call.
+_try_func_callback_return(
+    lambda: mysdl.set_and_publish(MY_NS, {MY_CHANNEL: "SET PUBLISH"}, {'my_key': b'my_value'}))
+assert last_cb_channel == MY_CHANNEL and last_cb_message == "SET PUBLISH"
+
+# Sets a value 'my_value2' for a key 'my_key' under given namespace only if the old value is
+# 'my_value'.
+was_set = _try_func_callback_return(lambda: mysdl.set_if_and_publish(
+    MY_NS, {MY_CHANNEL: "SET IF PUBLISH"}, 'my_key', b'my_value', b'my_value2'))
+assert was_set is True
+assert last_cb_channel == MY_CHANNEL and last_cb_message == "SET IF PUBLISH"
+# Try again. This time value 'my_value2' won't be set, because the key has already 'my_value2'
+# value. Callback function will not be called here.
+was_set = _try_func_return(lambda: mysdl.set_if_and_publish(MY_NS, {MY_CHANNEL: "SET IF PUBLISH"},
+                                                            'my_key', b'my_value', b'my_value2'))
+assert was_set is False
+
+# Sets a value 'my_value' for a key 'my_key2' under given namespace only if the key doesn't exist.
+# Note that value types must be bytes.
+was_set = _try_func_callback_return(lambda: mysdl.set_if_not_exists_and_publish(
+    MY_NS, {MY_CHANNEL: "SET IF NOT PUBLISH"}, 'my_key2', b'my_value'))
+assert was_set is True
+assert last_cb_channel == MY_CHANNEL and last_cb_message == "SET IF NOT PUBLISH"
+# Try again. This time the key 'my_key2' already exists. Callback function will not be called here.
+was_set = _try_func_return(lambda: mysdl.set_if_not_exists_and_publish(
+    MY_NS, {MY_CHANNEL: "SET IF NOT PUBLISH"}, 'my_key2', b'my_value'))
+assert was_set is False
+
+# Removes a key 'my_key' under given namespace.
+_try_func_callback_return(
+    lambda: mysdl.remove_and_publish(MY_NS, {MY_CHANNEL: "REMOVE PUBLISH"}, 'my_key'))
+my_ret_dict = _try_func_return(lambda: mysdl.get(MY_NS, 'my_key'))
+assert my_ret_dict == {}
+assert last_cb_channel == MY_CHANNEL and last_cb_message == "REMOVE PUBLISH"
+
+# Removes a key 'my_key' under given namespace only if the old value is 'my_value'.
+was_removed = _try_func_callback_return(lambda: mysdl.remove_if_and_publish(
+    MY_NS, {MY_CHANNEL: "REMOVE IF PUBLISH"}, 'my_key2', b'my_value'))
+assert was_removed is True
+assert last_cb_channel == MY_CHANNEL and last_cb_message == "REMOVE IF PUBLISH"
+# Try again to remove not anymore existing key 'my_key'. Callback function will not be called here.
+was_removed = _try_func_return(lambda: mysdl.remove_if_and_publish(
+    MY_NS, {MY_CHANNEL: "REMOVE IF PUBLISH"}, 'my_key2', b'my_value'))
+assert was_removed is False
+
+# Removes all the keys under given namespace.
+_try_func_return(lambda: mysdl.set(MY_NS, {'my_key': b'something'}))
+my_ret_dict = _try_func_return(lambda: mysdl.get(MY_NS, {'my_key'}))
+assert my_ret_dict != {}
+
+_try_func_callback_return(
+    lambda: mysdl.remove_all_and_publish(MY_NS, {MY_CHANNEL: "REMOVE ALL PUBLISH"}))
+my_ret_dict = _try_func_return(lambda: mysdl.get(MY_NS, {'my_key'}))
+assert my_ret_dict == {}
+assert last_cb_channel == MY_CHANNEL and last_cb_message == "REMOVE ALL PUBLISH"
+
+stop_thread = True
+mysdl.close()
index b4e3750..df7e9c2 100644 (file)
@@ -31,7 +31,7 @@ from .exceptions import (
 )
 
 
-__version__ = '2.0.4'
+__version__ = '2.1.0'
 
 
 __all__ = [
old mode 100644 (file)
new mode 100755 (executable)
index 4f31554..cdf0311
@@ -21,7 +21,7 @@
 
 """The module provides Shared Data Layer (SDL) database backend interface."""
 
-from typing import (Dict, Set, List, Union)
+from typing import (Callable, Dict, Set, List, Optional, Tuple, Union)
 from abc import ABC, abstractmethod
 
 
@@ -114,6 +114,83 @@ class DbBackendAbc(ABC):
         """Return the number of members in a group under a namespace in database."""
         pass
 
+    @abstractmethod
+    def set_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]],
+                        data_map: Dict[str, bytes]) -> None:
+        """Publish event to channel after writing data."""
+        pass
+
+    @abstractmethod
+    def set_if_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]], key: str,
+                           old_data: bytes, new_data: bytes) -> bool:
+        """
+        Publish event to channel after writing key value to database under a namespace
+        if the old value is expected one.
+        """
+        pass
+
+    @abstractmethod
+    def set_if_not_exists_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]],
+                                      key: str, data: bytes) -> bool:
+        """"
+        Publish event to channel after writing key value to database under a namespace if
+        key doesn't exist.
+        """
+        pass
+
+    @abstractmethod
+    def remove_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]],
+                           keys: List[str]) -> None:
+        """Publish event to channel after removing data."""
+        pass
+
+    @abstractmethod
+    def remove_if_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]], key: str,
+                              data: bytes) -> bool:
+        """
+        Publish event to channel after removing key and its data from database if the
+        current data value is expected one.
+        """
+        pass
+
+    @abstractmethod
+    def remove_all_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]]) -> None:
+        """
+        Publish event to channel after removing all keys in namespace.
+        """
+        pass
+
+    @abstractmethod
+    def subscribe_channel(self, ns: str, cb: Callable[[str, str], None],
+                          channels: List[str]) -> None:
+        """
+        This takes a callback function and one or many channels to be subscribed.
+        When an event is received for the given channel, the given callback function
+        shall be called with channel and notifications as parameter.
+        """
+        pass
+
+    @abstractmethod
+    def unsubscribe_channel(self, ns: str, channels: List[str]) -> None:
+        """Unsubscribes from channel and removes set callback function."""
+        pass
+
+    @abstractmethod
+    def start_event_listener(self) -> None:
+        """
+        start_event_listener creates an event loop in a separate thread for handling
+        notifications from subscriptions.
+        """
+        pass
+
+    @abstractmethod
+    def handle_events(self) -> Optional[Tuple[str, str]]:
+        """
+        handle_events is a non-blocking function that returns a tuple containing channel
+        name and message received from notification.
+        """
+        pass
+
 
 class DbBackendLockAbc(ABC):
     """
old mode 100644 (file)
new mode 100755 (executable)
index b6c84a2..5a49f10
 
 """The module provides fake implementation of Shared Data Layer (SDL) database backend interface."""
 import fnmatch
-from typing import (Dict, Set, List, Union)
+from typing import (Callable, Dict, Set, List, Optional, Tuple, Union)
+import queue
+import threading
+import time
 from ricsdl.configuration import _Configuration
 from .dbbackend_abc import DbBackendAbc
 from .dbbackend_abc import DbBackendLockAbc
@@ -43,6 +46,10 @@ class FakeDictBackend(DbBackendAbc):
         super().__init__()
         self._db = {}
         self._configuration = configuration
+        self._queue = queue.Queue(1)
+        self._channel_cbs = {}
+        self._listen_thread = threading.Thread(target=self._listen, daemon=True)
+        self._run_in_thread = False
 
     def __str__(self):
         return str(
@@ -138,6 +145,93 @@ class FakeDictBackend(DbBackendAbc):
             return 0
         return len(self._db[group])
 
+    def set_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]],
+                        data_map: Dict[str, bytes]) -> None:
+        self._db.update(data_map.copy())
+        for channel, events in channels_and_events.items():
+            for event in events:
+                self._queue.put((channel, event))
+
+    def set_if_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]], key: str,
+                           old_data: bytes, new_data: bytes) -> bool:
+        if self.set_if(ns, key, old_data, new_data):
+            for channel, events in channels_and_events.items():
+                for event in events:
+                    self._queue.put((channel, event))
+            return True
+        return False
+
+    def set_if_not_exists_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]],
+                                      key: str, data: bytes) -> bool:
+        if self.set_if_not_exists(ns, key, data):
+            for channel, events in channels_and_events.items():
+                for event in events:
+                    self._queue.put((channel, event))
+            return True
+        return False
+
+    def remove_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]],
+                           keys: List[str]) -> None:
+        for key in keys:
+            self._db.pop(key, None)
+        for channel, events in channels_and_events.items():
+            for event in events:
+                self._queue.put((channel, event))
+
+    def remove_if_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]], key: str,
+                              data: bytes) -> bool:
+        if self.remove_if(ns, key, data):
+            for channel, events in channels_and_events.items():
+                for event in events:
+                    self._queue.put((channel, event))
+            return True
+        return False
+
+    def remove_all_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]]) -> None:
+        # Note: Since fake db has only one namespace, this deletes all keys
+        self._db.clear()
+        for channel, events in channels_and_events.items():
+            for event in events:
+                self._queue.put((channel, event))
+
+    def subscribe_channel(self, ns: str, cb: Callable[[str, str], None],
+                          channels: List[str]) -> None:
+        for channel in channels:
+            self._channel_cbs[channel] = cb
+            if not self._listen_thread.is_alive() and self._run_in_thread:
+                self._listen_thread.start()
+
+    def _listen(self):
+        while True:
+            message = self._queue.get()
+            cb = self._channel_cbs.get(message[0], None)
+            if cb:
+                cb(message[0], message[1])
+            time.sleep(0.001)
+
+    def unsubscribe_channel(self, ns: str, channels: List[str]) -> None:
+        for channel in channels:
+            self._channel_cbs.pop(channel, None)
+
+    def start_event_listener(self) -> None:
+        if self._listen_thread.is_alive():
+            raise Exception("Event loop already started")
+        if len(self._channel_cbs) > 0:
+            self._listen_thread.start()
+        self._run_in_thread = True
+
+    def handle_events(self) -> Optional[Tuple[str, str]]:
+        if self._listen_thread.is_alive() or self._run_in_thread:
+            raise Exception("Event loop already started")
+        try:
+            message = self._queue.get(block=False)
+        except queue.Empty:
+            return None
+        cb = self._channel_cbs.get(message[0], None)
+        if cb:
+            cb(message[0], message[1])
+        return (message[0], message[1])
+
 
 class FakeDictBackendLock(DbBackendLockAbc):
     """
old mode 100644 (file)
new mode 100755 (executable)
index 3364497..c67be2d
@@ -21,7 +21,9 @@
 
 """The module provides implementation of Shared Data Layer (SDL) database backend interface."""
 import contextlib
-from typing import (Dict, Set, List, Union)
+import threading
+from typing import (Callable, Dict, Set, List, Optional, Tuple, Union)
+import redis
 from redis import Redis
 from redis.sentinel import Sentinel
 from redis.lock import Lock
@@ -53,6 +55,86 @@ def _map_to_sdl_exception():
                            format(str(exc))) from exc
 
 
+class PubSub(redis.client.PubSub):
+    def handle_message(self, response, ignore_subscribe_messages=False):
+        """
+        Parses a pub/sub message. If the channel or pattern was subscribed to
+        with a message handler, the handler is invoked instead of a parsed
+        message being returned.
+
+        Adapted from: https://github.com/andymccurdy/redis-py/blob/master/redis/client.py
+        """
+        message_type = nativestr(response[0])
+        if message_type == 'pmessage':
+            message = {
+                'type': message_type,
+                'pattern': response[1],
+                'channel': response[2],
+                'data': response[3]
+            }
+        elif message_type == 'pong':
+            message = {
+                'type': message_type,
+                'pattern': None,
+                'channel': None,
+                'data': response[1]
+            }
+        else:
+            message = {
+                'type': message_type,
+                'pattern': None,
+                'channel': response[1],
+                'data': response[2]
+            }
+
+        # if this is an unsubscribe message, remove it from memory
+        if message_type in self.UNSUBSCRIBE_MESSAGE_TYPES:
+            if message_type == 'punsubscribe':
+                pattern = response[1]
+                if pattern in self.pending_unsubscribe_patterns:
+                    self.pending_unsubscribe_patterns.remove(pattern)
+                    self.patterns.pop(pattern, None)
+            else:
+                channel = response[1]
+                if channel in self.pending_unsubscribe_channels:
+                    self.pending_unsubscribe_channels.remove(channel)
+                    self.channels.pop(channel, None)
+
+        if message_type in self.PUBLISH_MESSAGE_TYPES:
+            # if there's a message handler, invoke it
+            if message_type == 'pmessage':
+                handler = self.patterns.get(message['pattern'], None)
+            else:
+                handler = self.channels.get(message['channel'], None)
+            if handler:
+                # Need to send only channel and notification instead of raw
+                # message
+                message_channel = self._strip_ns_from_bin_key('', message['channel'])
+                message_data = message['data'].decode('utf-8')
+                handler(message_channel, message_data)
+                return message_channel, message_data
+        elif message_type != 'pong':
+            # this is a subscribe/unsubscribe message. ignore if we don't
+            # want them
+            if ignore_subscribe_messages or self.ignore_subscribe_messages:
+                return None
+
+        return message
+
+    @classmethod
+    def _strip_ns_from_bin_key(cls, ns: str, nskey: bytes) -> str:
+        try:
+            redis_key = nskey.decode('utf-8')
+        except UnicodeDecodeError as exc:
+            msg = u'Namespace %s key conversion to string failed: %s' % (ns, str(exc))
+            raise RejectedByBackend(msg)
+        nskey = redis_key.split(',', 1)
+        if len(nskey) != 2:
+            msg = u'Namespace %s key:%s has no namespace prefix' % (ns, redis_key)
+            raise RejectedByBackend(msg)
+        return nskey[1]
+
+
 class RedisBackend(DbBackendAbc):
     """
     A class providing an implementation of database backend of Shared Data Layer (SDL), when
@@ -79,6 +161,10 @@ class RedisBackend(DbBackendAbc):
         self.__redis.set_response_callback('SETIE', lambda r: r and nativestr(r) == 'OK' or False)
         self.__redis.set_response_callback('DELIE', lambda r: r and int(r) == 1 or False)
 
+        self.__redis_pubsub = PubSub(self.__redis.connection_pool, ignore_subscribe_messages=True)
+        self.pubsub_thread = threading.Thread(target=None)
+        self._run_in_thread = False
+
     def __del__(self):
         self.close()
 
@@ -178,6 +264,105 @@ class RedisBackend(DbBackendAbc):
         with _map_to_sdl_exception():
             return self.__redis.scard(db_key)
 
+    def set_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]],
+                        data_map: Dict[str, bytes]) -> None:
+        db_data_map = self._add_data_map_ns_prefix(ns, data_map)
+        channels_and_events_prepared = []
+        total_events = 0
+        channels_and_events_prepared, total_events = self._prepare_channels(ns, channels_and_events)
+        with _map_to_sdl_exception():
+            return self.__redis.execute_command(
+                "MSETMPUB",
+                len(db_data_map),
+                total_events,
+                *[val for data in db_data_map.items() for val in data],
+                *channels_and_events_prepared,
+            )
+
+    def set_if_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]], key: str,
+                           old_data: bytes, new_data: bytes) -> bool:
+        db_key = self._add_key_ns_prefix(ns, key)
+        channels_and_events_prepared = []
+        channels_and_events_prepared, _ = self._prepare_channels(ns, channels_and_events)
+        with _map_to_sdl_exception():
+            ret = self.__redis.execute_command("SETIEPUB", db_key, new_data, old_data,
+                                               *channels_and_events_prepared)
+            return ret == b"OK"
+
+    def set_if_not_exists_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]],
+                                      key: str, data: bytes) -> bool:
+        db_key = self._add_key_ns_prefix(ns, key)
+        channels_and_events_prepared, _ = self._prepare_channels(ns, channels_and_events)
+        with _map_to_sdl_exception():
+            ret = self.__redis.execute_command("SETNXPUB", db_key, data,
+                                               *channels_and_events_prepared)
+            return ret == b"OK"
+
+    def remove_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]],
+                           keys: List[str]) -> None:
+        db_keys = self._add_keys_ns_prefix(ns, keys)
+        channels_and_events_prepared, total_events = self._prepare_channels(ns, channels_and_events)
+        with _map_to_sdl_exception():
+            return self.__redis.execute_command(
+                "DELMPUB",
+                len(db_keys),
+                total_events,
+                *db_keys,
+                *channels_and_events_prepared,
+            )
+
+    def remove_if_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]], key: str,
+                              data: bytes) -> bool:
+        db_key = self._add_key_ns_prefix(ns, key)
+        channels_and_events_prepared, _ = self._prepare_channels(ns, channels_and_events)
+        with _map_to_sdl_exception():
+            ret = self.__redis.execute_command("DELIEPUB", db_key, data,
+                                               *channels_and_events_prepared)
+            return bool(ret)
+
+    def remove_all_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]]) -> None:
+        keys = self.__redis.keys(self._add_key_ns_prefix(ns, "*"))
+        channels_and_events_prepared, total_events = self._prepare_channels(ns, channels_and_events)
+        with _map_to_sdl_exception():
+            return self.__redis.execute_command(
+                "DELMPUB",
+                len(keys),
+                total_events,
+                *keys,
+                *channels_and_events_prepared,
+            )
+
+    def subscribe_channel(self, ns: str, cb: Callable[[str, str], None],
+                          channels: List[str]) -> None:
+        channels = self._add_keys_ns_prefix(ns, channels)
+        for channel in channels:
+            with _map_to_sdl_exception():
+                self.__redis_pubsub.subscribe(**{channel: cb})
+                if not self.pubsub_thread.is_alive() and self._run_in_thread:
+                    self.pubsub_thread = self.__redis_pubsub.run_in_thread(sleep_time=0.001,
+                                                                           daemon=True)
+
+    def unsubscribe_channel(self, ns: str, channels: List[str]) -> None:
+        channels = self._add_keys_ns_prefix(ns, channels)
+        for channel in channels:
+            with _map_to_sdl_exception():
+                self.__redis_pubsub.unsubscribe(channel)
+
+    def start_event_listener(self) -> None:
+        if self.pubsub_thread.is_alive():
+            raise RejectedByBackend("Event loop already started")
+        if len(self.__redis.pubsub_channels()) > 0:
+            self.pubsub_thread = self.__redis_pubsub.run_in_thread(sleep_time=0.001, daemon=True)
+        self._run_in_thread = True
+
+    def handle_events(self) -> Optional[Tuple[str, str]]:
+        if self.pubsub_thread.is_alive() or self._run_in_thread:
+            raise RejectedByBackend("Event loop already started")
+        try:
+            return self.__redis_pubsub.get_message(ignore_subscribe_messages=True)
+        except RuntimeError:
+            return None
+
     @classmethod
     def _add_key_ns_prefix(cls, ns: str, key: str):
         return '{' + ns + '},' + key
@@ -212,6 +397,18 @@ class RedisBackend(DbBackendAbc):
             ret_keys.append(nskey[1])
         return ret_keys
 
+    @classmethod
+    def _prepare_channels(cls, ns: str, channels_and_events: Dict[str,
+                                                                  List[str]]) -> Tuple[List, int]:
+        channels_and_events_prepared = []
+        total_events = 0
+        for channel, events in channels_and_events.items():
+            for event in events:
+                channels_and_events_prepared.append(cls._add_key_ns_prefix(ns, channel))
+                channels_and_events_prepared.append(event)
+                total_events += 1
+        return channels_and_events_prepared, total_events
+
     def get_redis_connection(self):
         """Return existing Redis database connection."""
         return self.__redis
old mode 100644 (file)
new mode 100755 (executable)
index 0a2d9b3..48b5e3d
@@ -20,7 +20,8 @@
 
 """The module provides implementation of the syncronous Shared Data Layer (SDL) interface."""
 import builtins
-from typing import (Dict, Set, List, Union)
+import inspect
+from typing import (Any, Callable, Dict, Set, List, Optional, Tuple, Union)
 from ricsdl.configuration import _Configuration
 from ricsdl.syncstorage_abc import (SyncStorageAbc, SyncLockAbc)
 import ricsdl.backend
@@ -206,6 +207,81 @@ class SyncStorage(SyncStorageAbc):
     def group_size(self, ns: str, group: str) -> int:
         return self.__dbbackend.group_size(ns, group)
 
+    @func_arg_checker(SdlTypeError, 1, ns=str, channels_and_events=dict, data_map=dict)
+    def set_and_publish(self, ns: str, channels_and_events: Dict[str, Union[str, List[str]]],
+                        data_map: Dict[str, bytes]) -> None:
+        self._validate_key_value_dict(data_map)
+        self._validate_channels_events(channels_and_events)
+        for channel, events in channels_and_events.items():
+            channels_and_events[channel] = [events] if isinstance(events, str) else events
+        self.__dbbackend.set_and_publish(ns, channels_and_events, data_map)
+
+    @func_arg_checker(SdlTypeError,
+                      1,
+                      ns=str,
+                      channels_and_events=dict,
+                      key=str,
+                      old_data=bytes,
+                      new_data=bytes)
+    def set_if_and_publish(self, ns: str, channels_and_events: Dict[str, Union[str, List[str]]],
+                           key: str, old_data: bytes, new_data: bytes) -> bool:
+        self._validate_channels_events(channels_and_events)
+        for channel, events in channels_and_events.items():
+            channels_and_events[channel] = [events] if isinstance(events, str) else events
+        return self.__dbbackend.set_if_and_publish(ns, channels_and_events, key, old_data, new_data)
+
+    @func_arg_checker(SdlTypeError, 1, ns=str, channels_and_events=dict, key=str, data=bytes)
+    def set_if_not_exists_and_publish(self, ns: str,
+                                      channels_and_events: Dict[str, Union[str, List[str]]],
+                                      key: str, data: bytes) -> bool:
+        self._validate_channels_events(channels_and_events)
+        for channel, events in channels_and_events.items():
+            channels_and_events[channel] = [events] if isinstance(events, str) else events
+        return self.__dbbackend.set_if_not_exists_and_publish(ns, channels_and_events, key, data)
+
+    @func_arg_checker(SdlTypeError, 1, ns=str, channels_and_events=dict, keys=(str, builtins.set))
+    def remove_and_publish(self, ns: str, channels_and_events: Dict[str, Union[str, List[str]]],
+                           keys: Union[str, Set[str]]) -> None:
+        self._validate_channels_events(channels_and_events)
+        for channel, events in channels_and_events.items():
+            channels_and_events[channel] = [events] if isinstance(events, str) else events
+        keys = [keys] if isinstance(keys, str) else list(keys)
+        self.__dbbackend.remove_and_publish(ns, channels_and_events, keys)
+
+    @func_arg_checker(SdlTypeError, 1, ns=str, channels_and_events=dict, key=str, data=bytes)
+    def remove_if_and_publish(self, ns: str, channels_and_events: Dict[str, Union[str, List[str]]],
+                              key: str, data: bytes) -> bool:
+        self._validate_channels_events(channels_and_events)
+        for channel, events in channels_and_events.items():
+            channels_and_events[channel] = [events] if isinstance(events, str) else events
+        return self.__dbbackend.remove_if_and_publish(ns, channels_and_events, key, data)
+
+    @func_arg_checker(SdlTypeError, 1, ns=str, channels_and_events=dict)
+    def remove_all_and_publish(self, ns: str,
+                               channels_and_events: Dict[str, Union[str, List[str]]]) -> None:
+        self._validate_channels_events(channels_and_events)
+        for channel, events in channels_and_events.items():
+            channels_and_events[channel] = [events] if isinstance(events, str) else events
+        self.__dbbackend.remove_all_and_publish(ns, channels_and_events)
+
+    @func_arg_checker(SdlTypeError, 1, ns=str, cb=Callable, channels=(str, builtins.set))
+    def subscribe_channel(self, ns: str, cb: Callable[[str, str], None],
+                          channels: Union[str, Set[str]]) -> None:
+        self._validate_callback(cb)
+        channels = [channels] if isinstance(channels, str) else list(channels)
+        self.__dbbackend.subscribe_channel(ns, cb, channels)
+
+    @func_arg_checker(SdlTypeError, 1, ns=str, channels=(str, builtins.set))
+    def unsubscribe_channel(self, ns: str, channels: Union[str, Set[str]]) -> None:
+        channels = [channels] if isinstance(channels, str) else list(channels)
+        self.__dbbackend.unsubscribe_channel(ns, channels)
+
+    def start_event_listener(self) -> None:
+        self.__dbbackend.start_event_listener()
+
+    def handle_events(self) -> Optional[Tuple[str, str]]:
+        return self.__dbbackend.handle_events()
+
     @func_arg_checker(SdlTypeError, 1, ns=str, resource=str, expiration=(int, float))
     def get_lock_resource(self, ns: str, resource: str, expiration: Union[int, float]) -> SyncLock:
         return SyncLock(ns, resource, expiration, self)
@@ -225,3 +301,25 @@ class SyncStorage(SyncStorageAbc):
                 raise SdlTypeError(r"Wrong dict key type: {}={}. Must be: str".format(k, type(k)))
             if not isinstance(v, bytes):
                 raise SdlTypeError(r"Wrong dict value type: {}={}. Must be: bytes".format(v, type(v)))
+
+    @classmethod
+    def _validate_channels_events(cls, channels_and_events: Dict[Any, Any]):
+        for channel, events in channels_and_events.items():
+            if not isinstance(channel, str):
+                raise SdlTypeError(r"Wrong channel type: {}={}. Must be: str".format(
+                    channel, type(channel)))
+            if not isinstance(events, (list, str)):
+                raise SdlTypeError(r"Wrong event type: {}={}. Must be: str".format(
+                    events, type(events)))
+            if isinstance(events, list):
+                for event in events:
+                    if not isinstance(event, str):
+                        raise SdlTypeError(r"Wrong event type: {}={}. Must be: str".format(
+                            events, type(events)))
+
+    @classmethod
+    def _validate_callback(cls, cb):
+        param_len = len(inspect.signature(cb).parameters)
+        if param_len != 2:
+            raise SdlTypeError(
+                f"Callback function should take 2 positional argument but {param_len} were given")
old mode 100644 (file)
new mode 100755 (executable)
index c5b15b3..2c58e37
@@ -20,7 +20,7 @@
 
 
 """The module provides synchronous Shared Data Layer (SDL) interface."""
-from typing import (Dict, Set, List, Union)
+from typing import (Callable, Dict, Set, List, Optional, Tuple, Union)
 from abc import ABC, abstractmethod
 from ricsdl.exceptions import (
     RejectedByBackend
@@ -650,6 +650,282 @@ class SyncStorageAbc(ABC):
         """
         pass
 
+    @abstractmethod
+    def set_and_publish(self, ns: str, channels_and_events: Dict[str, Union[str, List[str]]],
+                        data_map: Dict[str, bytes]) -> None:
+        """
+        Publish event to channel after writing data.
+
+        set_and_publish function writes data to shared data layer storage and sends an
+        event to a channel. Writing is done atomically, i.e. all succeeds or fails.
+        Data to be written is given as key-value pairs. Several key-value pairs can be
+        written with one call.
+        The key is expected to be string whereas value is a byte string.
+
+        If data was set successfully, an event is sent to a channel.
+        It is possible to send several events to several channels by giving a list of
+        events
+        E.g. {"channel1": ["event1", "event3"], "channel2": ["event2"]}
+        will send event1 and event3 to channel1 and event2 to channel2.
+
+        Args:
+            ns: Namespace under which this operation is targeted.
+            channels_and_events: Channel to publish if data was set successfully.
+            data_map: Data to be written.
+
+        Returns:
+            None
+
+        Raises:
+            SdlTypeError: If function's argument is of an inappropriate type.
+            NotConnected: If SDL is not connected to the backend data storage.
+            RejectedByBackend: If backend data storage rejects the request.
+            BackendError: If the backend data storage fails to process the request.
+        """
+        pass
+
+    @abstractmethod
+    def set_if_and_publish(self, ns: str, channels_and_events: Dict[str, Union[str, List[str]]],
+                           key: str, old_data: bytes, new_data: bytes) -> bool:
+        """
+        Publish event to channel after conditionally modifying the value of a key if the
+        current value in data storage matches the user's last known value.
+
+        set_if_and_publish atomically replaces existing data with new_data in SDL if data
+        matches the old_data. If replace was done successfully, true will be returned.
+        Also, if publishing was successful, an event is published to a given channel.
+
+        Args:
+            ns (str): Namespace under which this operation is targeted.
+            channels_and_events (dict): Channel to publish if data was set successfully.
+            key (str): Key for which data modification will be executed.
+            old_data (bytes): Last known data.
+            new_data (bytes): Data to be written.
+
+        Returns:
+            bool: True for successful modification, false if the user's last known data did not
+                  match the current value in data storage.
+
+        Raises:
+            SdlTypeError: If function's argument is of an inappropriate type.
+            NotConnected: If SDL is not connected to the backend data storage.
+            RejectedByBackend: If backend data storage rejects the request.
+            BackendError: If the backend data storage fails to process the request.
+        """
+        pass
+
+    @abstractmethod
+    def set_if_not_exists_and_publish(self, ns: str,
+                                      channels_and_events: Dict[str, Union[str, List[str]]],
+                                      key: str, data: bytes) -> bool:
+        """
+        Publish event to channel after writing data to SDL storage if key does not exist.
+
+        set_if_not_exists_and_publish conditionally sets the value of a key. If key
+        already exists in SDL, then it's value is not changed. Checking the key existence
+        and potential set operation is done atomically. If the set operation was done
+        successfully, an event is published to a given channel.
+
+        Args:
+            ns (str): Namespace under which this operation is targeted.
+            channels_and_events (dict): Channel to publish if data was set successfully.
+            key (str): Key to be set.
+            data (bytes): Data to be written.
+
+        Returns:
+            bool: True if key didn't exist yet and set operation was executed, false if key already
+                  existed and thus its value was left untouched.
+
+        Raises:
+            SdlTypeError: If function's argument is of an inappropriate type.
+            NotConnected: If SDL is not connected to the backend data storage.
+            RejectedByBackend: If backend data storage rejects the request.
+            BackendError: If the backend data storage fails to process the request.
+        """
+        pass
+
+    @abstractmethod
+    def remove_and_publish(self, ns: str, channels_and_events: Dict[str, Union[str, List[str]]],
+                           keys: Union[str, Set[str]]) -> None:
+        """
+        Publish event to channel after removing data.
+
+        remove_and_publish removes data from SDL. Operation is done atomically, i.e.
+        either all succeeds or fails.
+        Trying to remove a nonexisting key is not considered as an error.
+        An event is published into a given channel if remove operation is successful and
+        at least one key is removed (if several keys given). If the given key(s) doesn't
+        exist when trying to remove, no event is published.
+
+        Args:
+            ns: Namespace under which this operation is targeted.
+            channels_and_events: Channel to publish if data was removed successfully.
+            keys: One key or multiple keys, which data is to be removed.
+
+        Returns:
+            None
+
+        Raises:
+            SdlTypeError: If function's argument is of an inappropriate type.
+            NotConnected: If SDL is not connected to the backend data storage.
+            RejectedByBackend: If backend data storage rejects the request.
+            BackendError: If the backend data storage fails to process the request.
+        """
+        pass
+
+    @abstractmethod
+    def remove_if_and_publish(self, ns: str, channels_and_events: Dict[str, Union[str, List[str]]],
+                              key: str, data: bytes) -> bool:
+        """
+        Publish event to channel after removing key and its data from database if the
+        current data value is expected one.
+
+        remove_if_and_publish removes data from SDL conditionally, and if remove was done
+        successfully, a given event is published to channel. If existing data matches
+        given data, key and data are removed from SDL. If remove was done successfully,
+        true is returned.
+
+        Args:
+            ns (str): Namespace under which this operation is targeted.
+            channels_and_events (dict): Channel to publish if data was removed successfully.
+            key (str): Key, which data is to be removed.
+            data (bytes): Last known value of data
+
+        Returns:
+            bool: True if successful removal, false if the user's last known data did not match the
+                  current value in data storage.
+
+        Raises:
+            SdlTypeError: If function's argument is of an inappropriate type.
+            NotConnected: If SDL is not connected to the backend data storage.
+            RejectedByBackend: If backend data storage rejects the request.
+            BackendError: If the backend data storage fails to process the request.
+        """
+        pass
+
+    @abstractmethod
+    def remove_all_and_publish(self, ns: str,
+                               channels_and_events: Dict[str, Union[str, List[str]]]) -> None:
+        """
+        Publish event to channel after removing all keys under the namespace.
+
+        remove_all_and_publish removes all keys under the namespace and if successful, it
+        will publish an event to given channel. This operation is not atomic, thus it is
+        not guaranteed that all keys are removed.
+
+        Args:
+            ns (str): Namespace under which this operation is targeted.
+            channels_and_events (dict): Channel to publish if data was removed successfully.
+
+        Returns:
+            None
+
+        Raises:
+            SdlTypeError: If function's argument is of an inappropriate type.
+            NotConnected: If SDL is not connected to the backend data storage.
+            RejectedByBackend: If backend data storage rejects the request.
+            BackendError: If the backend data storage fails to process the request.
+        """
+        pass
+
+    @abstractmethod
+    def subscribe_channel(self, ns: str, cb: Callable[[str, str], None],
+                          channels: Union[str, Set[str]]) -> None:
+        """
+        Subscribes the client to the specified channels.
+
+        subscribe_channel lets you to subscribe for events on a given channels.
+        SDL notifications are events that are published on a specific channel.
+        Both the channel and events are defined by the entity that is publishing
+        the events.
+
+        When subscribing for a channel, a callback function is given as a parameter.
+        Whenever a notification is received from a channel, this callback is called
+        with channel and notifications as parameter. A call to subscribe_channel function
+        returns immediately, callbacks will be called synchronously from a dedicated
+        thread.
+
+        It is possible to subscribe to different channels using different callbacks. In
+        this case simply use subscribe_channel function separately for each channel.
+
+        When receiving events in callback routine, it is a good practice to return from
+        callback as quickly as possible. Also it should be noted that in case of several
+        events received from different channels, callbacks are called in series one by
+        one.
+
+        Args:
+            ns: Namespace under which this operation is targeted.
+            cb: A function that is called when an event on channel is received.
+            channels: One channel or multiple channels to be subscribed.
+
+        Returns:
+            None
+
+        Raises:
+            SdlTypeError: If function's argument is of an inappropriate type.
+            NotConnected: If SDL is not connected to the backend data storage.
+            RejectedByBackend: If backend data storage rejects the request.
+            BackendError: If the backend data storage fails to process the request.
+        """
+        pass
+
+    @abstractmethod
+    def unsubscribe_channel(self, ns: str, channels: Union[str, Set[str]]) -> None:
+        """
+        unsubscribe_channel removes subscription from one or several channels.
+
+        Args:
+            ns: Namespace under which this operation is targeted.
+            channels: One channel or multiple channels to be unsubscribed.
+
+        Returns:
+            None
+
+        Raises:
+            SdlTypeError: If function's argument is of an inappropriate type.
+            NotConnected: If SDL is not connected to the backend data storage.
+            RejectedByBackend: If backend data storage rejects the request.
+            BackendError: If the backend data storage fails to process the request.
+        """
+        pass
+
+    @abstractmethod
+    def start_event_listener(self) -> None:
+        """
+        start_event_listener creates an event loop in a separate thread for handling
+        events from subscriptions. The registered callback function will be called
+        when an event is received.
+
+        Raises:
+            SdlTypeError: If function's argument is of an inappropriate type.
+            NotConnected: If SDL is not connected to the backend data storage.
+            RejectedByBackend: If backend data storage rejects the request.
+            BackendError: If the backend data storage fails to process the request.
+        """
+        pass
+
+    @abstractmethod
+    def handle_events(self) -> Optional[Tuple[str, str]]:
+        """
+        handle_events is a non-blocking function that returns a tuple containing channel
+        name and message received from an event. The registered callback function will
+        still be called when an event is received.
+
+        This function is called if SDL user decides to handle notifications in its own
+        event loop. Calling this function after start_event_listener raises an exception.
+        If there are no notifications, these returns None.
+
+        Returns:
+            Tuple: (channel: str, message: str)
+
+        Raises:
+            SdlTypeError: If function's argument is of an inappropriate type.
+            NotConnected: If SDL is not connected to the backend data storage.
+            RejectedByBackend: If backend data storage rejects the request.
+            BackendError: If the backend data storage fails to process the request.
+        """
+        pass
+
     @abstractmethod
     def get_lock_resource(self, ns: str, resource: str,
                           expiration: Union[int, float]) -> SyncLockAbc:
old mode 100644 (file)
new mode 100755 (executable)
index 3b072e5..a8c2d12
@@ -18,7 +18,8 @@
 # platform project (RICP).
 #
 
-
+import queue
+import time
 from unittest.mock import Mock
 import pytest
 import ricsdl.backend
@@ -44,6 +45,8 @@ def fake_dict_backend_fixture(request):
     request.cls.groupmembers = set([b'm1', b'm2'])
     request.cls.new_groupmembers = set(b'm3')
     request.cls.all_groupmembers = request.cls.groupmembers | request.cls.new_groupmembers
+    request.cls.channels = ['abs', 'gma']
+    request.cls.channels_and_events = {'abs': ['cbn']}
 
     request.cls.configuration = Mock()
     mock_conf_params = _Configuration.Params(db_host=None,
@@ -171,6 +174,148 @@ class TestFakeDictBackend:
     def test_fake_dict_backend_object_string_representation(self):
         assert str(self.db) == str({'DB type': 'FAKE DB'})
 
+    def test_set_and_publish_function_success(self):
+        self.db.set_and_publish(self.ns, self.channels_and_events, self.dm)
+        ret = self.db.get(self.ns, self.keys)
+        assert ret == self.dm
+        assert self.db._queue.qsize() == 1
+
+    def test_set_if_and_publish_success(self):
+        self.db.set(self.ns, self.dm)
+        ret = self.db.set_if_and_publish(self.ns, self.channels_and_events, self.key, self.old_data,
+                                         self.new_data)
+        assert ret is True
+        ret = self.db.get(self.ns, self.keys)
+        assert ret == self.new_dm
+        assert self.db._queue.qsize() == 1
+
+    def test_set_if_and_publish_returns_false_if_existing_key_value_not_expected(self):
+        self.db.set_if_and_publish(self.ns, self.channels_and_events, self.key, self.old_data,
+                                   self.new_data)
+        self.db.set(self.ns, self.new_dm)
+        ret = self.db.set_if(self.ns, self.key, self.old_data, self.new_data)
+        assert ret is False
+        assert self.db._queue.qsize() == 0
+
+    def test_set_if_not_exists_and_publish_success(self):
+        ret = self.db.set_if_not_exists_and_publish(self.ns, self.channels_and_events, self.key,
+                                                    self.new_data)
+        assert ret is True
+        ret = self.db.get(self.ns, self.keys)
+        assert ret == {self.key: self.new_data}
+        assert self.db._queue.qsize() == 1
+
+    def test_set_if_not_exists_and_publish_returns_false_if_key_already_exists(self):
+        self.db.set(self.ns, self.dm)
+        ret = self.db.set_if_not_exists_and_publish(self.ns, self.channels_and_events, self.key,
+                                                    self.new_data)
+        assert ret is False
+        assert self.db._queue.qsize() == 0
+
+    def test_remove_and_publish_function_success(self):
+        self.db.set(self.ns, self.dm)
+        self.db.remove_and_publish(self.ns, self.channels_and_events, self.keys)
+        ret = self.db.get(self.ns, self.keys)
+        assert ret == dict()
+        assert self.db._queue.qsize() == 1
+
+    def test_remove_if_and_publish_success(self):
+        self.db.set(self.ns, self.dm)
+        ret = self.db.remove_if_and_publish(self.ns, self.channels_and_events, self.key,
+                                            self.old_data)
+        assert ret is True
+        ret = self.db.get(self.ns, self.keys)
+        assert ret == self.remove_dm
+        assert self.db._queue.qsize() == 1
+
+    def test_remove_if_and_publish_returns_false_if_data_does_not_match(self):
+        ret = self.db.remove_if_and_publish(self.ns, self.channels_and_events, self.key,
+                                            self.old_data)
+        assert ret is False
+        self.db.set(self.ns, self.dm)
+        ret = self.db.remove_if_and_publish(self.ns, self.channels_and_events, self.key,
+                                            self.new_data)
+        assert ret is False
+        assert self.db._queue.qsize() == 0
+
+    def test_remove_all_publish_success(self):
+        self.db.set(self.ns, self.dm)
+        self.db.remove_all_and_publish(self.ns, self.channels_and_events)
+        ret = self.db.get(self.ns, self.keys)
+        assert ret == dict()
+        assert self.db._queue.qsize() == 1
+
+    def test_subscribe_channel_success(self):
+        cb = Mock()
+        self.db.subscribe_channel(self.ns, cb, self.channels)
+        for channel in self.channels:
+            assert self.db._channel_cbs.get(channel, None)
+        assert not self.db._listen_thread.is_alive()
+
+    def test_subscribe_channel_event_loop_success(self):
+        cb = Mock()
+        self.db.start_event_listener()
+        self.db.subscribe_channel(self.ns, cb, self.channels)
+        for channel in self.channels:
+            assert self.db._channel_cbs.get(channel, None)
+        assert self.db._listen_thread.is_alive()
+
+    def test_unsubscribe_channel_success(self):
+        self.db.subscribe_channel(self.ns, Mock(), self.channels)
+        self.db.unsubscribe_channel(self.ns, [self.channels[0]])
+        assert self.db._channel_cbs.get(self.channels[0], None) is None
+        assert self.db._channel_cbs.get(self.channels[1], None)
+
+    def test_listen(self):
+        cb = Mock()
+        self.db.start_event_listener()
+        self.db.subscribe_channel(self.ns, cb, self.channels)
+        self.db._queue.put(("abs", "cbn"))
+        time.sleep(0.5)
+        assert self.db._queue.qsize() == 0
+
+    def test_start_event_listener_success(self):
+        self.db.start_event_listener()
+        assert self.db._run_in_thread
+
+    def test_start_event_listener_subscribe_first(self):
+        self.db._listen_thread.start = Mock()
+        mock_cb = Mock()
+        self.db._channel_cbs = {'abs': mock_cb}
+        self.db.subscribe_channel(self.ns, Mock(), self.channels)
+        self.db.start_event_listener()
+        self.db._listen_thread.start.assert_called_once()
+
+    def test_start_event_listener_fail(self):
+        self.db._listen_thread.is_alive = Mock()
+        self.db._listen_thread.is_alive.return_value = True
+        with pytest.raises(Exception):
+            self.db.start_event_listener()
+
+    def test_handle_events_success(self):
+        self.db._queue = Mock()
+        self.db._queue.get.return_value = ('abs', 'cbn')
+        mock_cb = Mock()
+        self.db._channel_cbs = {'abs': mock_cb}
+        assert self.db.handle_events() == ('abs', 'cbn')
+        mock_cb.assert_called_once_with('abs', 'cbn')
+
+    def test_handle_events_success_no_notification(self):
+        self.db._queue = Mock()
+        self.db._queue.get.side_effect = queue.Empty
+        assert self.db.handle_events() is None
+
+    def test_handle_events_fail_already_started(self):
+        self.db._listen_thread = Mock()
+        self.db._listen_thread.is_alive.return_value = True
+        with pytest.raises(Exception):
+            self.db.handle_events()
+
+    def test_handle_events_fail_already_set(self):
+        self.db._run_in_thread = True
+        with pytest.raises(Exception):
+            self.db.handle_events()
+
 @pytest.fixture()
 def fake_dict_backend_lock_fixture(request):
     request.cls.ns = 'some-ns'
old mode 100644 (file)
new mode 100755 (executable)
index 4f46205..29cb582
@@ -35,6 +35,7 @@ def redis_backend_fixture(request):
     request.cls.dl_redis = [b'1', b'2']
     request.cls.dm = {'a': b'1', 'b': b'2'}
     request.cls.dm_redis = {'{some-ns},a': b'1', '{some-ns},b': b'2'}
+    request.cls.dm_redis_flat = ['{some-ns},a', b'1', '{some-ns},b', b'2']
     request.cls.key = 'a'
     request.cls.key_redis = '{some-ns},a'
     request.cls.keys = ['a', 'b']
@@ -54,6 +55,9 @@ def redis_backend_fixture(request):
     request.cls.group_redis = '{some-ns},some-group'
     request.cls.groupmembers = set([b'm1', b'm2'])
     request.cls.groupmember = b'm1'
+    request.cls.channels = ['abs', 'gma']
+    request.cls.channels_and_events = {'abs': ['cbn']}
+    request.cls.channels_and_events_redis = ['{some-ns},abs', 'cbn']
 
     request.cls.configuration = Mock()
     mock_conf_params = _Configuration.Params(db_host=None,
@@ -62,9 +66,11 @@ def redis_backend_fixture(request):
                                              db_sentinel_master_name=None,
                                              db_type=DbBackendType.REDIS)
     request.cls.configuration.get_params.return_value = mock_conf_params
-    with patch('ricsdl.backend.redis.Redis') as mock_redis:
+    with patch('ricsdl.backend.redis.Redis') as mock_redis, patch(
+            'ricsdl.backend.redis.PubSub') as mock_pubsub:
         db = ricsdl.backend.get_backend_instance(request.cls.configuration)
         request.cls.mock_redis = mock_redis.return_value
+        request.cls.mock_pubsub = mock_pubsub.return_value
     request.cls.db = db
 
     yield
@@ -318,6 +324,171 @@ class TestRedisBackend:
         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
             self.db.group_size(self.ns, self.group)
 
+    def test_set_and_publish_success(self):
+        self.db.set_and_publish(self.ns, self.channels_and_events, self.dm)
+        self.mock_redis.execute_command.assert_called_once_with('MSETMPUB', len(self.dm),
+                                                                len(self.channels_and_events),
+                                                                *self.dm_redis_flat,
+                                                                *self.channels_and_events_redis)
+
+    def test_set_and_publish_can_map_redis_exception_to_sdl_exception(self):
+        self.mock_redis.execute_command.side_effect = redis_exceptions.ResponseError('redis error!')
+        with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+            self.db.set_and_publish(self.ns, self.channels_and_events, self.dm)
+
+    def test_set_if_and_publish_success(self):
+        self.mock_redis.execute_command.return_value = b"OK"
+        ret = self.db.set_if_and_publish(self.ns, self.channels_and_events, self.key, self.old_data,
+                                         self.new_data)
+        self.mock_redis.execute_command.assert_called_once_with('SETIEPUB', self.key_redis,
+                                                                self.new_data, self.old_data,
+                                                                *self.channels_and_events_redis)
+        assert ret is True
+
+    def test_set_if_and_publish_returns_false_if_existing_key_value_not_expected(self):
+        self.mock_redis.execute_command.return_value = None
+        ret = self.db.set_if_and_publish(self.ns, self.channels_and_events, self.key, self.old_data,
+                                         self.new_data)
+        self.mock_redis.execute_command.assert_called_once_with('SETIEPUB', self.key_redis,
+                                                                self.new_data, self.old_data,
+                                                                *self.channels_and_events_redis)
+        assert ret is False
+
+    def test_set_if_and_publish_can_map_redis_exception_to_sdl_exception(self):
+        self.mock_redis.execute_command.side_effect = redis_exceptions.ResponseError('redis error!')
+        with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+            self.db.set_if_and_publish(self.ns, self.channels_and_events, self.key, self.old_data,
+                                       self.new_data)
+
+    def test_set_if_not_exists_and_publish_success(self):
+        self.mock_redis.execute_command.return_value = b"OK"
+        ret = self.db.set_if_not_exists_and_publish(self.ns, self.channels_and_events, self.key,
+                                                    self.new_data)
+        self.mock_redis.execute_command.assert_called_once_with('SETNXPUB', self.key_redis,
+                                                                self.new_data,
+                                                                *self.channels_and_events_redis)
+        assert ret is True
+
+    def test_set_if_not_exists_and_publish_returns_false_if_key_already_exists(self):
+        self.mock_redis.execute_command.return_value = None
+        ret = self.db.set_if_not_exists_and_publish(self.ns, self.channels_and_events, self.key,
+                                                    self.new_data)
+        self.mock_redis.execute_command.assert_called_once_with('SETNXPUB', self.key_redis,
+                                                                self.new_data,
+                                                                *self.channels_and_events_redis)
+        assert ret is False
+
+    def set_if_not_exists_and_publish_can_map_redis_exception_to_sdl_exception(self):
+        self.mock_redis.execute_command.side_effect = redis_exceptions.ResponseError('redis error!')
+        with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+            self.db.set_if_not_exists_and_publish(self.ns, self.channels_and_events, self.key,
+                                                  self.new_data)
+
+    def test_remove_and_publish_success(self):
+        self.db.remove_and_publish(self.ns, self.channels_and_events, self.key)
+        self.mock_redis.execute_command.assert_called_once_with('DELMPUB', len(self.key),
+                                                                len(self.channels_and_events),
+                                                                self.key_redis,
+                                                                *self.channels_and_events_redis)
+
+    def test_remove_if_and_publish_success(self):
+        self.mock_redis.execute_command.return_value = 1
+        ret = self.db.remove_if_and_publish(self.ns, self.channels_and_events, self.key,
+                                            self.new_data)
+        self.mock_redis.execute_command.assert_called_once_with('DELIEPUB', self.key_redis,
+                                                                self.new_data,
+                                                                *self.channels_and_events_redis)
+        assert ret is True
+
+    def test_remove_if_and_publish_returns_false_if_data_does_not_match(self):
+        self.mock_redis.execute_command.return_value = 0
+        ret = self.db.remove_if_and_publish(self.ns, self.channels_and_events, self.key,
+                                            self.new_data)
+        self.mock_redis.execute_command.assert_called_once_with('DELIEPUB', self.key_redis,
+                                                                self.new_data,
+                                                                *self.channels_and_events_redis)
+        assert ret is False
+
+    def test_remove_if_and_publish_can_map_redis_exception_to_sdl_exception(self):
+        self.mock_redis.execute_command.side_effect = redis_exceptions.ResponseError('redis error!')
+        with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+            self.db.remove_if_and_publish(self.ns, self.channels_and_events, self.key,
+                                          self.new_data)
+
+    def test_remove_all_and_publish_success(self):
+        self.mock_redis.keys.return_value = ['{some-ns},a']
+        self.db.remove_all_and_publish(self.ns, self.channels_and_events)
+        self.mock_redis.keys.assert_called_once()
+        self.mock_redis.execute_command.assert_called_once_with('DELMPUB', len(self.key),
+                                                                len(self.channels_and_events),
+                                                                self.key_redis,
+                                                                *self.channels_and_events_redis)
+
+    def test_remove_all_and_publish_can_map_redis_exception_to_sdl_exception(self):
+        self.mock_redis.execute_command.side_effect = redis_exceptions.ResponseError('redis error!')
+        with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+            self.db.remove_all_and_publish(self.ns, self.channels_and_events)
+
+    def test_subscribe_channel_success(self):
+        cb = Mock()
+        self.db.subscribe_channel(self.ns, cb, self.channels)
+        for channel in self.channels:
+            self.mock_pubsub.subscribe.assert_any_call(**{f'{{some-ns}},{channel}': cb})
+
+    def test_subscribe_channel_with_thread_success(self):
+        cb = Mock()
+        self.db.pubsub_thread.is_alive = Mock()
+        self.db.pubsub_thread.is_alive.return_value = False
+        self.db._run_in_thread = True
+        self.db.subscribe_channel(self.ns, cb, self.channels)
+        self.mock_pubsub.run_in_thread.assert_called_once_with(daemon=True, sleep_time=0.001)
+
+    def test_subscribe_can_map_redis_exception_to_sdl_exeception(self):
+        self.mock_pubsub.subscribe.side_effect = redis_exceptions.ResponseError('redis error!')
+        with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+            self.db.subscribe_channel(self.ns, Mock(), self.channels)
+
+    def test_unsubscribe_channel_success(self):
+        self.db.unsubscribe_channel(self.ns, [self.channels[0]])
+        self.mock_pubsub.unsubscribe.assert_called_with('{some-ns},abs')
+
+    def test_unsubscribe_channel_can_map_redis_exception_to_sdl_exeception(self):
+        self.mock_pubsub.unsubscribe.side_effect = redis_exceptions.ResponseError('redis error!')
+        with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+            self.db.unsubscribe_channel(self.ns, [self.channels[0]])
+
+    def test_start_event_listener_success(self):
+        self.db.start_event_listener()
+        assert self.db._run_in_thread
+
+    def test_start_event_listener_subscribe_first(self):
+        self.mock_pubsub.run_in_thread.return_value = Mock()
+        self.mock_redis.pubsub_channels.return_value = [b'{some-ns},abs']
+        self.db.subscribe_channel(self.ns, Mock(), self.channels)
+        self.db.start_event_listener()
+        self.mock_pubsub.run_in_thread.assert_called_once_with(daemon=True, sleep_time=0.001)
+
+    def test_start_event_listener_fail(self):
+        self.db.pubsub_thread.is_alive = Mock()
+        self.db.pubsub_thread.is_alive.return_value = True
+        with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+            self.db.start_event_listener()
+
+    def test_handle_events_success(self):
+        self.db.handle_events()
+        self.mock_pubsub.get_message.assert_called_once_with(ignore_subscribe_messages=True)
+
+    def test_handle_events_fail_already_started(self):
+        self.db.pubsub_thread.is_alive = Mock()
+        self.db.pubsub_thread.is_alive.return_value = True
+        with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+            self.db.handle_events()
+
+    def test_handle_events_fail_already_set(self):
+        self.db._run_in_thread = True
+        with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+            self.db.handle_events()
+
     def test_get_redis_connection_function_success(self):
         ret = self.db.get_redis_connection()
         assert ret is self.mock_redis
@@ -498,3 +669,14 @@ def test_system_error_exceptions_are_not_mapped_to_any_sdl_exception():
     with pytest.raises(SystemExit):
         with _map_to_sdl_exception():
             raise SystemExit('Fatal error')
+
+
+class TestRedisClient:
+    @classmethod
+    def setup_class(cls):
+        cls.pubsub = ricsdl.backend.redis.PubSub(Mock())
+        cls.pubsub.channels = {b'{some-ns},abs': Mock()}
+
+    def test_handle_pubsub_message(self):
+        assert self.pubsub.handle_message([b'message', b'{some-ns},abs', b'cbn']) == ('abs', 'cbn')
+        self.pubsub.channels.get(b'{some-ns},abs').assert_called_once_with('abs', 'cbn')
old mode 100644 (file)
new mode 100755 (executable)
index 7fb08dc..332a2fa
@@ -43,6 +43,8 @@ def sync_storage_fixture(request):
     request.cls.lock_name = 'some-lock-name'
     request.cls.lock_int_expiration = 10
     request.cls.lock_float_expiration = 1.1
+    request.cls.channels = {'abs', 'cbn'}
+    request.cls.channels_and_events = {'abs': 'cbn'}
 
     with patch('ricsdl.backend.get_backend_instance') as mock_db_backend:
         storage = SyncStorage()
@@ -312,6 +314,149 @@ class TestSyncStorage:
         with pytest.raises(SdlTypeError):
             self.storage.group_size(self.ns, 0xbad)
 
+    def test_set_and_publish_function_success(self):
+        self.storage.set_and_publish(self.ns, self.channels_and_events, self.dm)
+        self.mock_db_backend.set_and_publish.assert_called_once_with(self.ns,
+                                                                     self.channels_and_events,
+                                                                     self.dm)
+
+    def test_set_and_publish_can_raise_exception_for_wrong_argument(self):
+        with pytest.raises(SdlTypeError):
+            self.storage.set_and_publish(123, self.channels_and_events, {'a': b'v1'})
+        with pytest.raises(SdlTypeError):
+            self.storage.set_and_publish('ns', self.channels_and_events, [1, 2])
+        with pytest.raises(SdlTypeError):
+            self.storage.set_and_publish('ns', self.channels_and_events, {0xbad: b'v1'})
+        with pytest.raises(SdlTypeError):
+            self.storage.set_and_publish('ns', self.channels_and_events, {'a': 0xbad})
+
+    def test_set_if_and_publish_success(self):
+        self.mock_db_backend.set_if_and_publish.return_value = True
+        ret = self.storage.set_if_and_publish(self.ns, self.channels_and_events, self.key,
+                                              self.old_data, self.new_data)
+        self.mock_db_backend.set_if_and_publish.assert_called_once_with(
+            self.ns, self.channels_and_events, self.key, self.old_data, self.new_data)
+        assert ret is True
+
+    def test_set_if_and_publish_can_return_false_if_same_data_already_exists(self):
+        self.mock_db_backend.set_if_and_publish.return_value = False
+        ret = self.storage.set_if_and_publish(self.ns, self.channels_and_events, self.key,
+                                              self.old_data, self.new_data)
+        self.mock_db_backend.set_if_and_publish.assert_called_once_with(
+            self.ns, self.channels_and_events, self.key, self.old_data, self.new_data)
+        assert ret is False
+
+    def test_set_if_and_publish_can_raise_exception_for_wrong_argument(self):
+        with pytest.raises(SdlTypeError):
+            self.storage.set_if_and_publish(0xbad, self.channels_and_events, 'key', b'v1', b'v2')
+        with pytest.raises(SdlTypeError):
+            self.storage.set_if_and_publish('ns', self.channels_and_events, 0xbad, b'v1', b'v2')
+        with pytest.raises(SdlTypeError):
+            self.storage.set_if_and_publish('ns', self.channels_and_events, 'key', 0xbad, b'v2')
+        with pytest.raises(SdlTypeError):
+            self.storage.set_if_and_publish('ns', self.channels_and_events, 'key', b'v1', 0xbad)
+
+    def test_set_if_not_exists_and_publish_success(self):
+        self.mock_db_backend.set_if_not_exists_and_publish.return_value = True
+        ret = self.storage.set_if_not_exists_and_publish(self.ns, self.channels_and_events,
+                                                         self.key, self.new_data)
+        self.mock_db_backend.set_if_not_exists_and_publish.assert_called_once_with(
+            self.ns, self.channels_and_events, self.key, self.new_data)
+        assert ret is True
+
+    def test_set_if_not_exists_and_publish_function_can_return_false_if_key_already_exists(self):
+        self.mock_db_backend.set_if_not_exists_and_publish.return_value = False
+        ret = self.storage.set_if_not_exists_and_publish(self.ns, self.channels_and_events,
+                                                         self.key, self.new_data)
+        self.mock_db_backend.set_if_not_exists_and_publish.assert_called_once_with(
+            self.ns, self.channels_and_events, self.key, self.new_data)
+        assert ret is False
+
+    def test_set_if_not_exists_and_publish_can_raise_exception_for_wrong_argument(self):
+        with pytest.raises(SdlTypeError):
+            self.storage.set_if_not_exists_and_publish(0xbad, self.channels_and_events, 'key',
+                                                       b'v1')
+        with pytest.raises(SdlTypeError):
+            self.storage.set_if_not_exists_and_publish('ns', self.channels_and_events, 0xbad, b'v1')
+        with pytest.raises(SdlTypeError):
+            self.storage.set_if_not_exists_and_publish('ns', self.channels_and_events, 'key', 0xbad)
+
+    def test_remove_and_publish_function_success(self):
+        self.storage.remove_and_publish(self.ns, self.channels_and_events, self.keys)
+        self.mock_db_backend.remove_and_publish.assert_called_once_with(
+            self.ns, self.channels_and_events, list(self.keys))
+
+    def test_remove_and_publish_can_raise_exception_for_wrong_argument(self):
+        with pytest.raises(SdlTypeError):
+            self.storage.remove_and_publish(0xbad, self.channels_and_events, self.keys)
+        with pytest.raises(SdlTypeError):
+            self.storage.remove(self.ns, self.channels_and_events, 0xbad)
+
+    def test_remove_if_and_publish_success(self):
+        self.mock_db_backend.remove_if_and_publish.return_value = True
+        ret = self.storage.remove_if_and_publish(self.ns, self.channels_and_events, self.key,
+                                                 self.new_data)
+        self.mock_db_backend.remove_if_and_publish.assert_called_once_with(
+            self.ns, self.channels_and_events, self.key, self.new_data)
+        assert ret is True
+
+    def test_remove_if_remove_and_publish_can_return_false_if_data_does_not_match(self):
+        self.mock_db_backend.remove_if_and_publish.return_value = False
+        ret = self.storage.remove_if_and_publish(self.ns, self.channels_and_events, self.key,
+                                                 self.old_data)
+        self.mock_db_backend.remove_if_and_publish.assert_called_once_with(
+            self.ns, self.channels_and_events, self.key, self.old_data)
+        assert ret is False
+
+    def test_remove_if_remove_and_publish_can_raise_exception_for_wrong_argument(self):
+        with pytest.raises(SdlTypeError):
+            self.storage.remove_if_and_publish(0xbad, self.channels_and_events, self.keys,
+                                               self.old_data)
+        with pytest.raises(SdlTypeError):
+            self.storage.remove_if_and_publish(self.ns, self.channels_and_events, 0xbad,
+                                               self.old_data)
+        with pytest.raises(SdlTypeError):
+            self.storage.remove_if_and_publish(self.ns, self.channels_and_events, self.keys, 0xbad)
+
+    def test_remove_all_and_publish_success(self):
+        self.storage.remove_all_and_publish(self.ns, self.channels_and_events)
+        self.mock_db_backend.remove_all_and_publish.assert_called_once_with(
+            self.ns, self.channels_and_events)
+
+    def test_remove_all_and_publish_can_raise_exception_for_wrong_argument(self):
+        with pytest.raises(SdlTypeError):
+            self.storage.remove_all_and_publish(0xbad, self.channels_and_events)
+
+    def test_subscribe_function_success(self):
+        def cb(channel, message):
+            pass
+        self.storage.subscribe_channel(self.ns, cb, self.channels)
+        self.mock_db_backend.subscribe_channel.assert_called_once_with(
+            self.ns, cb, list(self.channels))
+
+    def test_subscribe_can_raise_exception_for_wrong_argument(self):
+        def cb3(channel, message, extra):
+            pass
+        def cb1(channel):
+            pass
+        with pytest.raises(SdlTypeError):
+            self.storage.subscribe_channel(self.ns, cb3, self.channels)
+        with pytest.raises(SdlTypeError):
+            self.storage.subscribe_channel(self.ns, cb1, self.channels)
+
+    def test_unsubscribe_function_success(self):
+        self.storage.unsubscribe_channel(self.ns, self.channels)
+        self.mock_db_backend.unsubscribe_channel.assert_called_once_with(
+            self.ns, list(self.channels))
+
+    def test_start_event_listener_success(self):
+        self.storage.start_event_listener()
+        self.mock_db_backend.start_event_listener.assert_called()
+
+    def test_handle_events_success(self):
+        self.storage.handle_events()
+        self.mock_db_backend.handle_events.assert_called()
+
     @patch('ricsdl.syncstorage.SyncLock')
     def test_get_lock_resource_function_success_when_expiration_time_is_integer(self, mock_db_lock):
         ret = self.storage.get_lock_resource(self.ns, self.lock_name, self.lock_int_expiration)