Code Review
/
ric-plt
/
sdlpy.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
review
|
tree
raw
|
inline
| side by side
RIC:1060: Change in PTL
[ric-plt/sdlpy.git]
/
ricsdl-package
/
ricsdl
/
backend
/
redis.py
diff --git
a/ricsdl-package/ricsdl/backend/redis.py
b/ricsdl-package/ricsdl/backend/redis.py
index
bc4b43b
..
d7139fb
100755
(executable)
--- a/
ricsdl-package/ricsdl/backend/redis.py
+++ b/
ricsdl-package/ricsdl/backend/redis.py
@@
-1,5
+1,5
@@
# Copyright (c) 2019 AT&T Intellectual Property.
# Copyright (c) 2019 AT&T Intellectual Property.
-# Copyright (c) 2018-20
19
Nokia.
+# Copyright (c) 2018-20
22
Nokia.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@
-28,7
+28,7
@@
import redis
from redis import Redis
from redis.sentinel import Sentinel
from redis.lock import Lock
from redis import Redis
from redis.sentinel import Sentinel
from redis.lock import Lock
-from redis.
_compat import nativestr
+from redis.
utils import str_if_bytes
from redis import exceptions as redis_exceptions
from ricsdl.configuration import _Configuration
from ricsdl.exceptions import (
from redis import exceptions as redis_exceptions
from ricsdl.configuration import _Configuration
from ricsdl.exceptions import (
@@
-45,13
+45,13
@@
def _map_to_sdl_exception():
"""Translates known redis exceptions into SDL exceptions."""
try:
yield
"""Translates known redis exceptions into SDL exceptions."""
try:
yield
- except
(redis_exceptions.ResponseError)
as exc:
+ except
redis_exceptions.ResponseError
as exc:
raise RejectedByBackend("SDL backend rejected the request: {}".
format(str(exc))) from exc
raise RejectedByBackend("SDL backend rejected the request: {}".
format(str(exc))) from exc
- except(redis_exceptions.ConnectionError, redis_exceptions.TimeoutError) as exc:
+ except
(redis_exceptions.ConnectionError, redis_exceptions.TimeoutError) as exc:
raise NotConnected("SDL not connected to backend: {}".
format(str(exc))) from exc
raise NotConnected("SDL not connected to backend: {}".
format(str(exc))) from exc
- except
(redis_exceptions.RedisError)
as exc:
+ except
redis_exceptions.RedisError
as exc:
raise BackendError("SDL backend failed to process the request: {}".
format(str(exc))) from exc
raise BackendError("SDL backend failed to process the request: {}".
format(str(exc))) from exc
@@
-69,7
+69,7
@@
class PubSub(redis.client.PubSub):
Adapted from: https://github.com/andymccurdy/redis-py/blob/master/redis/client.py
"""
Adapted from: https://github.com/andymccurdy/redis-py/blob/master/redis/client.py
"""
- message_type =
nativestr
(response[0])
+ message_type =
str_if_bytes
(response[0])
if message_type == 'pmessage':
message = {
'type': message_type,
if message_type == 'pmessage':
message = {
'type': message_type,
@@
-117,9
+117,8
@@
class PubSub(redis.client.PubSub):
message_channel = self._strip_ns_from_bin_key('', message['channel'])
message_data = message['data'].decode('utf-8')
messages = message_data.split(self.event_separator)
message_channel = self._strip_ns_from_bin_key('', message['channel'])
message_data = message['data'].decode('utf-8')
messages = message_data.split(self.event_separator)
- notification = messages[0] if len(messages) == 1 else messages
- handler(message_channel, notification)
- return message_channel, notification
+ handler(message_channel, messages)
+ return message_channel, messages
elif message_type != 'pong':
# this is a subscribe/unsubscribe message. ignore if we don't
# want them
elif message_type != 'pong':
# this is a subscribe/unsubscribe message. ignore if we don't
# want them
@@
-330,8
+329,7
@@
class RedisBackend(DbBackendAbc):
*channels_and_events_prepared,
)
*channels_and_events_prepared,
)
- def subscribe_channel(self, ns: str,
- cb: Union[Callable[[str, str], None], Callable[[str, List[str]], None]],
+ def subscribe_channel(self, ns: str, cb: Callable[[str, List[str]], None],
channels: List[str]) -> None:
channels = self.__add_keys_ns_prefix(ns, channels)
for channel in channels:
channels: List[str]) -> None:
channels = self.__add_keys_ns_prefix(ns, channels)
for channel in channels:
@@
-357,7
+355,7
@@
class RedisBackend(DbBackendAbc):
redis_ctx.pubsub_thread = redis_ctx.redis_pubsub.run_in_thread(sleep_time=0.001, daemon=True)
redis_ctx.run_in_thread = True
redis_ctx.pubsub_thread = redis_ctx.redis_pubsub.run_in_thread(sleep_time=0.001, daemon=True)
redis_ctx.run_in_thread = True
- def handle_events(self) -> Optional[
Union[Tuple[str, str], Tuple[str, List[str]
]]]:
+ def handle_events(self) -> Optional[
Tuple[str, List[str
]]]:
if self.next_client_event >= len(self.clients):
self.next_client_event = 0
redis_ctx = self.clients[self.next_client_event]
if self.next_client_event >= len(self.clients):
self.next_client_event = 0
redis_ctx = self.clients[self.next_client_event]
@@
-372,29
+370,26
@@
class RedisBackend(DbBackendAbc):
def __create_redis_clients(self, config):
clients = list()
cfg_params = config.get_params()
def __create_redis_clients(self, config):
clients = list()
cfg_params = config.get_params()
- if cfg_params.db_cluster_addr_list is None:
- clients.append(self.__create_legacy_redis_client(cfg_params))
- else:
- for addr in cfg_params.db_cluster_addr_list.split(","):
- client = self.__create_redis_client(cfg_params, addr)
- clients.append(client)
- return clients
+ for i, addr in enumerate(cfg_params.db_cluster_addrs):
+ port = cfg_params.db_ports[i] if i < len(cfg_params.db_ports) else ""
+ sport = cfg_params.db_sentinel_ports[i] if i < len(cfg_params.db_sentinel_ports) else ""
+ name = cfg_params.db_sentinel_master_names[i] if i < len(cfg_params.db_sentinel_master_names) else ""
- def __create_legacy_redis_client(self, cfg_params):
- return self.__create_redis_client(cfg_params, cfg_params.db_host)
+ client = self.__create_redis_client(addr, port, sport, name)
+ clients.append(client)
+ return clients
- def __create_redis_client(self,
cfg_params, addr
):
+ def __create_redis_client(self,
addr, port, sentinel_port, master_name
):
new_sentinel = None
new_redis = None
new_sentinel = None
new_redis = None
- if
cfg_params.db_sentinel_port is None
:
- new_redis = Redis(host=addr, port=
cfg_params.db_
port, db=0, max_connections=20)
+ if
len(sentinel_port) == 0
:
+ new_redis = Redis(host=addr, port=port, db=0, max_connections=20)
else:
else:
- sentinel_node = (addr, cfg_params.db_sentinel_port)
- master_name = cfg_params.db_sentinel_master_name
+ sentinel_node = (addr, sentinel_port)
new_sentinel = Sentinel([sentinel_node])
new_redis = new_sentinel.master_for(master_name)
new_sentinel = Sentinel([sentinel_node])
new_redis = new_sentinel.master_for(master_name)
- new_redis.set_response_callback('SETIE', lambda r: r and
nativestr
(r) == 'OK' or False)
+ new_redis.set_response_callback('SETIE', lambda r: r and
str_if_bytes
(r) == 'OK' or False)
new_redis.set_response_callback('DELIE', lambda r: r and int(r) == 1 or False)
redis_pubsub = PubSub(self.event_separator, new_redis.connection_pool, ignore_subscribe_messages=True)
new_redis.set_response_callback('DELIE', lambda r: r and int(r) == 1 or False)
redis_pubsub = PubSub(self.event_separator, new_redis.connection_pool, ignore_subscribe_messages=True)
@@
-593,5
+588,5
@@
class RedisBackendLock(DbBackendLockAbc):
return 'locked'
return 'locked by someone else'
return 'unlocked'
return 'locked'
return 'locked by someone else'
return 'unlocked'
- except
(redis_exceptions.RedisError)
as exc:
+ except
redis_exceptions.RedisError
as exc:
return f'Error: {str(exc)}'
return f'Error: {str(exc)}'