X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=ricsdl-package%2Fricsdl%2Fbackend%2Fredis.py;h=d7139fb06aa74f6dbce2116d6e31463aaa3ccd04;hb=HEAD;hp=bc4b43b107c9f7cd2b3fb345b8e19cfee18f0334;hpb=db7753971931247abf7fed67921074e518ec1f6d;p=ric-plt%2Fsdlpy.git diff --git a/ricsdl-package/ricsdl/backend/redis.py b/ricsdl-package/ricsdl/backend/redis.py index bc4b43b..d7139fb 100755 --- a/ricsdl-package/ricsdl/backend/redis.py +++ b/ricsdl-package/ricsdl/backend/redis.py @@ -1,5 +1,5 @@ # Copyright (c) 2019 AT&T Intellectual Property. -# Copyright (c) 2018-2019 Nokia. +# Copyright (c) 2018-2022 Nokia. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -28,7 +28,7 @@ import redis from redis import Redis from redis.sentinel import Sentinel from redis.lock import Lock -from redis._compat import nativestr +from redis.utils import str_if_bytes from redis import exceptions as redis_exceptions from ricsdl.configuration import _Configuration from ricsdl.exceptions import ( @@ -45,13 +45,13 @@ def _map_to_sdl_exception(): """Translates known redis exceptions into SDL exceptions.""" try: yield - except(redis_exceptions.ResponseError) as exc: + except redis_exceptions.ResponseError as exc: raise RejectedByBackend("SDL backend rejected the request: {}". format(str(exc))) from exc - except(redis_exceptions.ConnectionError, redis_exceptions.TimeoutError) as exc: + except (redis_exceptions.ConnectionError, redis_exceptions.TimeoutError) as exc: raise NotConnected("SDL not connected to backend: {}". format(str(exc))) from exc - except(redis_exceptions.RedisError) as exc: + except redis_exceptions.RedisError as exc: raise BackendError("SDL backend failed to process the request: {}". format(str(exc))) from exc @@ -69,7 +69,7 @@ class PubSub(redis.client.PubSub): Adapted from: https://github.com/andymccurdy/redis-py/blob/master/redis/client.py """ - message_type = nativestr(response[0]) + message_type = str_if_bytes(response[0]) if message_type == 'pmessage': message = { 'type': message_type, @@ -117,9 +117,8 @@ class PubSub(redis.client.PubSub): message_channel = self._strip_ns_from_bin_key('', message['channel']) message_data = message['data'].decode('utf-8') messages = message_data.split(self.event_separator) - notification = messages[0] if len(messages) == 1 else messages - handler(message_channel, notification) - return message_channel, notification + handler(message_channel, messages) + return message_channel, messages elif message_type != 'pong': # this is a subscribe/unsubscribe message. ignore if we don't # want them @@ -330,8 +329,7 @@ class RedisBackend(DbBackendAbc): *channels_and_events_prepared, ) - def subscribe_channel(self, ns: str, - cb: Union[Callable[[str, str], None], Callable[[str, List[str]], None]], + def subscribe_channel(self, ns: str, cb: Callable[[str, List[str]], None], channels: List[str]) -> None: channels = self.__add_keys_ns_prefix(ns, channels) for channel in channels: @@ -357,7 +355,7 @@ class RedisBackend(DbBackendAbc): redis_ctx.pubsub_thread = redis_ctx.redis_pubsub.run_in_thread(sleep_time=0.001, daemon=True) redis_ctx.run_in_thread = True - def handle_events(self) -> Optional[Union[Tuple[str, str], Tuple[str, List[str]]]]: + def handle_events(self) -> Optional[Tuple[str, List[str]]]: if self.next_client_event >= len(self.clients): self.next_client_event = 0 redis_ctx = self.clients[self.next_client_event] @@ -372,29 +370,26 @@ class RedisBackend(DbBackendAbc): def __create_redis_clients(self, config): clients = list() cfg_params = config.get_params() - if cfg_params.db_cluster_addr_list is None: - clients.append(self.__create_legacy_redis_client(cfg_params)) - else: - for addr in cfg_params.db_cluster_addr_list.split(","): - client = self.__create_redis_client(cfg_params, addr) - clients.append(client) - return clients + for i, addr in enumerate(cfg_params.db_cluster_addrs): + port = cfg_params.db_ports[i] if i < len(cfg_params.db_ports) else "" + sport = cfg_params.db_sentinel_ports[i] if i < len(cfg_params.db_sentinel_ports) else "" + name = cfg_params.db_sentinel_master_names[i] if i < len(cfg_params.db_sentinel_master_names) else "" - def __create_legacy_redis_client(self, cfg_params): - return self.__create_redis_client(cfg_params, cfg_params.db_host) + client = self.__create_redis_client(addr, port, sport, name) + clients.append(client) + return clients - def __create_redis_client(self, cfg_params, addr): + def __create_redis_client(self, addr, port, sentinel_port, master_name): new_sentinel = None new_redis = None - if cfg_params.db_sentinel_port is None: - new_redis = Redis(host=addr, port=cfg_params.db_port, db=0, max_connections=20) + if len(sentinel_port) == 0: + new_redis = Redis(host=addr, port=port, db=0, max_connections=20) else: - sentinel_node = (addr, cfg_params.db_sentinel_port) - master_name = cfg_params.db_sentinel_master_name + sentinel_node = (addr, sentinel_port) new_sentinel = Sentinel([sentinel_node]) new_redis = new_sentinel.master_for(master_name) - new_redis.set_response_callback('SETIE', lambda r: r and nativestr(r) == 'OK' or False) + new_redis.set_response_callback('SETIE', lambda r: r and str_if_bytes(r) == 'OK' or False) new_redis.set_response_callback('DELIE', lambda r: r and int(r) == 1 or False) redis_pubsub = PubSub(self.event_separator, new_redis.connection_pool, ignore_subscribe_messages=True) @@ -593,5 +588,5 @@ class RedisBackendLock(DbBackendLockAbc): return 'locked' return 'locked by someone else' return 'unlocked' - except(redis_exceptions.RedisError) as exc: + except redis_exceptions.RedisError as exc: return f'Error: {str(exc)}'