1 # Copyright (c) 2019 AT&T Intellectual Property.
2 # Copyright (c) 2018-2019 Nokia.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 # This source code is part of the near-RT RIC (RAN Intelligent Controller)
18 # platform project (RICP).
22 """The module provides implementation of Shared Data Layer (SDL) database backend interface."""
24 from typing import (Dict, Set, List, Union)
25 from redis import Redis
26 from redis.sentinel import Sentinel
27 from redis.lock import Lock
28 from redis._compat import nativestr
29 from redis import exceptions as redis_exceptions
30 from ricsdl.configuration import _Configuration
31 from ricsdl.exceptions import (
36 from .dbbackend_abc import DbBackendAbc
37 from .dbbackend_abc import DbBackendLockAbc
40 @contextlib.contextmanager
41 def _map_to_sdl_exception():
42 """Translates known redis exceptions into SDL exceptions."""
45 except(redis_exceptions.ResponseError) as exc:
46 raise RejectedByBackend("SDL backend rejected the request: {}".
47 format(str(exc))) from exc
48 except(redis_exceptions.ConnectionError, redis_exceptions.TimeoutError) as exc:
49 raise NotConnected("SDL not connected to backend: {}".
50 format(str(exc))) from exc
51 except(redis_exceptions.RedisError) as exc:
52 raise BackendError("SDL backend failed to process the request: {}".
53 format(str(exc))) from exc
56 class RedisBackend(DbBackendAbc):
58 A class providing an implementation of database backend of Shared Data Layer (SDL), when
59 backend database solution is Redis.
62 configuration (_Configuration): SDL configuration, containing credentials to connect to
63 Redis database backend.
65 def __init__(self, configuration: _Configuration) -> None:
67 with _map_to_sdl_exception():
68 if configuration.get_params().db_sentinel_port:
69 sentinel_node = (configuration.get_params().db_host,
70 configuration.get_params().db_sentinel_port)
71 master_name = configuration.get_params().db_sentinel_master_name
72 self.__sentinel = Sentinel([sentinel_node])
73 self.__redis = self.__sentinel.master_for(master_name)
75 self.__redis = Redis(host=configuration.get_params().db_host,
76 port=configuration.get_params().db_port,
79 self.__redis.set_response_callback('SETIE', lambda r: r and nativestr(r) == 'OK' or False)
80 self.__redis.set_response_callback('DELIE', lambda r: r and int(r) == 1 or False)
88 "Redis connection": repr(self.__redis)
def set(self, ns: str, data_map: Dict[str, bytes]) -> None:
    """
    Write one or more key-value pairs under a namespace with a single MSET.

    Args:
        ns (str): Namespace under which the keys are stored.
        data_map (dict): Mapping of key names to the values to be written.

    Raises:
        RejectedByBackend: Redis rejected the request.
        NotConnected: Connecting to Redis failed or timed out.
        BackendError: Redis failed to process the request for any other reason.
    """
    # MSET needs at least one key-value pair. Sending it with no arguments only produces an
    # arity error from Redis, so writing nothing is treated as a successful no-op.
    if not data_map:
        return
    db_data_map = self._add_data_map_ns_prefix(ns, data_map)
    with _map_to_sdl_exception():
        self.__redis.mset(db_data_map)
def set_if(self, ns: str, key: str, old_data: bytes, new_data: bytes) -> bool:
    """
    Conditionally replace the value of a key with the custom 'SETIE' Redis command.

    The command is sent with the namespaced key, the new value and the old value, in that
    order. NOTE(review): presumably the backend stores new_data only when the current value
    equals old_data; confirm against the Redis module providing 'SETIE'.

    Args:
        ns (str): Namespace of the key.
        key (str): Name of the key.
        old_data (bytes): Value the key is expected to hold.
        new_data (bytes): Value to be written.

    Returns:
        bool: Reply of 'SETIE' as converted by the response callback registered for it.

    Raises:
        RejectedByBackend: Redis rejected the request.
        NotConnected: Connecting to Redis failed or timed out.
        BackendError: Redis failed to process the request for any other reason.
    """
    namespaced_key = self._add_key_ns_prefix(ns, key)
    with _map_to_sdl_exception():
        was_set = self.__redis.execute_command('SETIE', namespaced_key, new_data, old_data)
    return was_set
def set_if_not_exists(self, ns: str, key: str, data: bytes) -> bool:
    """
    Write a value with SETNX, i.e. only when the key does not exist yet.

    Args:
        ns (str): Namespace of the key.
        key (str): Name of the key.
        data (bytes): Value to be written.

    Returns:
        bool: Result of the SETNX call as returned by the Redis client.

    Raises:
        RejectedByBackend: Redis rejected the request.
        NotConnected: Connecting to Redis failed or timed out.
        BackendError: Redis failed to process the request for any other reason.
    """
    namespaced_key = self._add_key_ns_prefix(ns, key)
    with _map_to_sdl_exception():
        created = self.__redis.setnx(namespaced_key, data)
    return created
110 def get(self, ns: str, keys: List[str]) -> Dict[str, bytes]:
112 db_keys = self._add_keys_ns_prefix(ns, keys)
113 with _map_to_sdl_exception():
114 values = self.__redis.mget(db_keys)
115 for idx, val in enumerate(values):
116 # return only key values, which has a value
def find_keys(self, ns: str, key_prefix: str) -> List[str]:
    """
    List the keys of a namespace whose names start with the given prefix.

    The prefix is escaped first, a trailing '*' wildcard is appended and the namespace prefix
    is added, then the resulting pattern is passed to the Redis KEYS command. The namespace
    prefix is stripped from the matched keys before they are returned.

    Args:
        ns (str): Namespace to search in.
        key_prefix (str): Leading part of the key names to look for.

    Returns:
        list: Matching key names without the namespace prefix.

    Raises:
        RejectedByBackend: Redis rejected the request.
        NotConnected: Connecting to Redis failed or timed out.
        BackendError: Redis failed to process the request for any other reason.
    """
    wildcard_prefix = self._escape_characters(key_prefix) + '*'
    search_pattern = self._add_key_ns_prefix(ns, wildcard_prefix)
    with _map_to_sdl_exception():
        matched_db_keys = self.__redis.keys(search_pattern)
    return self._strip_ns_from_bin_keys(ns, matched_db_keys)
128 def find_and_get(self, ns: str, key_prefix: str, atomic: bool) -> Dict[str, bytes]:
129 # todo: replace below implementation with redis 'NGET' module
130 ret = dict() # type: Dict[str, bytes]
131 with _map_to_sdl_exception():
132 matched_keys = self.find_keys(ns, key_prefix)
134 ret = self.get(ns, matched_keys)
def remove(self, ns: str, keys: List[str]) -> None:
    """
    Delete one or more keys of a namespace with a single DEL.

    Args:
        ns (str): Namespace of the keys.
        keys (list): Names of the keys to be deleted.

    Raises:
        RejectedByBackend: Redis rejected the request.
        NotConnected: Connecting to Redis failed or timed out.
        BackendError: Redis failed to process the request for any other reason.
    """
    # DEL needs at least one key. Sending it with no arguments only produces an arity error
    # from Redis, so removing nothing is treated as a successful no-op.
    if not keys:
        return
    db_keys = self._add_keys_ns_prefix(ns, keys)
    with _map_to_sdl_exception():
        self.__redis.delete(*db_keys)
def remove_if(self, ns: str, key: str, data: bytes) -> bool:
    """
    Conditionally delete a key with the custom 'DELIE' Redis command.

    The command is sent with the namespaced key followed by the data. NOTE(review): presumably
    the backend deletes the key only when its current value equals data; confirm against the
    Redis module providing 'DELIE'.

    Args:
        ns (str): Namespace of the key.
        key (str): Name of the key.
        data (bytes): Value the key is expected to hold.

    Returns:
        bool: Reply of 'DELIE' as converted by the response callback registered for it.

    Raises:
        RejectedByBackend: Redis rejected the request.
        NotConnected: Connecting to Redis failed or timed out.
        BackendError: Redis failed to process the request for any other reason.
    """
    namespaced_key = self._add_key_ns_prefix(ns, key)
    with _map_to_sdl_exception():
        was_removed = self.__redis.execute_command('DELIE', namespaced_key, data)
    return was_removed
def add_member(self, ns: str, group: str, members: Set[bytes]) -> None:
    """
    Add members to a group, stored as a Redis set, with a single SADD.

    Args:
        ns (str): Namespace of the group.
        group (str): Name of the group.
        members (set): Members to be added to the group.

    Raises:
        RejectedByBackend: Redis rejected the request.
        NotConnected: Connecting to Redis failed or timed out.
        BackendError: Redis failed to process the request for any other reason.
    """
    # SADD needs at least one member. Sending it with only the key produces an arity error
    # from Redis, so adding nothing is treated as a successful no-op.
    if not members:
        return
    db_key = self._add_key_ns_prefix(ns, group)
    with _map_to_sdl_exception():
        self.__redis.sadd(db_key, *members)
def remove_member(self, ns: str, group: str, members: Set[bytes]) -> None:
    """
    Remove members from a group, stored as a Redis set, with a single SREM.

    Args:
        ns (str): Namespace of the group.
        group (str): Name of the group.
        members (set): Members to be removed from the group.

    Raises:
        RejectedByBackend: Redis rejected the request.
        NotConnected: Connecting to Redis failed or timed out.
        BackendError: Redis failed to process the request for any other reason.
    """
    # SREM needs at least one member. Sending it with only the key produces an arity error
    # from Redis, so removing nothing is treated as a successful no-op.
    if not members:
        return
    db_key = self._add_key_ns_prefix(ns, group)
    with _map_to_sdl_exception():
        self.__redis.srem(db_key, *members)
def remove_group(self, ns: str, group: str) -> None:
    """
    Delete a whole group by deleting its namespaced Redis key.

    Args:
        ns (str): Namespace of the group.
        group (str): Name of the group.

    Raises:
        RejectedByBackend: Redis rejected the request.
        NotConnected: Connecting to Redis failed or timed out.
        BackendError: Redis failed to process the request for any other reason.
    """
    group_db_key = self._add_key_ns_prefix(ns, group)
    with _map_to_sdl_exception():
        self.__redis.delete(group_db_key)
def get_members(self, ns: str, group: str) -> Set[bytes]:
    """
    Fetch all members of a group with SMEMBERS.

    Args:
        ns (str): Namespace of the group.
        group (str): Name of the group.

    Returns:
        set: Reply of SMEMBERS for the namespaced group key.

    Raises:
        RejectedByBackend: Redis rejected the request.
        NotConnected: Connecting to Redis failed or timed out.
        BackendError: Redis failed to process the request for any other reason.
    """
    group_db_key = self._add_key_ns_prefix(ns, group)
    with _map_to_sdl_exception():
        group_members = self.__redis.smembers(group_db_key)
    return group_members
def is_member(self, ns: str, group: str, member: bytes) -> bool:
    """
    Check with SISMEMBER whether a value belongs to a group.

    Args:
        ns (str): Namespace of the group.
        group (str): Name of the group.
        member (bytes): Value whose membership is checked.

    Returns:
        bool: Reply of SISMEMBER as returned by the Redis client.

    Raises:
        RejectedByBackend: Redis rejected the request.
        NotConnected: Connecting to Redis failed or timed out.
        BackendError: Redis failed to process the request for any other reason.
    """
    group_db_key = self._add_key_ns_prefix(ns, group)
    with _map_to_sdl_exception():
        belongs = self.__redis.sismember(group_db_key, member)
    return belongs
def group_size(self, ns: str, group: str) -> int:
    """
    Count the members of a group with SCARD.

    Args:
        ns (str): Namespace of the group.
        group (str): Name of the group.

    Returns:
        int: Reply of SCARD for the namespaced group key.

    Raises:
        RejectedByBackend: Redis rejected the request.
        NotConnected: Connecting to Redis failed or timed out.
        BackendError: Redis failed to process the request for any other reason.
    """
    group_db_key = self._add_key_ns_prefix(ns, group)
    with _map_to_sdl_exception():
        member_count = self.__redis.scard(group_db_key)
    return member_count
178 def _add_key_ns_prefix(cls, ns: str, key: str):
179 return '{' + ns + '},' + key
182 def _add_keys_ns_prefix(cls, ns: str, keylist: List[str]) -> List[str]:
185 ret_nskeys.append('{' + ns + '},' + k)
189 def _add_data_map_ns_prefix(cls, ns: str, data_dict: Dict[str, bytes]) -> Dict[str, bytes]:
191 for key, val in data_dict.items():
192 ret_nsdict['{' + ns + '},' + key] = val
196 def _strip_ns_from_bin_keys(cls, ns: str, nskeylist: List[bytes]) -> List[str]:
199 nskey = k.decode("utf-8").split(',', 1)
201 msg = u'Illegal namespace %s key:%s' % (ns, nskey)
202 raise RejectedByBackend(msg)
203 ret_keys.append(nskey[1])
207 def _escape_characters(cls, pattern: str) -> str:
208 return pattern.translate(str.maketrans(
217 def get_redis_connection(self):
218 """Return existing Redis database connection."""
222 class RedisBackendLock(DbBackendLockAbc):
224 A class providing an implementation of database backend lock of Shared Data Layer (SDL), when
225 backend database solution is Redis.
228 ns (str): Namespace under which this lock is targeted.
229 name (str): Lock name, identifies the lock key in a Redis database backend.
230 expiration (int, float): Lock expiration time after which the lock is removed if it hasn't
231 been released earlier by a 'release' method.
232 redis_backend (RedisBackend): Database backend object containing connection to Redis
235 lua_get_validity_time = None
236 # KEYS[1] - lock name
238 # return < 0 in case of failure, otherwise return lock validity time in milliseconds.
239 LUA_GET_VALIDITY_TIME_SCRIPT = """
240 local token = redis.call('get', KEYS[1])
244 if token ~= ARGV[1] then
247 return redis.call('pttl', KEYS[1])
def __init__(self, ns: str, name: str, expiration: Union[int, float],
             redis_backend: RedisBackend) -> None:
    """
    Create a lock object on top of the Redis connection of the given backend.

    The Redis lock key is the lock name prefixed with the namespace as '{<ns>},<lock name>'.
    Creating the Redis lock and registering the Lua scripts both happen with Redis exceptions
    translated into SDL exceptions.

    Args:
        ns (str): Namespace under which this lock is targeted.
        name (str): Lock name, identifies the lock key in a Redis database backend.
        expiration (int, float): Passed to the Redis lock as its timeout.
        redis_backend (RedisBackend): Database backend object whose Redis connection is used.

    Raises:
        RejectedByBackend: Redis rejected the request.
        NotConnected: Connecting to Redis failed or timed out.
        BackendError: Redis failed to process the request for any other reason.
    """
    super().__init__(ns, name)
    self.__redis = redis_backend.get_redis_connection()
    with _map_to_sdl_exception():
        self.__redis_lock = Lock(redis=self.__redis,
                                 name='{' + ns + '},' + self._lock_name,
                                 timeout=expiration)
        self._register_scripts()
262 "lock namespace": self._ns,
263 "lock name": self._lock_name,
264 "lock status": self._lock_status_to_string()
268 def acquire(self, retry_interval: Union[int, float] = 0.1,
269 retry_timeout: Union[int, float] = 10) -> bool:
271 self.__redis_lock.sleep = retry_interval
272 with _map_to_sdl_exception():
273 succeeded = self.__redis_lock.acquire(blocking_timeout=retry_timeout)
def release(self) -> None:
    """
    Release the lock by calling release() on the underlying Redis lock.

    Raises:
        RejectedByBackend: Redis rejected the request.
        NotConnected: Connecting to Redis failed or timed out.
        BackendError: Redis failed to process the request for any other reason.
    """
    held_lock = self.__redis_lock
    with _map_to_sdl_exception():
        held_lock.release()
def refresh(self) -> None:
    """
    Refresh the lock by calling reacquire() on the underlying Redis lock.

    Raises:
        RejectedByBackend: Redis rejected the request.
        NotConnected: Connecting to Redis failed or timed out.
        BackendError: Redis failed to process the request for any other reason.
    """
    held_lock = self.__redis_lock
    with _map_to_sdl_exception():
        held_lock.reacquire()
284 def get_validity_time(self) -> Union[int, float]:
286 if self.__redis_lock.local.token is None:
287 msg = u'Cannot get validity time of an unlocked lock %s' % self._lock_name
288 raise RejectedByBackend(msg)
290 with _map_to_sdl_exception():
291 validity = self.lua_get_validity_time(keys=[self.__redis_lock.name],
292 args=[self.__redis_lock.local.token],
295 msg = (u'Getting validity time of a lock %s failed with error code: %d'
296 % (self._lock_name, validity))
297 raise RejectedByBackend(msg)
298 ftime = validity / 1000.0
299 if ftime.is_integer():
303 def _register_scripts(self):
305 client = self.__redis
306 if cls.lua_get_validity_time is None:
307 cls.lua_get_validity_time = client.register_script(cls.LUA_GET_VALIDITY_TIME_SCRIPT)
309 def _lock_status_to_string(self) -> str:
311 if self.__redis_lock.locked():
312 if self.__redis_lock.owned():
314 return 'locked by someone else'
316 except(redis_exceptions.RedisError) as exc:
317 return f'Error: {str(exc)}'