1 # Copyright (c) 2019 AT&T Intellectual Property.
2 # Copyright (c) 2018-2019 Nokia.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 # This source code is part of the near-RT RIC (RAN Intelligent Controller)
18 # platform project (RICP).
22 """The module provides implementation of Shared Data Layer (SDL) database backend interface."""
import contextlib
from typing import (Dict, Set, List, Union)

from redis import Redis
from redis.sentinel import Sentinel
from redis.lock import Lock
from redis._compat import nativestr
from redis import exceptions as redis_exceptions

from ricsdl.configuration import _Configuration
from ricsdl.exceptions import (
    RejectedByBackend,
    NotConnected,
    BackendError
)
from .dbbackend_abc import DbBackendAbc
from .dbbackend_abc import DbBackendLockAbc
40 @contextlib.contextmanager
41 def _map_to_sdl_exception():
42 """Translates known redis exceptions into SDL exceptions."""
45 except(redis_exceptions.ResponseError) as exc:
46 raise RejectedByBackend("SDL backend rejected the request: {}".
47 format(str(exc))) from exc
48 except(redis_exceptions.ConnectionError, redis_exceptions.TimeoutError) as exc:
49 raise NotConnected("SDL not connected to backend: {}".
50 format(str(exc))) from exc
51 except(redis_exceptions.RedisError) as exc:
52 raise BackendError("SDL backend failed to process the request: {}".
53 format(str(exc))) from exc
56 class RedisBackend(DbBackendAbc):
58 A class providing an implementation of database backend of Shared Data Layer (SDL), when
59 backend database solution is Redis.
62 configuration (_Configuration): SDL configuration, containing credentials to connect to
63 Redis database backend.
65 def __init__(self, configuration: _Configuration) -> None:
67 with _map_to_sdl_exception():
68 if configuration.get_params().db_sentinel_port:
69 sentinel_node = (configuration.get_params().db_host,
70 configuration.get_params().db_sentinel_port)
71 master_name = configuration.get_params().db_sentinel_master_name
72 self.__sentinel = Sentinel([sentinel_node])
73 self.__redis = self.__sentinel.master_for(master_name)
75 self.__redis = Redis(host=configuration.get_params().db_host,
76 port=configuration.get_params().db_port,
79 self.__redis.set_response_callback('SETIE', lambda r: r and nativestr(r) == 'OK' or False)
80 self.__redis.set_response_callback('DELIE', lambda r: r and int(r) == 1 or False)
89 "Redis connection": repr(self.__redis)
def is_connected(self):
    """Return the result of pinging the Redis backend.

    Redis errors raised by the ``ping()`` call are re-raised as SDL exceptions.
    """
    with _map_to_sdl_exception():
        ping_result = self.__redis.ping()
    return ping_result
def set(self, ns: str, data_map: Dict[str, bytes]) -> None:
    """Write every key-value pair of ``data_map`` under namespace ``ns``.

    Keys get the namespace prefix before the whole map is handed to the client's ``mset``
    in a single call. Redis errors are re-raised as SDL exceptions.
    """
    namespaced_map = self._add_data_map_ns_prefix(ns, data_map)
    with _map_to_sdl_exception():
        self.__redis.mset(namespaced_map)
def set_if(self, ns: str, key: str, old_data: bytes, new_data: bytes) -> bool:
    """Issue the custom 'SETIE' command for ``key`` of namespace ``ns``.

    The command receives the namespace-prefixed key, then ``new_data``, then ``old_data``
    (note the argument order). Returns whatever the client yields for 'SETIE'; redis errors
    are re-raised as SDL exceptions.
    """
    with _map_to_sdl_exception():
        return self.__redis.execute_command(
            'SETIE', self._add_key_ns_prefix(ns, key), new_data, old_data)
def set_if_not_exists(self, ns: str, key: str, data: bytes) -> bool:
    """Store ``data`` for ``key`` of namespace ``ns`` through the client's ``setnx``.

    Returns the client's ``setnx`` result; redis errors are re-raised as SDL exceptions.
    """
    with _map_to_sdl_exception():
        return self.__redis.setnx(self._add_key_ns_prefix(ns, key), data)
def get(self, ns: str, keys: List[str]) -> Dict[str, bytes]:
    """Read the values of ``keys`` in namespace ``ns`` with a single ``mget`` call.

    Args:
        ns: Namespace the keys belong to.
        keys: Key names without namespace prefix.

    Returns:
        A dictionary mapping each requested key (without namespace prefix) to its value.
        Keys for which the backend returned no value are left out.

    Redis errors are re-raised as SDL exceptions.
    """
    db_keys = self._add_keys_ns_prefix(ns, keys)
    with _map_to_sdl_exception():
        values = self.__redis.mget(db_keys)
        # return only keys which have a value; the reply is positional, so it is paired
        # with the caller's un-prefixed keys
        return {key: val for key, val in zip(keys, values) if val is not None}
def find_keys(self, ns: str, key_pattern: str) -> List[str]:
    """List the keys of namespace ``ns`` that match ``key_pattern``.

    The namespace-prefixed pattern is passed to the client's ``keys``; the prefix is stripped
    from every returned key before the list is handed back. Redis errors are re-raised as
    SDL exceptions.
    """
    namespaced_pattern = self._add_key_ns_prefix(ns, key_pattern)
    with _map_to_sdl_exception():
        raw_keys = self.__redis.keys(namespaced_pattern)
    return self._strip_ns_from_bin_keys(ns, raw_keys)
def find_and_get(self, ns: str, key_pattern: str) -> Dict[str, bytes]:
    """Find the keys of namespace ``ns`` matching ``key_pattern`` and read their values.

    Implemented as ``find_keys`` followed by ``get``, i.e. two separate backend requests.

    Returns:
        A dictionary of matched keys (without namespace prefix) and their values; empty when
        no key matched the pattern.
    """
    # todo: replace below implementation with redis 'NGET' module
    ret = dict()  # type: Dict[str, bytes]
    with _map_to_sdl_exception():
        matched_keys = self.find_keys(ns, key_pattern)
        # Nothing to fetch when no key matched; keep the empty result.
        if matched_keys:
            ret = self.get(ns, matched_keys)
    return ret
def remove(self, ns: str, keys: List[str]) -> None:
    """Delete ``keys`` of namespace ``ns`` from the backend.

    All namespace-prefixed keys are passed to the client's ``delete`` in one call. Redis
    errors are re-raised as SDL exceptions.
    """
    namespaced_keys = self._add_keys_ns_prefix(ns, keys)
    with _map_to_sdl_exception():
        self.__redis.delete(*namespaced_keys)
def remove_if(self, ns: str, key: str, data: bytes) -> bool:
    """Issue the custom 'DELIE' command for ``key`` of namespace ``ns`` with ``data``.

    Returns whatever the client yields for 'DELIE'; redis errors are re-raised as SDL
    exceptions.
    """
    with _map_to_sdl_exception():
        return self.__redis.execute_command(
            'DELIE', self._add_key_ns_prefix(ns, key), data)
def add_member(self, ns: str, group: str, members: Set[bytes]) -> None:
    """Add ``members`` to ``group`` of namespace ``ns`` through the client's ``sadd``.

    Redis errors are re-raised as SDL exceptions.
    """
    group_key = self._add_key_ns_prefix(ns, group)
    with _map_to_sdl_exception():
        self.__redis.sadd(group_key, *members)
def remove_member(self, ns: str, group: str, members: Set[bytes]) -> None:
    """Remove ``members`` from ``group`` of namespace ``ns`` through the client's ``srem``.

    Redis errors are re-raised as SDL exceptions.
    """
    group_key = self._add_key_ns_prefix(ns, group)
    with _map_to_sdl_exception():
        self.__redis.srem(group_key, *members)
def remove_group(self, ns: str, group: str) -> None:
    """Delete the whole ``group`` of namespace ``ns`` by deleting its key.

    Redis errors are re-raised as SDL exceptions.
    """
    group_key = self._add_key_ns_prefix(ns, group)
    with _map_to_sdl_exception():
        self.__redis.delete(group_key)
def get_members(self, ns: str, group: str) -> Set[bytes]:
    """Return the members of ``group`` in namespace ``ns`` as given by the client's ``smembers``.

    Redis errors are re-raised as SDL exceptions.
    """
    with _map_to_sdl_exception():
        return self.__redis.smembers(self._add_key_ns_prefix(ns, group))
def is_member(self, ns: str, group: str, member: bytes) -> bool:
    """Tell whether ``member`` belongs to ``group`` of namespace ``ns``.

    Returns the result of the client's ``sismember``; redis errors are re-raised as SDL
    exceptions.
    """
    with _map_to_sdl_exception():
        return self.__redis.sismember(self._add_key_ns_prefix(ns, group), member)
def group_size(self, ns: str, group: str) -> int:
    """Return the number of members in ``group`` of namespace ``ns``.

    The value comes from the client's ``scard``; redis errors are re-raised as SDL
    exceptions.
    """
    with _map_to_sdl_exception():
        return self.__redis.scard(self._add_key_ns_prefix(ns, group))
182 def _add_key_ns_prefix(cls, ns: str, key: str):
183 return '{' + ns + '},' + key
186 def _add_keys_ns_prefix(cls, ns: str, keylist: List[str]) -> List[str]:
189 ret_nskeys.append('{' + ns + '},' + k)
193 def _add_data_map_ns_prefix(cls, ns: str, data_dict: Dict[str, bytes]) -> Dict[str, bytes]:
195 for key, val in data_dict.items():
196 ret_nsdict['{' + ns + '},' + key] = val
200 def _strip_ns_from_bin_keys(cls, ns: str, nskeylist: List[bytes]) -> List[str]:
204 redis_key = k.decode("utf-8")
205 except UnicodeDecodeError as exc:
206 msg = u'Namespace %s key conversion to string failed: %s' % (ns, str(exc))
207 raise RejectedByBackend(msg)
208 nskey = redis_key.split(',', 1)
210 msg = u'Namespace %s key:%s has no namespace prefix' % (ns, redis_key)
211 raise RejectedByBackend(msg)
212 ret_keys.append(nskey[1])
def get_redis_connection(self):
    """Return existing Redis database connection."""
    return self.__redis
220 class RedisBackendLock(DbBackendLockAbc):
222 A class providing an implementation of database backend lock of Shared Data Layer (SDL), when
223 backend database solution is Redis.
226 ns (str): Namespace under which this lock is targeted.
227 name (str): Lock name, identifies the lock key in a Redis database backend.
228 expiration (int, float): Lock expiration time after which the lock is removed if it hasn't
229 been released earlier by a 'release' method.
230 redis_backend (RedisBackend): Database backend object containing connection to Redis
233 lua_get_validity_time = None
234 # KEYS[1] - lock name
236 # return < 0 in case of failure, otherwise return lock validity time in milliseconds.
237 LUA_GET_VALIDITY_TIME_SCRIPT = """
238 local token = redis.call('get', KEYS[1])
242 if token ~= ARGV[1] then
245 return redis.call('pttl', KEYS[1])
def __init__(self, ns: str, name: str, expiration: Union[int, float],
             redis_backend: RedisBackend) -> None:
    """Create a Redis lock object for lock ``name`` in namespace ``ns``.

    The lock key is the namespace-prefixed lock name and ``expiration`` is used as the lock
    timeout. The connection is taken from ``redis_backend``. Redis errors raised while the
    lock object and its helper script are set up are re-raised as SDL exceptions.
    """
    super().__init__(ns, name)
    self.__redis = redis_backend.get_redis_connection()
    with _map_to_sdl_exception():
        lock_key = '{' + ns + '},' + self._lock_name
        self.__redis_lock = Lock(redis=self.__redis, name=lock_key, timeout=expiration)
        self._register_scripts()
260 "lock DB type": "Redis",
261 "lock namespace": self._ns,
262 "lock name": self._lock_name,
263 "lock status": self._lock_status_to_string()
def acquire(self, retry_interval: Union[int, float] = 0.1,
            retry_timeout: Union[int, float] = 10) -> bool:
    """Try to acquire the lock.

    Args:
        retry_interval: Stored as the ``sleep`` attribute of the underlying redis lock.
        retry_timeout: Passed to the underlying redis lock as ``blocking_timeout``.

    Returns:
        The result of the underlying redis lock's ``acquire`` call.

    Redis errors are re-raised as SDL exceptions.
    """
    self.__redis_lock.sleep = retry_interval
    with _map_to_sdl_exception():
        succeeded = self.__redis_lock.acquire(blocking_timeout=retry_timeout)
    return succeeded
def release(self) -> None:
    """Release the lock through the underlying redis lock object.

    Redis errors are re-raised as SDL exceptions.
    """
    redis_lock = self.__redis_lock
    with _map_to_sdl_exception():
        redis_lock.release()
def refresh(self) -> None:
    """Refresh the lock by calling ``reacquire()`` on the underlying redis lock object.

    Redis errors are re-raised as SDL exceptions.
    """
    redis_lock = self.__redis_lock
    with _map_to_sdl_exception():
        redis_lock.reacquire()
def get_validity_time(self) -> Union[int, float]:
    """Return the remaining validity time of the lock in seconds.

    The time is read with the registered Lua script, which reports it in milliseconds or a
    negative value on failure.

    Returns:
        The validity time in seconds; an ``int`` when it is a whole number of seconds,
        otherwise a ``float``.

    Raises:
        RejectedByBackend: This object holds no lock token, or the script returned a
            negative error code.
    """
    if self.__redis_lock.local.token is None:
        msg = u'Cannot get validity time of an unlocked lock %s' % self._lock_name
        raise RejectedByBackend(msg)

    with _map_to_sdl_exception():
        # The script object is cached on the class, so run it explicitly against this
        # lock's own connection.
        validity = self.lua_get_validity_time(keys=[self.__redis_lock.name],
                                              args=[self.__redis_lock.local.token],
                                              client=self.__redis)
    if validity < 0:
        msg = (u'Getting validity time of a lock %s failed with error code: %d'
               % (self._lock_name, validity))
        raise RejectedByBackend(msg)
    # Script result is in milliseconds.
    ftime = validity / 1000.0
    if ftime.is_integer():
        return int(ftime)
    return ftime
302 def _register_scripts(self):
304 client = self.__redis
305 if cls.lua_get_validity_time is None:
306 cls.lua_get_validity_time = client.register_script(cls.LUA_GET_VALIDITY_TIME_SCRIPT)
308 def _lock_status_to_string(self) -> str:
310 if self.__redis_lock.locked():
311 if self.__redis_lock.owned():
313 return 'locked by someone else'
315 except(redis_exceptions.RedisError) as exc:
316 return f'Error: {str(exc)}'