Implement sentinel based DB capacity scaling 60/6060/2
authorTimo Tietavainen <timo.tietavainen@nokia.com>
Tue, 11 May 2021 11:43:09 +0000 (14:43 +0300)
committerTimo Tietavainen <timo.tietavainen@nokia.com>
Tue, 11 May 2021 11:59:48 +0000 (14:59 +0300)
For time being SDL has supported standalone DBAAS DB and DBAAS HA DB deployment
with Redis sentinel. With this commit extent SDL functionality to support Redis
sentinel based DB cluster where we have multiple DBAAS Redis sentinel groups
and these groups can be used to spread out SDL DB write and read operations to
different DB instances.
Implement cluster DBAAS DB service addresses reading from environment variable
'DBAAS_CLUSTER_ADDR_LIST'.
Implement crc32 hash value calculation from namespace string and selection of
a DB instance from DB cluster based on calculated hash.

Issue-ID: RIC-699

Signed-off-by: Timo Tietavainen <timo.tietavainen@nokia.com>
Change-Id: Ib325ca68e212bde80c1536b2392293ee76f8fe9e

docs/release-notes.rst
ricsdl-package/examples/notify.py
ricsdl-package/examples/sync.py
ricsdl-package/ricsdl/__init__.py
ricsdl-package/ricsdl/backend/redis.py
ricsdl-package/ricsdl/configuration.py
ricsdl-package/tests/backend/test_fake_dict_db.py
ricsdl-package/tests/backend/test_redis.py
ricsdl-package/tests/test_configuration.py

index 496254c..8177824 100644 (file)
@@ -33,6 +33,10 @@ This document provides the release notes of the ricsdl library.
 Version history
 ---------------
 
+[2.2.0] - 2021-05-11
+
+* Add DB backend instance selection based on namespace value to balance DB load.
+
 [2.1.1] - 2021-03-09
 
 * Take DBAAS multi-channel publishing Redis modules into use.
index 3041d80..473dd73 100755 (executable)
@@ -37,11 +37,19 @@ Execution of  these examples requires:
    running Redis.
  * Following environment variables are needed to set to the pod/container where the application
    utilizing SDL is going to be run.
-     DBAAS_SERVICE_HOST = [redis server address]
-     DBAAS_SERVICE_PORT= [redis server port]
-     DBAAS_MASTER_NAME = [master Redis sentinel name]. Needed to set only if sentinel is in use.
-     DBAAS_SERVICE_SENTINEL_PORT = [Redis sentinel port number]. Needed to set only if sentinel
-     is in use.
+     DBAAS_SERVICE_HOST = [DB service address]
+     DBAAS_SERVICE_PORT= [DB service port]
+     DBAAS_MASTER_NAME = [DB name]. Needed to set only if Redis sentinel is used to provide high
+     availability for Redis DB solution.
+     DBAAS_SERVICE_SENTINEL_PORT = [Redis sentinel port number]. Needed to set only if Redis
+     sentinel is in use.
+     DBASS_CLUSTER_ADDR_LIST = [list of DB service addresses]. Is set only if more than one
+     Redis sentinel groups are in use.
+   In official RIC deployments four first environment variables are defined in Helm configMaps
+   of the DBaaS and these configurations can be loaded automatically as environment variables
+   into application pods via `envFrom dbaas-appconfig` statement in an application Helm Charts.
+   The last environment variable is not for time being in use in official RIC deployments, only
+   in Nokia SEP deployments.
 """
 
 import threading
index 1fa5742..618586d 100755 (executable)
@@ -39,11 +39,19 @@ Execution of  these examples requires:
    running Redis.
  * Following environment variables are needed to set to the pod/container where the application
    utilizing SDL is going to be run.
-     DBAAS_SERVICE_HOST = [redis server address]
-     DBAAS_SERVICE_PORT= [redis server port]
-     DBAAS_MASTER_NAME = [master Redis sentinel name]. Needed to set only if sentinel is in use.
-     DBAAS_SERVICE_SENTINEL_PORT = [Redis sentinel port number]. Needed to set only if sentinel
-     is in use.
+     DBAAS_SERVICE_HOST = [DB service address]
+     DBAAS_SERVICE_PORT= [DB service port]
+     DBAAS_MASTER_NAME = [DB name]. Needed to set only if Redis sentinel is used to provide high
+     availability for Redis DB solution.
+     DBAAS_SERVICE_SENTINEL_PORT = [Redis sentinel port number]. Needed to set only if Redis
+     sentinel is in use.
+     DBASS_CLUSTER_ADDR_LIST = [list of DB service addresses]. Is set only if more than one
+     Redis sentinel groups are in use.
+   In official RIC deployments four first environment variables are defined in Helm configMaps
+   of the DBaaS and these configurations can be loaded automatically as environment variables
+   into application pods via `envFrom dbaas-appconfig` statement in an application Helm Charts.
+   The last environment variable is not for time being in use in official RIC deployments, only
+   in Nokia SEP deployments.
 """
 from ricsdl.syncstorage import SyncStorage
 from ricsdl.exceptions import RejectedByBackend, NotConnected, BackendError
index b4c587c..d969bec 100644 (file)
@@ -31,7 +31,7 @@ from .exceptions import (
 )
 
 
