Add a new SDL storage API function 'is_active()'
[ric-plt/sdlpy.git] / ricsdl-package / ricsdl / backend / redis.py
1 # Copyright (c) 2019 AT&T Intellectual Property.
2 # Copyright (c) 2018-2019 Nokia.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 #
17 # This source code is part of the near-RT RIC (RAN Intelligent Controller)
18 # platform project (RICP).
19 #
20
21
22 """The module provides implementation of Shared Data Layer (SDL) database backend interface."""
23 import contextlib
24 from typing import (Dict, Set, List, Union)
25 from redis import Redis
26 from redis.sentinel import Sentinel
27 from redis.lock import Lock
28 from redis._compat import nativestr
29 from redis import exceptions as redis_exceptions
30 from ricsdl.configuration import _Configuration
31 from ricsdl.exceptions import (
32     RejectedByBackend,
33     NotConnected,
34     BackendError
35 )
36 from .dbbackend_abc import DbBackendAbc
37 from .dbbackend_abc import DbBackendLockAbc
38
39
40 @contextlib.contextmanager
41 def _map_to_sdl_exception():
42     """Translates known redis exceptions into SDL exceptions."""
43     try:
44         yield
45     except(redis_exceptions.ResponseError) as exc:
46         raise RejectedByBackend("SDL backend rejected the request: {}".
47                                 format(str(exc))) from exc
48     except(redis_exceptions.ConnectionError, redis_exceptions.TimeoutError) as exc:
49         raise NotConnected("SDL not connected to backend: {}".
50                            format(str(exc))) from exc
51     except(redis_exceptions.RedisError) as exc:
52         raise BackendError("SDL backend failed to process the request: {}".
53                            format(str(exc))) from exc
54
55
56 class RedisBackend(DbBackendAbc):
57     """
58     A class providing an implementation of database backend of Shared Data Layer (SDL), when
59     backend database solution is Redis.
60
61     Args:
62         configuration (_Configuration): SDL configuration, containing credentials to connect to
63                                         Redis database backend.
64     """
65     def __init__(self, configuration: _Configuration) -> None:
66         super().__init__()
67         with _map_to_sdl_exception():
68             if configuration.get_params().db_sentinel_port:
69                 sentinel_node = (configuration.get_params().db_host,
70                                  configuration.get_params().db_sentinel_port)
71                 master_name = configuration.get_params().db_sentinel_master_name
72                 self.__sentinel = Sentinel([sentinel_node])
73                 self.__redis = self.__sentinel.master_for(master_name)
74             else:
75                 self.__redis = Redis(host=configuration.get_params().db_host,
76                                      port=configuration.get_params().db_port,
77                                      db=0,
78                                      max_connections=20)
79         self.__redis.set_response_callback('SETIE', lambda r: r and nativestr(r) == 'OK' or False)
80         self.__redis.set_response_callback('DELIE', lambda r: r and int(r) == 1 or False)
81
82     def __del__(self):
83         self.close()
84
85     def __str__(self):
86         return str(
87             {
88                 "DB type": "Redis",
89                 "Redis connection": repr(self.__redis)
90             }
91         )
92
93     def is_connected(self):
94         with _map_to_sdl_exception():
95             return self.__redis.ping()
96
97     def close(self):
98         self.__redis.close()
99
100     def set(self, ns: str, data_map: Dict[str, bytes]) -> None:
101         db_data_map = self._add_data_map_ns_prefix(ns, data_map)
102         with _map_to_sdl_exception():
103             self.__redis.mset(db_data_map)
104
105     def set_if(self, ns: str, key: str, old_data: bytes, new_data: bytes) -> bool:
106         db_key = self._add_key_ns_prefix(ns, key)
107         with _map_to_sdl_exception():
108             return self.__redis.execute_command('SETIE', db_key, new_data, old_data)
109
110     def set_if_not_exists(self, ns: str, key: str, data: bytes) -> bool:
111         db_key = self._add_key_ns_prefix(ns, key)
112         with _map_to_sdl_exception():
113             return self.__redis.setnx(db_key, data)
114
115     def get(self, ns: str, keys: List[str]) -> Dict[str, bytes]:
116         ret = dict()
117         db_keys = self._add_keys_ns_prefix(ns, keys)
118         with _map_to_sdl_exception():
119             values = self.__redis.mget(db_keys)
120             for idx, val in enumerate(values):
121                 # return only key values, which has a value
122                 if val:
123                     ret[keys[idx]] = val
124             return ret
125
126     def find_keys(self, ns: str, key_pattern: str) -> List[str]:
127         db_key_pattern = self._add_key_ns_prefix(ns, key_pattern)
128         with _map_to_sdl_exception():
129             ret = self.__redis.keys(db_key_pattern)
130             return self._strip_ns_from_bin_keys(ns, ret)
131
132     def find_and_get(self, ns: str, key_pattern: str) -> Dict[str, bytes]:
133         # todo: replace below implementation with redis 'NGET' module
134         ret = dict()  # type: Dict[str, bytes]
135         with _map_to_sdl_exception():
136             matched_keys = self.find_keys(ns, key_pattern)
137             if matched_keys:
138                 ret = self.get(ns, matched_keys)
139         return ret
140
141     def remove(self, ns: str, keys: List[str]) -> None:
142         db_keys = self._add_keys_ns_prefix(ns, keys)
143         with _map_to_sdl_exception():
144             self.__redis.delete(*db_keys)
145
146     def remove_if(self, ns: str, key: str, data: bytes) -> bool:
147         db_key = self._add_key_ns_prefix(ns, key)
148         with _map_to_sdl_exception():
149             return self.__redis.execute_command('DELIE', db_key, data)
150
151     def add_member(self, ns: str, group: str, members: Set[bytes]) -> None:
152         db_key = self._add_key_ns_prefix(ns, group)
153         with _map_to_sdl_exception():
154             self.__redis.sadd(db_key, *members)
155
156     def remove_member(self, ns: str, group: str, members: Set[bytes]) -> None:
157         db_key = self._add_key_ns_prefix(ns, group)
158         with _map_to_sdl_exception():
159             self.__redis.srem(db_key, *members)
160
161     def remove_group(self, ns: str, group: str) -> None:
162         db_key = self._add_key_ns_prefix(ns, group)
163         with _map_to_sdl_exception():
164             self.__redis.delete(db_key)
165
166     def get_members(self, ns: str, group: str) -> Set[bytes]:
167         db_key = self._add_key_ns_prefix(ns, group)
168         with _map_to_sdl_exception():
169             return self.__redis.smembers(db_key)
170
171     def is_member(self, ns: str, group: str, member: bytes) -> bool:
172         db_key = self._add_key_ns_prefix(ns, group)
173         with _map_to_sdl_exception():
174             return self.__redis.sismember(db_key, member)
175
176     def group_size(self, ns: str, group: str) -> int:
177         db_key = self._add_key_ns_prefix(ns, group)
178         with _map_to_sdl_exception():
179             return self.__redis.scard(db_key)
180
181     @classmethod
182     def _add_key_ns_prefix(cls, ns: str, key: str):
183         return '{' + ns + '},' + key
184
185     @classmethod
186     def _add_keys_ns_prefix(cls, ns: str, keylist: List[str]) -> List[str]:
187         ret_nskeys = []
188         for k in keylist:
189             ret_nskeys.append('{' + ns + '},' + k)
190         return ret_nskeys
191
192     @classmethod
193     def _add_data_map_ns_prefix(cls, ns: str, data_dict: Dict[str, bytes]) -> Dict[str, bytes]:
194         ret_nsdict = {}
195         for key, val in data_dict.items():
196             ret_nsdict['{' + ns + '},' + key] = val
197         return ret_nsdict
198
199     @classmethod
200     def _strip_ns_from_bin_keys(cls, ns: str, nskeylist: List[bytes]) -> List[str]:
201         ret_keys = []
202         for k in nskeylist:
203             try:
204                 redis_key = k.decode("utf-8")
205             except UnicodeDecodeError as exc:
206                 msg = u'Namespace %s key conversion to string failed: %s' % (ns, str(exc))
207                 raise RejectedByBackend(msg)
208             nskey = redis_key.split(',', 1)
209             if len(nskey) != 2:
210                 msg = u'Namespace %s key:%s has no namespace prefix' % (ns, redis_key)
211                 raise RejectedByBackend(msg)
212             ret_keys.append(nskey[1])
213         return ret_keys
214
215     def get_redis_connection(self):
216         """Return existing Redis database connection."""
217         return self.__redis
218
219
220 class RedisBackendLock(DbBackendLockAbc):
221     """
222     A class providing an implementation of database backend lock of Shared Data Layer (SDL), when
223     backend database solution is Redis.
224
225     Args:
226         ns (str): Namespace under which this lock is targeted.
227         name (str): Lock name, identifies the lock key in a Redis database backend.
228         expiration (int, float): Lock expiration time after which the lock is removed if it hasn't
229                                  been released earlier by a 'release' method.
230         redis_backend (RedisBackend): Database backend object containing connection to Redis
231                                       database.
232     """
233     lua_get_validity_time = None
234     # KEYS[1] - lock name
235     # ARGS[1] - token
236     # return < 0 in case of failure, otherwise return lock validity time in milliseconds.
237     LUA_GET_VALIDITY_TIME_SCRIPT = """
238         local token = redis.call('get', KEYS[1])
239         if not token then
240             return -10
241         end
242         if token ~= ARGV[1] then
243             return -11
244         end
245         return redis.call('pttl', KEYS[1])
246     """
247
248     def __init__(self, ns: str, name: str, expiration: Union[int, float],
249                  redis_backend: RedisBackend) -> None:
250         super().__init__(ns, name)
251         self.__redis = redis_backend.get_redis_connection()
252         with _map_to_sdl_exception():
253             redis_lockname = '{' + ns + '},' + self._lock_name
254             self.__redis_lock = Lock(redis=self.__redis, name=redis_lockname, timeout=expiration)
255             self._register_scripts()
256
257     def __str__(self):
258         return str(
259             {
260                 "lock DB type": "Redis",
261                 "lock namespace": self._ns,
262                 "lock name": self._lock_name,
263                 "lock status": self._lock_status_to_string()
264             }
265         )
266
267     def acquire(self, retry_interval: Union[int, float] = 0.1,
268                 retry_timeout: Union[int, float] = 10) -> bool:
269         succeeded = False
270         self.__redis_lock.sleep = retry_interval
271         with _map_to_sdl_exception():
272             succeeded = self.__redis_lock.acquire(blocking_timeout=retry_timeout)
273         return succeeded
274
275     def release(self) -> None:
276         with _map_to_sdl_exception():
277             self.__redis_lock.release()
278
279     def refresh(self) -> None:
280         with _map_to_sdl_exception():
281             self.__redis_lock.reacquire()
282
283     def get_validity_time(self) -> Union[int, float]:
284         validity = 0
285         if self.__redis_lock.local.token is None:
286             msg = u'Cannot get validity time of an unlocked lock %s' % self._lock_name
287             raise RejectedByBackend(msg)
288
289         with _map_to_sdl_exception():
290             validity = self.lua_get_validity_time(keys=[self.__redis_lock.name],
291                                                   args=[self.__redis_lock.local.token],
292                                                   client=self.__redis)
293         if validity < 0:
294             msg = (u'Getting validity time of a lock %s failed with error code: %d'
295                    % (self._lock_name, validity))
296             raise RejectedByBackend(msg)
297         ftime = validity / 1000.0
298         if ftime.is_integer():
299             return int(ftime)
300         return ftime
301
302     def _register_scripts(self):
303         cls = self.__class__
304         client = self.__redis
305         if cls.lua_get_validity_time is None:
306             cls.lua_get_validity_time = client.register_script(cls.LUA_GET_VALIDITY_TIME_SCRIPT)
307
308     def _lock_status_to_string(self) -> str:
309         try:
310             if self.__redis_lock.locked():
311                 if self.__redis_lock.owned():
312                     return 'locked'
313                 return 'locked by someone else'
314             return 'unlocked'
315         except(redis_exceptions.RedisError) as exc:
316             return f'Error: {str(exc)}'