-__version__ = '2.1.1'
+__version__ = '2.2.0'
 
 
 __all__ = [
index c319bf0..12726a3 100755 (executable)
@@ -23,6 +23,7 @@
 import contextlib
 import threading
 from typing import (Callable, Dict, Set, List, Optional, Tuple, Union)
+import zlib
 import redis
 from redis import Redis
 from redis.sentinel import Sentinel
@@ -146,63 +147,53 @@ class RedisBackend(DbBackendAbc):
     """
     def __init__(self, configuration: _Configuration) -> None:
         super().__init__()
+        self.next_client_event = 0
+        self.clients = list()
         with _map_to_sdl_exception():
-            if configuration.get_params().db_sentinel_port:
-                sentinel_node = (configuration.get_params().db_host,
-                                 configuration.get_params().db_sentinel_port)
-                master_name = configuration.get_params().db_sentinel_master_name
-                self.__sentinel = Sentinel([sentinel_node])
-                self.__redis = self.__sentinel.master_for(master_name)
-            else:
-                self.__redis = Redis(host=configuration.get_params().db_host,
-                                     port=configuration.get_params().db_port,
-                                     db=0,
-                                     max_connections=20)
-        self.__redis.set_response_callback('SETIE', lambda r: r and nativestr(r) == 'OK' or False)
-        self.__redis.set_response_callback('DELIE', lambda r: r and int(r) == 1 or False)
-
-        self.__redis_pubsub = PubSub(self.__redis.connection_pool, ignore_subscribe_messages=True)
-        self.pubsub_thread = threading.Thread(target=None)
-        self._run_in_thread = False
+            self.clients = self.__create_redis_clients(configuration)
 
     def __del__(self):
         self.close()
 
     def __str__(self):
-        return str(
-            {
-                "DB type": "Redis",
-                "Redis connection": repr(self.__redis)
-            }
-        )
+        out = {"DB type": "Redis"}
+        for i, r in enumerate(self.clients):
+            out["Redis client[" + str(i) + "]"] = str(r)
+        return str(out)
 
     def is_connected(self):
+        is_connected = True
         with _map_to_sdl_exception():
-            return self.__redis.ping()
+            for c in self.clients:
+                if not c.redis_client.ping():
+                    is_connected = False
+                    break
+        return is_connected
 
     def close(self):
-        self.__redis.close()
+        for c in self.clients:
+            c.redis_client.close()
 
     def set(self, ns: str, data_map: Dict[str, bytes]) -> None:
-        db_data_map = self._add_data_map_ns_prefix(ns, data_map)
+        db_data_map = self.__add_data_map_ns_prefix(ns, data_map)
         with _map_to_sdl_exception():
-            self.__redis.mset(db_data_map)
+            self.__getClient(ns).mset(db_data_map)
 
     def set_if(self, ns: str, key: str, old_data: bytes, new_data: bytes) -> bool:
-        db_key = self._add_key_ns_prefix(ns, key)
+        db_key = self.__add_key_ns_prefix(ns, key)
         with _map_to_sdl_exception():
-            return self.__redis.execute_command('SETIE', db_key, new_data, old_data)
+            return self.__getClient(ns).execute_command('SETIE', db_key, new_data, old_data)
 
     def set_if_not_exists(self, ns: str, key: str, data: bytes) -> bool:
-        db_key = self._add_key_ns_prefix(ns, key)
+        db_key = self.__add_key_ns_prefix(ns, key)
         with _map_to_sdl_exception():
-            return self.__redis.setnx(db_key, data)
+            return self.__getClient(ns).setnx(db_key, data)
 
     def get(self, ns: str, keys: List[str]) -> Dict[str, bytes]:
         ret = dict()
-        db_keys = self._add_keys_ns_prefix(ns, keys)
+        db_keys = self.__add_keys_ns_prefix(ns, keys)
         with _map_to_sdl_exception():
-            values = self.__redis.mget(db_keys)
+            values = self.__getClient(ns).mget(db_keys)
             for idx, val in enumerate(values):
                 # return only key values, which has a value
                 if val is not None:
@@ -210,10 +201,10 @@ class RedisBackend(DbBackendAbc):
             return ret
 
     def find_keys(self, ns: str, key_pattern: str) -> List[str]:
-        db_key_pattern = self._add_key_ns_prefix(ns, key_pattern)
+        db_key_pattern = self.__add_key_ns_prefix(ns, key_pattern)
         with _map_to_sdl_exception():
-            ret = self.__redis.keys(db_key_pattern)
-            return self._strip_ns_from_bin_keys(ns, ret)
+            ret = self.__getClient(ns).keys(db_key_pattern)
+            return self.__strip_ns_from_bin_keys(ns, ret)
 
     def find_and_get(self, ns: str, key_pattern: str) -> Dict[str, bytes]:
         # todo: replace below implementation with redis 'NGET' module
@@ -225,53 +216,53 @@ class RedisBackend(DbBackendAbc):
         return ret
 
     def remove(self, ns: str, keys: List[str]) -> None:
-        db_keys = self._add_keys_ns_prefix(ns, keys)
+        db_keys = self.__add_keys_ns_prefix(ns, keys)
         with _map_to_sdl_exception():
-            self.__redis.delete(*db_keys)
+            self.__getClient(ns).delete(*db_keys)
 
     def remove_if(self, ns: str, key: str, data: bytes) -> bool:
-        db_key = self._add_key_ns_prefix(ns, key)
+        db_key = self.__add_key_ns_prefix(ns, key)
         with _map_to_sdl_exception():
-            return self.__redis.execute_command('DELIE', db_key, data)
+            return self.__getClient(ns).execute_command('DELIE', db_key, data)
 
     def add_member(self, ns: str, group: str, members: Set[bytes]) -> None:
-        db_key = self._add_key_ns_prefix(ns, group)
+        db_key = self.__add_key_ns_prefix(ns, group)
         with _map_to_sdl_exception():
-            self.__redis.sadd(db_key, *members)
+            self.__getClient(ns).sadd(db_key, *members)
 
     def remove_member(self, ns: str, group: str, members: Set[bytes]) -> None:
-        db_key = self._add_key_ns_prefix(ns, group)
+        db_key = self.__add_key_ns_prefix(ns, group)
         with _map_to_sdl_exception():
-            self.__redis.srem(db_key, *members)
+            self.__getClient(ns).srem(db_key, *members)
 
     def remove_group(self, ns: str, group: str) -> None:
-        db_key = self._add_key_ns_prefix(ns, group)
+        db_key = self.__add_key_ns_prefix(ns, group)
         with _map_to_sdl_exception():
-            self.__redis.delete(db_key)
+            self.__getClient(ns).delete(db_key)
 
     def get_members(self, ns: str, group: str) -> Set[bytes]:
-        db_key = self._add_key_ns_prefix(ns, group)
+        db_key = self.__add_key_ns_prefix(ns, group)
         with _map_to_sdl_exception():
-            return self.__redis.smembers(db_key)
+            return self.__getClient(ns).smembers(db_key)
 
     def is_member(self, ns: str, group: str, member: bytes) -> bool:
-        db_key = self._add_key_ns_prefix(ns, group)
+        db_key = self.__add_key_ns_prefix(ns, group)
         with _map_to_sdl_exception():
-            return self.__redis.sismember(db_key, member)
+            return self.__getClient(ns).sismember(db_key, member)
 
     def group_size(self, ns: str, group: str) -> int:
-        db_key = self._add_key_ns_prefix(ns, group)
+        db_key = self.__add_key_ns_prefix(ns, group)
         with _map_to_sdl_exception():
-            return self.__redis.scard(db_key)
+            return self.__getClient(ns).scard(db_key)
 
     def set_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]],
                         data_map: Dict[str, bytes]) -> None:
-        db_data_map = self._add_data_map_ns_prefix(ns, data_map)
+        db_data_map = self.__add_data_map_ns_prefix(ns, data_map)
         channels_and_events_prepared = []
         total_events = 0
         channels_and_events_prepared, total_events = self._prepare_channels(ns, channels_and_events)
         with _map_to_sdl_exception():
-            return self.__redis.execute_command(
+            return self.__getClient(ns).execute_command(
                 "MSETMPUB",
                 len(db_data_map),
                 total_events,
@@ -281,29 +272,29 @@ class RedisBackend(DbBackendAbc):
 
     def set_if_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]], key: str,
                            old_data: bytes, new_data: bytes) -> bool:
-        db_key = self._add_key_ns_prefix(ns, key)
+        db_key = self.__add_key_ns_prefix(ns, key)
         channels_and_events_prepared = []
         channels_and_events_prepared, _ = self._prepare_channels(ns, channels_and_events)
         with _map_to_sdl_exception():
-            ret = self.__redis.execute_command("SETIEMPUB", db_key, new_data, old_data,
-                                               *channels_and_events_prepared)
+            ret = self.__getClient(ns).execute_command("SETIEMPUB", db_key, new_data, old_data,
+                                                       *channels_and_events_prepared)
             return ret == b"OK"
 
     def set_if_not_exists_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]],
                                       key: str, data: bytes) -> bool:
-        db_key = self._add_key_ns_prefix(ns, key)
+        db_key = self.__add_key_ns_prefix(ns, key)
         channels_and_events_prepared, _ = self._prepare_channels(ns, channels_and_events)
         with _map_to_sdl_exception():
-            ret = self.__redis.execute_command("SETNXMPUB", db_key, data,
-                                               *channels_and_events_prepared)
+            ret = self.__getClient(ns).execute_command("SETNXMPUB", db_key, data,
+                                                       *channels_and_events_prepared)
             return ret == b"OK"
 
     def remove_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]],
                            keys: List[str]) -> None:
-        db_keys = self._add_keys_ns_prefix(ns, keys)
+        db_keys = self.__add_keys_ns_prefix(ns, keys)
         channels_and_events_prepared, total_events = self._prepare_channels(ns, channels_and_events)
         with _map_to_sdl_exception():
-            return self.__redis.execute_command(
+            return self.__getClient(ns).execute_command(
                 "DELMPUB",
                 len(db_keys),
                 total_events,
@@ -313,18 +304,18 @@ class RedisBackend(DbBackendAbc):
 
     def remove_if_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]], key: str,
                               data: bytes) -> bool:
-        db_key = self._add_key_ns_prefix(ns, key)
+        db_key = self.__add_key_ns_prefix(ns, key)
         channels_and_events_prepared, _ = self._prepare_channels(ns, channels_and_events)
         with _map_to_sdl_exception():
-            ret = self.__redis.execute_command("DELIEMPUB", db_key, data,
-                                               *channels_and_events_prepared)
+            ret = self.__getClient(ns).execute_command("DELIEMPUB", db_key, data,
+                                                       *channels_and_events_prepared)
             return bool(ret)
 
     def remove_all_and_publish(self, ns: str, channels_and_events: Dict[str, List[str]]) -> None:
-        keys = self.__redis.keys(self._add_key_ns_prefix(ns, "*"))
+        keys = self.__getClient(ns).keys(self.__add_key_ns_prefix(ns, "*"))
         channels_and_events_prepared, total_events = self._prepare_channels(ns, channels_and_events)
         with _map_to_sdl_exception():
-            return self.__redis.execute_command(
+            return self.__getClient(ns).execute_command(
                 "DELMPUB",
                 len(keys),
                 total_events,
@@ -334,55 +325,115 @@ class RedisBackend(DbBackendAbc):
 
     def subscribe_channel(self, ns: str, cb: Callable[[str, str], None],
                           channels: List[str]) -> None:
-        channels = self._add_keys_ns_prefix(ns, channels)
+        channels = self.__add_keys_ns_prefix(ns, channels)
         for channel in channels:
             with _map_to_sdl_exception():
-                self.__redis_pubsub.subscribe(**{channel: cb})
-                if not self.pubsub_thread.is_alive() and self._run_in_thread:
-                    self.pubsub_thread = self.__redis_pubsub.run_in_thread(sleep_time=0.001,
-                                                                           daemon=True)
+                redis_ctx = self.__getClientConn(ns)
+                redis_ctx.redis_pubsub.subscribe(**{channel: cb})
+                if not redis_ctx.pubsub_thread.is_alive() and redis_ctx.run_in_thread:
+                    redis_ctx.pubsub_thread = redis_ctx.redis_pubsub.run_in_thread(sleep_time=0.001,
+                                                                                   daemon=True)
 
     def unsubscribe_channel(self, ns: str, channels: List[str]) -> None:
-        channels = self._add_keys_ns_prefix(ns, channels)
+        channels = self.__add_keys_ns_prefix(ns, channels)
         for channel in channels:
             with _map_to_sdl_exception():
-                self.__redis_pubsub.unsubscribe(channel)
+                self.__getClientConn(ns).redis_pubsub.unsubscribe(channel)
 
     def start_event_listener(self) -> None:
-        if self.pubsub_thread.is_alive():
-            raise RejectedByBackend("Event loop already started")
-        if len(self.__redis.pubsub_channels()) > 0:
-            self.pubsub_thread = self.__redis_pubsub.run_in_thread(sleep_time=0.001, daemon=True)
-        self._run_in_thread = True
+        redis_ctxs = self.__getClientConns()
+        for redis_ctx in redis_ctxs:
+            if redis_ctx.pubsub_thread.is_alive():
+                raise RejectedByBackend("Event loop already started")
+            if redis_ctx.redis_pubsub.subscribed and len(redis_ctx.redis_client.pubsub_channels()) > 0:
+                redis_ctx.pubsub_thread = redis_ctx.redis_pubsub.run_in_thread(sleep_time=0.001, daemon=True)
+            redis_ctx.run_in_thread = True
 
     def handle_events(self) -> Optional[Tuple[str, str]]:
-        if self.pubsub_thread.is_alive() or self._run_in_thread:
+        if self.next_client_event >= len(self.clients):
+            self.next_client_event = 0
+        redis_ctx = self.clients[self.next_client_event]
+        self.next_client_event += 1
+        if redis_ctx.pubsub_thread.is_alive() or redis_ctx.run_in_thread:
             raise RejectedByBackend("Event loop already started")
         try:
-            return self.__redis_pubsub.get_message(ignore_subscribe_messages=True)
+            return redis_ctx.redis_pubsub.get_message(ignore_subscribe_messages=True)
         except RuntimeError:
             return None
 
+    def __create_redis_clients(self, config):
+        clients = list()
+        cfg_params = config.get_params()
+        if cfg_params.db_cluster_addr_list is None:
+            clients.append(self.__create_legacy_redis_client(cfg_params))
+        else:
+            for addr in cfg_params.db_cluster_addr_list.split(","):
+                client = self.__create_redis_client(cfg_params, addr)
+                clients.append(client)
+        return clients
+
+    def __create_legacy_redis_client(self, cfg_params):
+        return self.__create_redis_client(cfg_params, cfg_params.db_host)
+
+    def __create_redis_client(self, cfg_params, addr):
+        new_sentinel = None
+        new_redis = None
+        if cfg_params.db_sentinel_port is None:
+            new_redis = Redis(host=addr, port=cfg_params.db_port, db=0, max_connections=20)
+        else:
+            sentinel_node = (addr, cfg_params.db_sentinel_port)
+            master_name = cfg_params.db_sentinel_master_name
+            new_sentinel = Sentinel([sentinel_node])
+            new_redis = new_sentinel.master_for(master_name)
+
+        new_redis.set_response_callback('SETIE', lambda r: r and nativestr(r) == 'OK' or False)
+        new_redis.set_response_callback('DELIE', lambda r: r and int(r) == 1 or False)
+
+        redis_pubsub = PubSub(new_redis.connection_pool, ignore_subscribe_messages=True)
+        pubsub_thread = threading.Thread(target=None)
+        run_in_thread = False
+
+        return _RedisConn(new_redis, redis_pubsub, pubsub_thread, run_in_thread)
+
+    def __getClientConns(self):
+        return self.clients
+
+    def __getClientConn(self, ns):
+        clients_cnt = len(self.clients)
+        client_id = self.__get_hash(ns) % clients_cnt
+        return self.clients[client_id]
+
+    def __getClient(self, ns):
+        clients_cnt = len(self.clients)
+        client_id = 0
+        if clients_cnt > 1:
+            client_id = self.__get_hash(ns) % clients_cnt
+        return self.clients[client_id].redis_client
+
+    @classmethod
+    def __get_hash(cls, str):
+        return zlib.crc32(str.encode())
+
     @classmethod
-    def _add_key_ns_prefix(cls, ns: str, key: str):
+    def __add_key_ns_prefix(cls, ns: str, key: str):
         return '{' + ns + '},' + key
 
     @classmethod
-    def _add_keys_ns_prefix(cls, ns: str, keylist: List[str]) -> List[str]:
+    def __add_keys_ns_prefix(cls, ns: str, keylist: List[str]) -> List[str]:
         ret_nskeys = []
         for k in keylist:
             ret_nskeys.append('{' + ns + '},' + k)
         return ret_nskeys
 
     @classmethod
-    def _add_data_map_ns_prefix(cls, ns: str, data_dict: Dict[str, bytes]) -> Dict[str, bytes]:
+    def __add_data_map_ns_prefix(cls, ns: str, data_dict: Dict[str, bytes]) -> Dict[str, bytes]:
         ret_nsdict = {}
         for key, val in data_dict.items():
             ret_nsdict['{' + ns + '},' + key] = val
         return ret_nsdict
 
     @classmethod
-    def _strip_ns_from_bin_keys(cls, ns: str, nskeylist: List[bytes]) -> List[str]:
+    def __strip_ns_from_bin_keys(cls, ns: str, nskeylist: List[bytes]) -> List[str]:
         ret_keys = []
         for k in nskeylist:
             try:
@@ -404,14 +455,36 @@ class RedisBackend(DbBackendAbc):
         total_events = 0
         for channel, events in channels_and_events.items():
             for event in events:
-                channels_and_events_prepared.append(cls._add_key_ns_prefix(ns, channel))
+                channels_and_events_prepared.append(cls.__add_key_ns_prefix(ns, channel))
                 channels_and_events_prepared.append(event)
                 total_events += 1
         return channels_and_events_prepared, total_events
 
-    def get_redis_connection(self):
-        """Return existing Redis database connection."""
-        return self.__redis
+    def get_redis_connection(self, ns: str):
+        """Return existing Redis database connection valid for the namespace."""
+        return self.__getClient(ns)
+
+
+class _RedisConn:
+    """
+    Internal class container to hold redis client connection
+    """
+
+    def __init__(self, redis_client, pubsub, pubsub_thread, run_in_thread):
+        self.redis_client = redis_client
+        self.redis_pubsub = pubsub
+        self.pubsub_thread = pubsub_thread
+        self.run_in_thread = run_in_thread
+
+    def __str__(self):
+        return str(
+            {
+                "Client": repr(self.redis_client),
+                "Subscrions": self.redis_pubsub.subscribed,
+                "PubSub thread": repr(self.pubsub_thread),
+                "Run in thread": self.run_in_thread,
+            }
+        )
 
 
 class RedisBackendLock(DbBackendLockAbc):
@@ -445,7 +518,7 @@ class RedisBackendLock(DbBackendLockAbc):
     def __init__(self, ns: str, name: str, expiration: Union[int, float],
                  redis_backend: RedisBackend) -> None:
         super().__init__(ns, name)
-        self.__redis = redis_backend.get_redis_connection()
+        self.__redis = redis_backend.get_redis_connection(ns)
         with _map_to_sdl_exception():
             redis_lockname = '{' + ns + '},' + self._lock_name
             self.__redis_lock = Lock(redis=self.__redis, name=redis_lockname, timeout=expiration)
index 892d607..b1521da 100644 (file)
@@ -34,7 +34,8 @@ class DbBackendType(Enum):
 class _Configuration():
     """This class implements Shared Data Layer (SDL) configurability."""
     Params = namedtuple('Params', ['db_host', 'db_port', 'db_sentinel_port',
-                                   'db_sentinel_master_name', 'db_type'])
+                                   'db_sentinel_master_name',
+                                   'db_cluster_addr_list', 'db_type'])
 
     def __init__(self, fake_db_backend):
         self.params = self._read_configuration(fake_db_backend)
@@ -46,6 +47,7 @@ class _Configuration():
                 "DB port": self.params.db_port,
                 "DB master sentinel": self.params.db_sentinel_master_name,
                 "DB sentinel port": self.params.db_sentinel_port,
+                "DB cluster address list": self.params.db_cluster_addr_list,
                 "DB type": self.params.db_type.name,
             }
         )
@@ -72,4 +74,5 @@ class _Configuration():
                                      db_port=os.getenv('DBAAS_SERVICE_PORT'),
                                      db_sentinel_port=os.getenv('DBAAS_SERVICE_SENTINEL_PORT'),
                                      db_sentinel_master_name=os.getenv('DBAAS_MASTER_NAME'),
+                                     db_cluster_addr_list=os.getenv('DBAAS_CLUSTER_ADDR_LIST'),
                                      db_type=backend_type)
index a8c2d12..6c5fd88 100755 (executable)
@@ -53,6 +53,7 @@ def fake_dict_backend_fixture(request):
                                              db_port=None,
                                              db_sentinel_port=None,
                                              db_sentinel_master_name=None,
+                                             db_cluster_addr_list=None,
                                              db_type=DbBackendType.FAKE_DICT)
     request.cls.configuration.get_params.return_value = mock_conf_params
     request.cls.db = ricsdl.backend.get_backend_instance(request.cls.configuration)
@@ -329,6 +330,7 @@ def fake_dict_backend_lock_fixture(request):
                                              db_port=None,
                                              db_sentinel_port=None,
                                              db_sentinel_master_name=None,
+                                             db_cluster_addr_list=None,
                                              db_type=DbBackendType.FAKE_DICT)
     request.cls.configuration.get_params.return_value = mock_conf_params
     request.cls.lock = ricsdl.backend.get_backend_lock_instance(request.cls.configuration,
index c5e1b03..1d79752 100755 (executable)
@@ -19,7 +19,7 @@
 #
 
 
-from unittest.mock import patch, Mock
+from unittest.mock import patch, Mock, MagicMock, call, ANY
 import pytest
 from redis import exceptions as redis_exceptions
 import ricsdl.backend
@@ -29,8 +29,32 @@ from ricsdl.configuration import DbBackendType
 import ricsdl.exceptions
 
 
+def get_test_sdl_standby_config():
+    return _Configuration.Params(db_host='service-ricplt-dbaas-tcp-cluster-0.ricplt',
+                                 db_port=6379,
+                                 db_sentinel_port=None,
+                                 db_sentinel_master_name=None,
+                                 db_cluster_addr_list=None,
+                                 db_type=DbBackendType.REDIS)
+
+def get_test_sdl_sentinel_config():
+    return _Configuration.Params(db_host='service-ricplt-dbaas-tcp-cluster-0.ricplt',
+                                 db_port=6379,
+                                 db_sentinel_port=26379,
+                                 db_sentinel_master_name='dbaasmaster',
+                                 db_cluster_addr_list=None,
+                                 db_type=DbBackendType.REDIS)
+
+def get_test_sdl_sentinel_cluster_config():
+    return _Configuration.Params(db_host='service-ricplt-dbaas-tcp-cluster-0.ricplt',
+                                 db_port=6379,
+                                 db_sentinel_port=26379,
+                                 db_sentinel_master_name='dbaasmaster',
+                                 db_cluster_addr_list='service-ricplt-dbaas-tcp-cluster-0.ricplt,service-ricplt-dbaas-tcp-cluster-1.ricplt',
+                                 db_type=DbBackendType.REDIS)
+
 @pytest.fixture()
-def redis_backend_fixture(request):
+def redis_backend_common_fixture(request):
     request.cls.ns = 'some-ns'
     request.cls.dl_redis = [b'1', b'2']
     request.cls.dm = {'a': b'1', 'b': b'2'}
@@ -56,22 +80,86 @@ def redis_backend_fixture(request):
     request.cls.groupmembers = set([b'm1', b'm2'])
     request.cls.groupmember = b'm1'
     request.cls.channels = ['abs', 'gma']
-    request.cls.channels_and_events = {'abs': ['cbn']}
-    request.cls.channels_and_events_redis = ['{some-ns},abs', 'cbn']
+    request.cls.channels_and_events = {'abs': ['cbn'], 'gma': ['jkl']}
+    request.cls.channels_and_events_redis = ['{some-ns},abs', 'cbn', '{some-ns},gma', 'jkl']
+
+    yield
 
+@pytest.fixture(params=['standalone', 'sentinel', 'sentinel_cluster'])
+def redis_backend_fixture(request, redis_backend_common_fixture):
     request.cls.configuration = Mock()
-    mock_conf_params = _Configuration.Params(db_host=None,
-                                             db_port=None,
-                                             db_sentinel_port=None,
-                                             db_sentinel_master_name=None,
-                                             db_type=DbBackendType.REDIS)
-    request.cls.configuration.get_params.return_value = mock_conf_params
-    with patch('ricsdl.backend.redis.Redis') as mock_redis, patch(
-            'ricsdl.backend.redis.PubSub') as mock_pubsub:
-        db = ricsdl.backend.get_backend_instance(request.cls.configuration)
-        request.cls.mock_redis = mock_redis.return_value
-        request.cls.mock_pubsub = mock_pubsub.return_value
-    request.cls.db = db
+    request.cls.test_backend_type = request.param
+    if request.param == 'standalone':
+        cfg = get_test_sdl_standby_config()
+        request.cls.configuration.get_params.return_value = cfg
+        with patch('ricsdl.backend.redis.Redis') as mock_redis, patch(
+                   'ricsdl.backend.redis.PubSub') as mock_pubsub, patch(
+                   'threading.Thread') as mock_thread:
+            db = ricsdl.backend.get_backend_instance(request.cls.configuration)
+            request.cls.mock_redis = mock_redis.return_value
+            request.cls.mock_pubsub = mock_pubsub.return_value
+            request.cls.mock_pubsub_thread = mock_thread.return_value
+            request.cls.mock_pubsub_thread.is_alive.return_value = False
+        request.cls.db = db
+
+        mock_redis.assert_called_once_with(db=0, host=cfg.db_host, max_connections=20, port=cfg.db_port)
+        mock_pubsub.assert_called_once_with(request.cls.mock_redis.connection_pool, ignore_subscribe_messages=True)
+        assert request.cls.mock_redis.set_response_callback.call_count == 2
+        assert request.cls.mock_redis.set_response_callback.call_args_list == [call('SETIE', ANY), call('DELIE', ANY)]
+
+    elif request.param == 'sentinel':
+        cfg = get_test_sdl_sentinel_config()
+        request.cls.configuration.get_params.return_value = cfg
+        with patch('ricsdl.backend.redis.Sentinel') as mock_sentinel, patch(
+                   'ricsdl.backend.redis.PubSub') as mock_pubsub, patch(
+                   'threading.Thread') as mock_thread:
+            db = ricsdl.backend.get_backend_instance(request.cls.configuration)
+            request.cls.mock_redis = mock_sentinel.return_value.master_for.return_value
+            request.cls.mock_pubsub = mock_pubsub.return_value
+            request.cls.mock_pubsub_thread = mock_thread.return_value
+            request.cls.mock_pubsub_thread.is_alive.return_value = False
+        request.cls.db = db
+
+        mock_sentinel.assert_called_once_with([(cfg.db_host, cfg.db_sentinel_port)])
+        mock_sentinel.master_for.called_once_with(cfg.db_sentinel_master_name)
+        mock_pubsub.assert_called_once_with(request.cls.mock_redis.connection_pool, ignore_subscribe_messages=True)
+        assert request.cls.mock_redis.set_response_callback.call_count == 2
+        assert request.cls.mock_redis.set_response_callback.call_args_list == [call('SETIE', ANY), call('DELIE', ANY)]
+
+    elif request.param == 'sentinel_cluster':
+        cfg = get_test_sdl_sentinel_cluster_config()
+        request.cls.configuration.get_params.return_value = cfg
+        with patch('ricsdl.backend.redis.Sentinel') as mock_sentinel, patch(
+                   'ricsdl.backend.redis.PubSub') as mock_pubsub, patch(
+                   'threading.Thread') as mock_thread:
+            db = ricsdl.backend.get_backend_instance(request.cls.configuration)
+            request.cls.mock_redis = mock_sentinel.return_value.master_for.return_value
+            request.cls.mock_pubsub = mock_pubsub.return_value
+            request.cls.mock_pubsub_thread = mock_thread.return_value
+            request.cls.mock_pubsub_thread.is_alive.return_value = False
+        request.cls.db = db
+
+        assert mock_sentinel.call_count == 2
+        mock_sentinel.assert_has_calls([
+            call([('service-ricplt-dbaas-tcp-cluster-0.ricplt', cfg.db_sentinel_port)]),
+            call([('service-ricplt-dbaas-tcp-cluster-1.ricplt', cfg.db_sentinel_port)]),
+        ], any_order=True)
+        assert mock_sentinel.return_value.master_for.call_count == 2
+        mock_sentinel.return_value.master_for.assert_has_calls(
+            [call(cfg.db_sentinel_master_name), call(cfg.db_sentinel_master_name)], any_order=True,
+        )
+        assert mock_pubsub.call_count == 2
+        mock_pubsub.assert_has_calls([
+            call(request.cls.mock_redis.connection_pool, ignore_subscribe_messages=True),
+            call(request.cls.mock_redis.connection_pool, ignore_subscribe_messages=True),
+        ])
+        assert request.cls.mock_redis.set_response_callback.call_count == 4
+        assert request.cls.mock_redis.set_response_callback.call_args_list == [
+            call('SETIE', ANY), call('DELIE', ANY),
+            call('SETIE', ANY), call('DELIE', ANY),
+        ]
+    else:
+        raise NotImplementedError
 
     yield
 
@@ -81,7 +169,10 @@ class TestRedisBackend:
     def test_is_connected_function_success(self):
         self.mock_redis.ping.return_value = True
         ret = self.db.is_connected()
-        self.mock_redis.ping.assert_called_once()
+        if self.test_backend_type == 'sentinel_cluster':
+            assert self.mock_redis.ping.call_count == 2
+        else:
+            assert self.mock_redis.ping.call_count == 1
         assert ret is True
 
     def test_is_connected_function_returns_false_if_ping_fails(self):
@@ -437,9 +528,11 @@ class TestRedisBackend:
 
     def test_subscribe_channel_with_thread_success(self):
         cb = Mock()
-        self.db.pubsub_thread.is_alive = Mock()
-        self.db.pubsub_thread.is_alive.return_value = False
-        self.db._run_in_thread = True
+        # Call first start_event_listener() to enable run_in_thread flag. When subscribe_channel()
+        # is called thread is started if the flag is enabled. In real-life scenario it's highly
+        # advisable at first to subscribe to some events by calling subscribe_channel() and only
+        # after it start threads by calling start_event_listener().
+        self.db.start_event_listener()
         self.db.subscribe_channel(self.ns, cb, self.channels)
         self.mock_pubsub.run_in_thread.assert_called_once_with(daemon=True, sleep_time=0.001)
 
@@ -457,46 +550,137 @@ class TestRedisBackend:
         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
             self.db.unsubscribe_channel(self.ns, [self.channels[0]])
 
-    def test_start_event_listener_success(self):
-        self.db.start_event_listener()
-        assert self.db._run_in_thread
-
-    def test_start_event_listener_subscribe_first(self):
-        self.mock_pubsub.run_in_thread.return_value = Mock()
-        self.mock_redis.pubsub_channels.return_value = [b'{some-ns},abs']
+    def test_subscribe_and_start_event_listener(self):
+        self.mock_redis.pubsub_channels.return_value = self.channels_and_events_redis
         self.db.subscribe_channel(self.ns, Mock(), self.channels)
         self.db.start_event_listener()
-        self.mock_pubsub.run_in_thread.assert_called_once_with(daemon=True, sleep_time=0.001)
+
+        if self.test_backend_type == 'sentinel_cluster':
+            assert self.mock_redis.pubsub_channels.call_count == 2
+            assert self.mock_pubsub.run_in_thread.call_count == 2
+            self.mock_pubsub.run_in_thread.assert_has_calls([
+                call(daemon=True, sleep_time=0.001),
+                call(daemon=True, sleep_time=0.001),
+            ])
+        else:
+            self.mock_redis.pubsub_channels.assert_called_once()
+            self.mock_pubsub.run_in_thread.assert_called_once_with(daemon=True, sleep_time=0.001)
 
     def test_start_event_listener_fail(self):
-        self.db.pubsub_thread.is_alive = Mock()
-        self.db.pubsub_thread.is_alive.return_value = True
+        self.mock_pubsub_thread.is_alive.return_value = True
         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
             self.db.start_event_listener()
 
     def test_handle_events_success(self):
         self.db.handle_events()
-        self.mock_pubsub.get_message.assert_called_once_with(ignore_subscribe_messages=True)
+        self.db.handle_events()
+        self.db.handle_events()
+        self.db.handle_events()
+        assert self.mock_pubsub.get_message.call_count == 4
 
-    def test_handle_events_fail_already_started(self):
-        self.db.pubsub_thread.is_alive = Mock()
-        self.db.pubsub_thread.is_alive.return_value = True
+    def test_handle_events_fail_if_subsub_thread_alive(self):
+        self.mock_pubsub_thread.is_alive.return_value = True
         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
             self.db.handle_events()
 
-    def test_handle_events_fail_already_set(self):
-        self.db._run_in_thread = True
+    def test_handle_events_fail_if_event_listener_already_running(self):
+        self.db.start_event_listener()
         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
             self.db.handle_events()
 
+    def test_handle_events_ignores_message_handling_redis_runtime_exception(self):
+         self.mock_pubsub.get_message.side_effect = RuntimeError()
+         self.db.handle_events()
+         self.mock_pubsub.get_message.assert_called_once()
+
     def test_get_redis_connection_function_success(self):
-        ret = self.db.get_redis_connection()
+        ret = self.db.get_redis_connection(self.ns)
         assert ret is self.mock_redis
 
     def test_redis_backend_object_string_representation(self):
         str_out = str(self.db)
         assert str_out is not None
 
+    def test_namespace_hash_algorithm_stays_unaltered(self):
+        ret_hash = self.db._RedisBackend__get_hash('sdltoolns')
+        assert ret_hash == 2897969051
+
+def test_standalone_redis_init_exception_is_mapped_to_sdl_exeception():
+    mock_cfg = Mock()
+    cfg_params = get_test_sdl_standby_config()
+    mock_cfg.get_params.return_value = cfg_params
+    with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+        with patch('ricsdl.backend.redis.Redis') as mock_redis:
+            mock_redis.side_effect = redis_exceptions.ResponseError('redis error!')
+            ricsdl.backend.get_backend_instance(mock_cfg)
+
+def test_standalone_pubsub_init_exception_is_mapped_to_sdl_exeception():
+    mock_cfg = Mock()
+    cfg_params = get_test_sdl_standby_config()
+    mock_cfg.get_params.return_value = cfg_params
+    with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+        with patch('ricsdl.backend.redis.Redis') as mock_redis, patch(
+                   'ricsdl.backend.redis.PubSub') as mock_pubsub:
+            mock_pubsub.side_effect = redis_exceptions.ResponseError('redis error!')
+            ricsdl.backend.get_backend_instance(mock_cfg)
+
+def test_sentinel_redis_init_exception_is_mapped_to_sdl_exeception():
+    mock_cfg = Mock()
+    cfg_params = get_test_sdl_sentinel_config()
+    mock_cfg.get_params.return_value = cfg_params
+    with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+        with patch('ricsdl.backend.redis.Sentinel') as mock_sentinel:
+            mock_sentinel.side_effect = redis_exceptions.ResponseError('redis error!')
+            ricsdl.backend.get_backend_instance(mock_cfg)
+
+def test_sentinel_pubsub_init_exception_is_mapped_to_sdl_exeception():
+    mock_cfg = Mock()
+    cfg_params = get_test_sdl_sentinel_config()
+    mock_cfg.get_params.return_value = cfg_params
+    with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+        with patch('ricsdl.backend.redis.Sentinel') as mock_sentinel, patch(
+                   'ricsdl.backend.redis.PubSub') as mock_pubsub:
+            mock_pubsub.side_effect = redis_exceptions.ResponseError('redis error!')
+            ricsdl.backend.get_backend_instance(mock_cfg)
+
+def test_sentinel_master_for_exception_is_mapped_to_sdl_exeception():
+    mock_cfg = Mock()
+    cfg_params = get_test_sdl_sentinel_config()
+    mock_cfg.get_params.return_value = cfg_params
+    with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+        with patch('ricsdl.backend.redis.Sentinel') as mock_sentinel, patch(
+                   'ricsdl.backend.redis.PubSub') as mock_pubsub:
+            mock_sentinel.return_value.master_for.side_effect = redis_exceptions.ResponseError('redis error!')
+            ricsdl.backend.get_backend_instance(mock_cfg)
+
+def test_sentinel_cluster_redis_init_exception_is_mapped_to_sdl_exeception():
+    mock_cfg = Mock()
+    cfg_params = get_test_sdl_sentinel_cluster_config()
+    mock_cfg.get_params.return_value = cfg_params
+    with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+        with patch('ricsdl.backend.redis.Sentinel') as mock_sentinel:
+            mock_sentinel.side_effect = redis_exceptions.ResponseError('redis error!')
+            ricsdl.backend.get_backend_instance(mock_cfg)
+
+def test_sentinel_cluster_pubsub_init_exception_is_mapped_to_sdl_exeception():
+    mock_cfg = Mock()
+    cfg_params = get_test_sdl_sentinel_cluster_config()
+    mock_cfg.get_params.return_value = cfg_params
+    with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+        with patch('ricsdl.backend.redis.Sentinel') as mock_sentinel, patch(
+                   'ricsdl.backend.redis.PubSub') as mock_pubsub:
+            mock_pubsub.side_effect = redis_exceptions.ResponseError('redis error!')
+            ricsdl.backend.get_backend_instance(mock_cfg)
+
+def test_sentinel_cluster_master_for_exception_is_mapped_to_sdl_exeception():
+    mock_cfg = Mock()
+    cfg_params = get_test_sdl_sentinel_cluster_config()
+    mock_cfg.get_params.return_value = cfg_params
+    with pytest.raises(ricsdl.exceptions.RejectedByBackend):
+        with patch('ricsdl.backend.redis.Sentinel') as mock_sentinel, patch(
+                   'ricsdl.backend.redis.PubSub') as mock_pubsub:
+            mock_sentinel.return_value.master_for.side_effect = redis_exceptions.ResponseError('redis error!')
+            ricsdl.backend.get_backend_instance(mock_cfg)
 
 class MockRedisLock:
     def __init__(self, redis, name, timeout=None, sleep=0.1,
@@ -542,6 +726,7 @@ def redis_backend_lock_fixture(request, mock_redis_lock):
                                              db_port=None,
                                              db_sentinel_port=None,
                                              db_sentinel_master_name=None,
+                                             db_cluster_addr_list=None,
                                              db_type=DbBackendType.REDIS)
     request.cls.configuration.get_params.return_value = mock_conf_params
 
index 6905bf6..a9d24ba 100644 (file)
@@ -26,10 +26,11 @@ from ricsdl.configuration import DbBackendType
 
 @pytest.fixture()
 def config_fixture(request, monkeypatch):
-    monkeypatch.setenv('DBAAS_SERVICE_HOST', '10.20.30.40')
+    monkeypatch.setenv('DBAAS_SERVICE_HOST', 'service-ricplt-dbaas-tcp-cluster-0.ricplt')
     monkeypatch.setenv('DBAAS_SERVICE_PORT', '10000')
     monkeypatch.setenv('DBAAS_SERVICE_SENTINEL_PORT', '11000')
     monkeypatch.setenv('DBAAS_MASTER_NAME', 'my-master')
+    monkeypatch.setenv('DBAAS_CLUSTER_ADDR_LIST', 'service-ricplt-dbaas-tcp-cluster-0.ricplt,service-ricplt-dbaas-tcp-cluster-1.ricplt')
     request.cls.config = _Configuration(fake_db_backend=None)
 
 
@@ -39,14 +40,17 @@ def fake_db_config_fixture(request, monkeypatch):
     monkeypatch.delenv('DBAAS_SERVICE_PORT', raising=False)
     monkeypatch.delenv('DBAAS_SERVICE_SENTINEL_PORT', raising=False)
     monkeypatch.delenv('DBAAS_MASTER_NAME', raising=False)
+    monkeypatch.delenv('DBAAS_CLUSTER_ADDR_LIST', raising=False)
     request.cls.config = _Configuration(fake_db_backend='dict')
 
 
 class TestConfiguration:
     def test_get_params_function_returns_read_configuration(self, config_fixture):
-        expected_config = _Configuration.Params(db_host='10.20.30.40', db_port='10000',
+        expected_config = _Configuration.Params(db_host='service-ricplt-dbaas-tcp-cluster-0.ricplt',
+                                                db_port='10000',
                                                 db_sentinel_port='11000',
                                                 db_sentinel_master_name='my-master',
+                                                db_cluster_addr_list='service-ricplt-dbaas-tcp-cluster-0.ricplt,service-ricplt-dbaas-tcp-cluster-1.ricplt',
                                                 db_type=DbBackendType.REDIS)
         assert expected_config == self.config.get_params()
 
@@ -54,6 +58,7 @@ class TestConfiguration:
         expected_config = _Configuration.Params(db_host=None, db_port=None,
                                                 db_sentinel_port=None,
                                                 db_sentinel_master_name=None,
+                                                db_cluster_addr_list=None,
                                                 db_type=DbBackendType.FAKE_DICT)
         assert expected_config == self.config.get_params()
 
@@ -63,10 +68,11 @@ class TestConfiguration:
 
 
     def test_configuration_object_string_representation(self, config_fixture):
-        expected_config_info = {'DB host': '10.20.30.40',
+        expected_config_info = {'DB host': 'service-ricplt-dbaas-tcp-cluster-0.ricplt',
                                 'DB port': '10000',
                                 'DB master sentinel': 'my-master',
                                 'DB sentinel port': '11000',
+                                'DB cluster address list': 'service-ricplt-dbaas-tcp-cluster-0.ricplt,service-ricplt-dbaas-tcp-cluster-1.ricplt',
                                 'DB type': 'REDIS'}
         assert str(self.config) == str(expected_config_info)
 
@@ -75,5 +81,6 @@ class TestConfiguration:
                                 'DB port': None,
                                 'DB master sentinel': None,
                                 'DB sentinel port': None,
+                                'DB cluster address list': None,
                                 'DB type': 'FAKE_DICT'}
         assert str(self.config) == str(expected_config_info)