Implement sentinel based DB capacity scaling
[ric-plt/sdlpy.git] / ricsdl-package / tests / backend / test_redis.py
1 # Copyright (c) 2019 AT&T Intellectual Property.
2 # Copyright (c) 2018-2019 Nokia.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 #
17 # This source code is part of the near-RT RIC (RAN Intelligent Controller)
18 # platform project (RICP).
19 #
20
21
22 from unittest.mock import patch, Mock, MagicMock, call, ANY
23 import pytest
24 from redis import exceptions as redis_exceptions
25 import ricsdl.backend
26 from ricsdl.backend.redis import (RedisBackendLock, _map_to_sdl_exception)
27 from ricsdl.configuration import _Configuration
28 from ricsdl.configuration import DbBackendType
29 import ricsdl.exceptions
30
31
def get_test_sdl_standby_config():
    """Return SDL parameters for one Redis host with no Sentinel or cluster address settings."""
    settings = {
        'db_host': 'service-ricplt-dbaas-tcp-cluster-0.ricplt',
        'db_port': 6379,
        'db_sentinel_port': None,
        'db_sentinel_master_name': None,
        'db_cluster_addr_list': None,
        'db_type': DbBackendType.REDIS,
    }
    return _Configuration.Params(**settings)
39
def get_test_sdl_sentinel_config():
    """Return SDL parameters for one host with a Sentinel port and master name set."""
    settings = {
        'db_host': 'service-ricplt-dbaas-tcp-cluster-0.ricplt',
        'db_port': 6379,
        'db_sentinel_port': 26379,
        'db_sentinel_master_name': 'dbaasmaster',
        'db_cluster_addr_list': None,
        'db_type': DbBackendType.REDIS,
    }
    return _Configuration.Params(**settings)
47
def get_test_sdl_sentinel_cluster_config():
    """Return SDL parameters with Sentinel settings and a two-address cluster address list."""
    settings = {
        'db_host': 'service-ricplt-dbaas-tcp-cluster-0.ricplt',
        'db_port': 6379,
        'db_sentinel_port': 26379,
        'db_sentinel_master_name': 'dbaasmaster',
        'db_cluster_addr_list': 'service-ricplt-dbaas-tcp-cluster-0.ricplt,service-ricplt-dbaas-tcp-cluster-1.ricplt',
        'db_type': DbBackendType.REDIS,
    }
    return _Configuration.Params(**settings)
55
@pytest.fixture()
def redis_backend_common_fixture(request):
    """Attach the shared test data to the requesting test class.

    Each entry below becomes an attribute of ``request.cls``.  Names ending in
    ``_redis`` hold the namespace-prefixed ('{some-ns},...') counterparts of
    the plain values.
    """
    shared_attributes = {
        'ns': 'some-ns',
        'dl_redis': [b'1', b'2'],
        'dm': {'a': b'1', 'b': b'2'},
        'dm_redis': {'{some-ns},a': b'1', '{some-ns},b': b'2'},
        'dm_redis_flat': ['{some-ns},a', b'1', '{some-ns},b', b'2'],
        'key': 'a',
        'key_redis': '{some-ns},a',
        'keys': ['a', 'b'],
        'keys_redis': ['{some-ns},a', '{some-ns},b'],
        'data': b'123',
        'old_data': b'1',
        'new_data': b'3',
        'keypattern': r'[Aa]bc-\[1\].?-*',
        'keypattern_redis': r'{some-ns},[Aa]bc-\[1\].?-*',
        'matchedkeys': ['Abc-[1].0-def', 'abc-[1].1-ghi'],
        'matchedkeys_redis': [b'{some-ns},Abc-[1].0-def',
                              b'{some-ns},abc-[1].1-ghi'],
        'matcheddata_redis': [b'10', b'11'],
        'matchedkeydata': {'Abc-[1].0-def': b'10',
                           'abc-[1].1-ghi': b'11'},
        'group': 'some-group',
        'group_redis': '{some-ns},some-group',
        'groupmembers': {b'm1', b'm2'},
        'groupmember': b'm1',
        'channels': ['abs', 'gma'],
        'channels_and_events': {'abs': ['cbn'], 'gma': ['jkl']},
        'channels_and_events_redis': ['{some-ns},abs', 'cbn', '{some-ns},gma', 'jkl'],
    }
    for attribute, value in shared_attributes.items():
        setattr(request.cls, attribute, value)

    yield
87
@pytest.fixture(params=['standalone', 'sentinel', 'sentinel_cluster'])
def redis_backend_fixture(request, redis_backend_common_fixture):
    """Create the backend under test on top of mocked Redis classes.

    The fixture is parametrized over three configurations ('standalone',
    'sentinel' and 'sentinel_cluster').  For each of them the Redis or
    Sentinel class, the PubSub class and ``threading.Thread`` are patched while
    the backend instance is created.  The resulting mocks and the backend are
    stored on the test class as ``mock_redis``, ``mock_pubsub``,
    ``mock_pubsub_thread`` and ``db``, and the calls made during construction
    are verified before the test body runs.

    Raises:
        NotImplementedError: if the fixture parameter is not one of the three
            supported configuration names.
    """
    request.cls.configuration = Mock()
    request.cls.test_backend_type = request.param
    if request.param == 'standalone':
        cfg = get_test_sdl_standby_config()
        request.cls.configuration.get_params.return_value = cfg
        with patch('ricsdl.backend.redis.Redis') as mock_redis, \
                patch('ricsdl.backend.redis.PubSub') as mock_pubsub, \
                patch('threading.Thread') as mock_thread:
            db = ricsdl.backend.get_backend_instance(request.cls.configuration)
            request.cls.mock_redis = mock_redis.return_value
            request.cls.mock_pubsub = mock_pubsub.return_value
            request.cls.mock_pubsub_thread = mock_thread.return_value
            # Tests start with the listener thread reported as not running.
            request.cls.mock_pubsub_thread.is_alive.return_value = False
        request.cls.db = db

        mock_redis.assert_called_once_with(db=0, host=cfg.db_host, max_connections=20, port=cfg.db_port)
        mock_pubsub.assert_called_once_with(request.cls.mock_redis.connection_pool, ignore_subscribe_messages=True)
        assert request.cls.mock_redis.set_response_callback.call_count == 2
        assert request.cls.mock_redis.set_response_callback.call_args_list == [call('SETIE', ANY), call('DELIE', ANY)]

    elif request.param == 'sentinel':
        cfg = get_test_sdl_sentinel_config()
        request.cls.configuration.get_params.return_value = cfg
        with patch('ricsdl.backend.redis.Sentinel') as mock_sentinel, \
                patch('ricsdl.backend.redis.PubSub') as mock_pubsub, \
                patch('threading.Thread') as mock_thread:
            db = ricsdl.backend.get_backend_instance(request.cls.configuration)
            request.cls.mock_redis = mock_sentinel.return_value.master_for.return_value
            request.cls.mock_pubsub = mock_pubsub.return_value
            request.cls.mock_pubsub_thread = mock_thread.return_value
            request.cls.mock_pubsub_thread.is_alive.return_value = False
        request.cls.db = db

        mock_sentinel.assert_called_once_with([(cfg.db_host, cfg.db_sentinel_port)])
        # master_for() is called on the Sentinel *instance*, and only the
        # assert_* helpers verify anything; a bare called_once_with() on a
        # mock is not an assertion.
        mock_sentinel.return_value.master_for.assert_called_once_with(cfg.db_sentinel_master_name)
        mock_pubsub.assert_called_once_with(request.cls.mock_redis.connection_pool, ignore_subscribe_messages=True)
        assert request.cls.mock_redis.set_response_callback.call_count == 2
        assert request.cls.mock_redis.set_response_callback.call_args_list == [call('SETIE', ANY), call('DELIE', ANY)]

    elif request.param == 'sentinel_cluster':
        cfg = get_test_sdl_sentinel_cluster_config()
        request.cls.configuration.get_params.return_value = cfg
        with patch('ricsdl.backend.redis.Sentinel') as mock_sentinel, \
                patch('ricsdl.backend.redis.PubSub') as mock_pubsub, \
                patch('threading.Thread') as mock_thread:
            db = ricsdl.backend.get_backend_instance(request.cls.configuration)
            # Every patched Sentinel() call returns the same mock instance, so
            # a single mock_redis object records the calls made for both
            # cluster addresses; hence the doubled counts asserted below.
            request.cls.mock_redis = mock_sentinel.return_value.master_for.return_value
            request.cls.mock_pubsub = mock_pubsub.return_value
            request.cls.mock_pubsub_thread = mock_thread.return_value
            request.cls.mock_pubsub_thread.is_alive.return_value = False
        request.cls.db = db

        assert mock_sentinel.call_count == 2
        mock_sentinel.assert_has_calls([
            call([('service-ricplt-dbaas-tcp-cluster-0.ricplt', cfg.db_sentinel_port)]),
            call([('service-ricplt-dbaas-tcp-cluster-1.ricplt', cfg.db_sentinel_port)]),
        ], any_order=True)
        assert mock_sentinel.return_value.master_for.call_count == 2
        mock_sentinel.return_value.master_for.assert_has_calls(
            [call(cfg.db_sentinel_master_name), call(cfg.db_sentinel_master_name)], any_order=True,
        )
        assert mock_pubsub.call_count == 2
        mock_pubsub.assert_has_calls([
            call(request.cls.mock_redis.connection_pool, ignore_subscribe_messages=True),
            call(request.cls.mock_redis.connection_pool, ignore_subscribe_messages=True),
        ])
        assert request.cls.mock_redis.set_response_callback.call_count == 4
        assert request.cls.mock_redis.set_response_callback.call_args_list == [
            call('SETIE', ANY), call('DELIE', ANY),
            call('SETIE', ANY), call('DELIE', ANY),
        ]
    else:
        raise NotImplementedError

    yield
165
166
167 @pytest.mark.usefixtures('redis_backend_fixture')
168 class TestRedisBackend:
169     def test_is_connected_function_success(self):
170         self.mock_redis.ping.return_value = True
171         ret = self.db.is_connected()
172         if self.test_backend_type == 'sentinel_cluster':
173             assert self.mock_redis.ping.call_count == 2
174         else:
175             assert self.mock_redis.ping.call_count == 1
176         assert ret is True
177
178     def test_is_connected_function_returns_false_if_ping_fails(self):
179         self.mock_redis.ping.return_value = False
180         ret = self.db.is_connected()
181         self.mock_redis.ping.assert_called_once()
182         assert ret is False
183
184     def test_is_connected_function_can_map_redis_exception_to_sdl_exception(self):
185         self.mock_redis.ping.side_effect = redis_exceptions.ResponseError('redis error!')
186         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
187             self.db.is_connected()
188
189     def test_set_function_success(self):
190         self.db.set(self.ns, self.dm)
191         self.mock_redis.mset.assert_called_once_with(self.dm_redis)
192
193     def test_set_function_can_map_redis_exception_to_sdl_exception(self):
194         self.mock_redis.mset.side_effect = redis_exceptions.ResponseError('redis error!')
195         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
196             self.db.set(self.ns, self.dm)
197
198     def test_set_if_function_success(self):
199         self.mock_redis.execute_command.return_value = True
200         ret = self.db.set_if(self.ns, self.key, self.old_data, self.new_data)
201         self.mock_redis.execute_command.assert_called_once_with('SETIE', self.key_redis,
202                                                                 self.new_data, self.old_data)
203         assert ret is True
204
205     def test_set_if_function_returns_false_if_existing_key_value_not_expected(self):
206         self.mock_redis.execute_command.return_value = False
207         ret = self.db.set_if(self.ns, self.key, self.old_data, self.new_data)
208         self.mock_redis.execute_command.assert_called_once_with('SETIE', self.key_redis,
209                                                                 self.new_data, self.old_data)
210         assert ret is False
211
212     def test_set_if_function_can_map_redis_exception_to_sdl_exception(self):
213         self.mock_redis.execute_command.side_effect = redis_exceptions.ResponseError('redis error!')
214         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
215             self.db.set_if(self.ns, self.key, self.old_data, self.new_data)
216
217     def test_set_if_not_exists_function_success(self):
218         self.mock_redis.setnx.return_value = True
219         ret = self.db.set_if_not_exists(self.ns, self.key, self.new_data)
220         self.mock_redis.setnx.assert_called_once_with(self.key_redis, self.new_data)
221         assert ret is True
222
223     def test_set_if_not_exists_function_returns_false_if_key_already_exists(self):
224         self.mock_redis.setnx.return_value = False
225         ret = self.db.set_if_not_exists(self.ns, self.key, self.new_data)
226         self.mock_redis.setnx.assert_called_once_with(self.key_redis, self.new_data)
227         assert ret is False
228
229     def test_set_if_not_exists_function_can_map_redis_exception_to_sdl_exception(self):
230         self.mock_redis.setnx.side_effect = redis_exceptions.ResponseError('redis error!')
231         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
232             self.db.set_if_not_exists(self.ns, self.key, self.new_data)
233
234     def test_get_function_success(self):
235         self.mock_redis.mget.return_value = self.dl_redis
236         ret = self.db.get(self.ns, self.keys)
237         self.mock_redis.mget.assert_called_once_with(self.keys_redis)
238         assert ret == self.dm
239
240     def test_get_function_returns_empty_dict_when_no_key_values_exist(self):
241         self.mock_redis.mget.return_value = [None, None]
242         ret = self.db.get(self.ns, self.keys)
243         self.mock_redis.mget.assert_called_once_with(self.keys_redis)
244         assert ret == dict()
245
246     def test_get_function_returns_dict_only_with_found_key_values_when_some_keys_exist(self):
247         self.mock_redis.mget.return_value = [self.data, None]
248         ret = self.db.get(self.ns, self.keys)
249         self.mock_redis.mget.assert_called_once_with(self.keys_redis)
250         assert ret == {self.key: self.data}
251
252     def test_get_function_can_map_redis_exception_to_sdl_exception(self):
253         self.mock_redis.mget.side_effect = redis_exceptions.ResponseError('redis error!')
254         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
255             self.db.get(self.ns, self.keys)
256
257     def test_find_keys_function_success(self):
258         self.mock_redis.keys.return_value = self.matchedkeys_redis
259         ret = self.db.find_keys(self.ns, self.keypattern)
260         self.mock_redis.keys.assert_called_once_with(self.keypattern_redis)
261         assert ret == self.matchedkeys
262
263     def test_find_keys_function_returns_empty_list_when_no_matching_keys_found(self):
264         self.mock_redis.keys.return_value = []
265         ret = self.db.find_keys(self.ns, self.keypattern)
266         self.mock_redis.keys.assert_called_once_with(self.keypattern_redis)
267         assert ret == []
268
269     def test_find_keys_function_can_map_redis_exception_to_sdl_exception(self):
270         self.mock_redis.keys.side_effect = redis_exceptions.ResponseError('redis error!')
271         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
272             self.db.find_keys(self.ns, self.keypattern)
273
    def test_find_keys_function_can_raise_exception_when_redis_key_convert_to_string_fails(self):
        """find_keys() raises RejectedByBackend when a returned key cannot be converted to a string."""
        # Redis returns an illegal key whose conversion to string fails
        corrupt_redis_key = b'\x81'
        self.mock_redis.keys.return_value = [corrupt_redis_key]
        with pytest.raises(ricsdl.exceptions.RejectedByBackend) as excinfo:
            self.db.find_keys(self.ns, self.keypattern)
        # NOTE(review): the next two lines are two separate statements.  The
        # assert only tests a non-empty f-string (always true) and the line
        # after it is a bare `in` expression whose result is discarded, so the
        # exception message is never actually checked.  The expected text
        # looks copied from the "no namespace prefix" case; confirm what the
        # backend reports for a key conversion failure before joining the two
        # lines into one real assertion -- TODO confirm.
        assert f"Namespace {self.ns} key:{corrupt_redis_key} "
        "has no namespace prefix" in str(excinfo.value)
282
283     def test_find_keys_function_can_raise_exception_when_redis_key_is_without_prefix(self):
284         # Redis returns an illegal key, which doesn't have comma separated namespace prefix
285         corrupt_redis_key = 'some-corrupt-key'
286         self.mock_redis.keys.return_value = [f'{corrupt_redis_key}'.encode()]
287         with pytest.raises(ricsdl.exceptions.RejectedByBackend) as excinfo:
288             self.db.find_keys(self.ns, self.keypattern)
289         assert f"Namespace {self.ns} key:{corrupt_redis_key} "
290         "has no namespace prefix" in str(excinfo.value)
291
292     def test_find_and_get_function_success(self):
293         self.mock_redis.keys.return_value = self.matchedkeys_redis
294         self.mock_redis.mget.return_value = self.matcheddata_redis
295         ret = self.db.find_and_get(self.ns, self.keypattern)
296         self.mock_redis.keys.assert_called_once_with(self.keypattern_redis)
297         self.mock_redis.mget.assert_called_once_with([i.decode() for i in self.matchedkeys_redis])
298         assert ret == self.matchedkeydata
299
300     def test_find_and_get_function_returns_empty_dict_when_no_matching_keys_exist(self):
301         self.mock_redis.keys.return_value = list()
302         ret = self.db.find_and_get(self.ns, self.keypattern)
303         self.mock_redis.keys.assert_called_once_with(self.keypattern_redis)
304         assert not self.mock_redis.mget.called
305         assert ret == dict()
306
307     def test_find_and_get_function_can_map_redis_exception_to_sdl_exception(self):
308         self.mock_redis.keys.side_effect = redis_exceptions.ResponseError('redis error!')
309         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
310             self.db.find_and_get(self.ns, self.keypattern)
311
    def test_find_and_get_function_can_raise_exception_when_redis_key_convert_to_string_fails(self):
        """find_and_get() raises RejectedByBackend when a returned key cannot be converted to a string."""
        # Redis returns an illegal key whose conversion to string fails
        corrupt_redis_key = b'\x81'
        self.mock_redis.keys.return_value = [corrupt_redis_key]
        with pytest.raises(ricsdl.exceptions.RejectedByBackend) as excinfo:
            self.db.find_and_get(self.ns, self.keypattern)
        # NOTE(review): the next two lines are two separate statements.  The
        # assert only tests a non-empty f-string (always true) and the line
        # after it is a bare `in` expression whose result is discarded, so the
        # exception message is never actually checked.  The expected text
        # looks copied from the "no namespace prefix" case; confirm what the
        # backend reports for a key conversion failure before joining the two
        # lines into one real assertion -- TODO confirm.
        assert f"Namespace {self.ns} key:{corrupt_redis_key} "
        "has no namespace prefix" in str(excinfo.value)
320
321     def test_find_and_get_function_can_raise_exception_when_redis_key_is_without_prefix(self):
322         # Redis returns an illegal key, which doesn't have comma separated namespace prefix
323         corrupt_redis_key = 'some-corrupt-key'
324         self.mock_redis.keys.return_value = [f'{corrupt_redis_key}'.encode()]
325         with pytest.raises(ricsdl.exceptions.RejectedByBackend) as excinfo:
326             self.db.find_and_get(self.ns, self.keypattern)
327         assert f"Namespace {self.ns} key:{corrupt_redis_key} "
328         "has no namespace prefix" in str(excinfo.value)
329
330     def test_remove_function_success(self):
331         self.db.remove(self.ns, self.keys)
332         self.mock_redis.delete.assert_called_once_with(*self.keys_redis)
333
334     def test_remove_function_can_map_redis_exception_to_sdl_exception(self):
335         self.mock_redis.delete.side_effect = redis_exceptions.ResponseError('redis error!')
336         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
337             self.db.remove(self.ns, self.keys)
338
339     def test_remove_if_function_success(self):
340         self.mock_redis.execute_command.return_value = True
341         ret = self.db.remove_if(self.ns, self.key, self.new_data)
342         self.mock_redis.execute_command.assert_called_once_with('DELIE', self.key_redis,
343                                                                 self.new_data)
344         assert ret is True
345
346     def test_remove_if_function_returns_false_if_data_does_not_match(self):
347         self.mock_redis.execute_command.return_value = False
348         ret = self.db.remove_if(self.ns, self.key, self.new_data)
349         self.mock_redis.execute_command.assert_called_once_with('DELIE', self.key_redis,
350                                                                 self.new_data)
351         assert ret is False
352
353     def test_remove_if_function_can_map_redis_exception_to_sdl_exception(self):
354         self.mock_redis.execute_command.side_effect = redis_exceptions.ResponseError('redis error!')
355         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
356             self.db.remove_if(self.ns, self.key, self.new_data)
357
358     def test_add_member_function_success(self):
359         self.db.add_member(self.ns, self.group, self.groupmembers)
360         self.mock_redis.sadd.assert_called_once_with(self.group_redis, *self.groupmembers)
361
362     def test_add_member_function_can_map_redis_exception_to_sdl_exception(self):
363         self.mock_redis.sadd.side_effect = redis_exceptions.ResponseError('redis error!')
364         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
365             self.db.add_member(self.ns, self.group, self.groupmembers)
366
367     def test_remove_member_function_success(self):
368         self.db.remove_member(self.ns, self.group, self.groupmembers)
369         self.mock_redis.srem.assert_called_once_with(self.group_redis, *self.groupmembers)
370
371     def test_remove_member_function_can_map_redis_exception_to_sdl_exception(self):
372         self.mock_redis.srem.side_effect = redis_exceptions.ResponseError('redis error!')
373         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
374             self.db.remove_member(self.ns, self.group, self.groupmembers)
375
376     def test_remove_group_function_success(self):
377         self.db.remove_group(self.ns, self.group)
378         self.mock_redis.delete.assert_called_once_with(self.group_redis)
379
380     def test_remove_group_function_can_map_redis_exception_to_sdl_exception(self):
381         self.mock_redis.delete.side_effect = redis_exceptions.ResponseError('redis error!')
382         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
383             self.db.remove_group(self.ns, self.group)
384
385     def test_get_members_function_success(self):
386         self.mock_redis.smembers.return_value = self.groupmembers
387         ret = self.db.get_members(self.ns, self.group)
388         self.mock_redis.smembers.assert_called_once_with(self.group_redis)
389         assert ret is self.groupmembers
390
391     def test_get_members_function_can_map_redis_exception_to_sdl_exception(self):
392         self.mock_redis.smembers.side_effect = redis_exceptions.ResponseError('redis error!')
393         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
394             self.db.get_members(self.ns, self.group)
395
396     def test_is_member_function_success(self):
397         self.mock_redis.sismember.return_value = True
398         ret = self.db.is_member(self.ns, self.group, self.groupmember)
399         self.mock_redis.sismember.assert_called_once_with(self.group_redis, self.groupmember)
400         assert ret is True
401
402     def test_is_member_function_can_map_redis_exception_to_sdl_exception(self):
403         self.mock_redis.sismember.side_effect = redis_exceptions.ResponseError('redis error!')
404         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
405             self.db.is_member(self.ns, self.group, self.groupmember)
406
407     def test_group_size_function_success(self):
408         self.mock_redis.scard.return_value = 100
409         ret = self.db.group_size(self.ns, self.group)
410         self.mock_redis.scard.assert_called_once_with(self.group_redis)
411         assert ret == 100
412
413     def test_group_size_function_can_map_redis_exception_to_sdl_exception(self):
414         self.mock_redis.scard.side_effect = redis_exceptions.ResponseError('Some redis error!')
415         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
416             self.db.group_size(self.ns, self.group)
417
418     def test_set_and_publish_success(self):
419         self.db.set_and_publish(self.ns, self.channels_and_events, self.dm)
420         self.mock_redis.execute_command.assert_called_once_with('MSETMPUB', len(self.dm),
421                                                                 len(self.channels_and_events),
422                                                                 *self.dm_redis_flat,
423                                                                 *self.channels_and_events_redis)
424
425     def test_set_and_publish_can_map_redis_exception_to_sdl_exception(self):
426         self.mock_redis.execute_command.side_effect = redis_exceptions.ResponseError('redis error!')
427         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
428             self.db.set_and_publish(self.ns, self.channels_and_events, self.dm)
429
430     def test_set_if_and_publish_success(self):
431         self.mock_redis.execute_command.return_value = b"OK"
432         ret = self.db.set_if_and_publish(self.ns, self.channels_and_events, self.key, self.old_data,
433                                          self.new_data)
434         self.mock_redis.execute_command.assert_called_once_with('SETIEMPUB', self.key_redis,
435                                                                 self.new_data, self.old_data,
436                                                                 *self.channels_and_events_redis)
437         assert ret is True
438
439     def test_set_if_and_publish_returns_false_if_existing_key_value_not_expected(self):
440         self.mock_redis.execute_command.return_value = None
441         ret = self.db.set_if_and_publish(self.ns, self.channels_and_events, self.key, self.old_data,
442                                          self.new_data)
443         self.mock_redis.execute_command.assert_called_once_with('SETIEMPUB', self.key_redis,
444                                                                 self.new_data, self.old_data,
445                                                                 *self.channels_and_events_redis)
446         assert ret is False
447
448     def test_set_if_and_publish_can_map_redis_exception_to_sdl_exception(self):
449         self.mock_redis.execute_command.side_effect = redis_exceptions.ResponseError('redis error!')
450         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
451             self.db.set_if_and_publish(self.ns, self.channels_and_events, self.key, self.old_data,
452                                        self.new_data)
453
454     def test_set_if_not_exists_and_publish_success(self):
455         self.mock_redis.execute_command.return_value = b"OK"
456         ret = self.db.set_if_not_exists_and_publish(self.ns, self.channels_and_events, self.key,
457                                                     self.new_data)
458         self.mock_redis.execute_command.assert_called_once_with('SETNXMPUB', self.key_redis,
459                                                                 self.new_data,
460                                                                 *self.channels_and_events_redis)
461         assert ret is True
462
463     def test_set_if_not_exists_and_publish_returns_false_if_key_already_exists(self):
464         self.mock_redis.execute_command.return_value = None
465         ret = self.db.set_if_not_exists_and_publish(self.ns, self.channels_and_events, self.key,
466                                                     self.new_data)
467         self.mock_redis.execute_command.assert_called_once_with('SETNXMPUB', self.key_redis,
468                                                                 self.new_data,
469                                                                 *self.channels_and_events_redis)
470         assert ret is False
471
472     def set_if_not_exists_and_publish_can_map_redis_exception_to_sdl_exception(self):
473         self.mock_redis.execute_command.side_effect = redis_exceptions.ResponseError('redis error!')
474         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
475             self.db.set_if_not_exists_and_publish(self.ns, self.channels_and_events, self.key,
476                                                   self.new_data)
477
478     def test_remove_and_publish_success(self):
479         self.db.remove_and_publish(self.ns, self.channels_and_events, self.key)
480         self.mock_redis.execute_command.assert_called_once_with('DELMPUB', len(self.key),
481                                                                 len(self.channels_and_events),
482                                                                 self.key_redis,
483                                                                 *self.channels_and_events_redis)
484
485     def test_remove_if_and_publish_success(self):
486         self.mock_redis.execute_command.return_value = 1
487         ret = self.db.remove_if_and_publish(self.ns, self.channels_and_events, self.key,
488                                             self.new_data)
489         self.mock_redis.execute_command.assert_called_once_with('DELIEMPUB', self.key_redis,
490                                                                 self.new_data,
491                                                                 *self.channels_and_events_redis)
492         assert ret is True
493
494     def test_remove_if_and_publish_returns_false_if_data_does_not_match(self):
495         self.mock_redis.execute_command.return_value = 0
496         ret = self.db.remove_if_and_publish(self.ns, self.channels_and_events, self.key,
497                                             self.new_data)
498         self.mock_redis.execute_command.assert_called_once_with('DELIEMPUB', self.key_redis,
499                                                                 self.new_data,
500                                                                 *self.channels_and_events_redis)
501         assert ret is False
502
503     def test_remove_if_and_publish_can_map_redis_exception_to_sdl_exception(self):
504         self.mock_redis.execute_command.side_effect = redis_exceptions.ResponseError('redis error!')
505         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
506             self.db.remove_if_and_publish(self.ns, self.channels_and_events, self.key,
507                                           self.new_data)
508
509     def test_remove_all_and_publish_success(self):
510         self.mock_redis.keys.return_value = ['{some-ns},a']
511         self.db.remove_all_and_publish(self.ns, self.channels_and_events)
512         self.mock_redis.keys.assert_called_once()
513         self.mock_redis.execute_command.assert_called_once_with('DELMPUB', len(self.key),
514                                                                 len(self.channels_and_events),
515                                                                 self.key_redis,
516                                                                 *self.channels_and_events_redis)
517
518     def test_remove_all_and_publish_can_map_redis_exception_to_sdl_exception(self):
519         self.mock_redis.execute_command.side_effect = redis_exceptions.ResponseError('redis error!')
520         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
521             self.db.remove_all_and_publish(self.ns, self.channels_and_events)
522
523     def test_subscribe_channel_success(self):
524         cb = Mock()
525         self.db.subscribe_channel(self.ns, cb, self.channels)
526         for channel in self.channels:
527             self.mock_pubsub.subscribe.assert_any_call(**{f'{{some-ns}},{channel}': cb})
528
529     def test_subscribe_channel_with_thread_success(self):
530         cb = Mock()
531         # Call first start_event_listener() to enable run_in_thread flag. When subscribe_channel()
532         # is called thread is started if the flag is enabled. In real-life scenario it's highly
533         # advisable at first to subscribe to some events by calling subscribe_channel() and only
534         # after it start threads by calling start_event_listener().
535         self.db.start_event_listener()
536         self.db.subscribe_channel(self.ns, cb, self.channels)
537         self.mock_pubsub.run_in_thread.assert_called_once_with(daemon=True, sleep_time=0.001)
538
539     def test_subscribe_can_map_redis_exception_to_sdl_exeception(self):
540         self.mock_pubsub.subscribe.side_effect = redis_exceptions.ResponseError('redis error!')
541         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
542             self.db.subscribe_channel(self.ns, Mock(), self.channels)
543
544     def test_unsubscribe_channel_success(self):
545         self.db.unsubscribe_channel(self.ns, [self.channels[0]])
546         self.mock_pubsub.unsubscribe.assert_called_with('{some-ns},abs')
547
548     def test_unsubscribe_channel_can_map_redis_exception_to_sdl_exeception(self):
549         self.mock_pubsub.unsubscribe.side_effect = redis_exceptions.ResponseError('redis error!')
550         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
551             self.db.unsubscribe_channel(self.ns, [self.channels[0]])
552
553     def test_subscribe_and_start_event_listener(self):
554         self.mock_redis.pubsub_channels.return_value = self.channels_and_events_redis
555         self.db.subscribe_channel(self.ns, Mock(), self.channels)
556         self.db.start_event_listener()
557
558         if self.test_backend_type == 'sentinel_cluster':
559             assert self.mock_redis.pubsub_channels.call_count == 2
560             assert self.mock_pubsub.run_in_thread.call_count == 2
561             self.mock_pubsub.run_in_thread.assert_has_calls([
562                 call(daemon=True, sleep_time=0.001),
563                 call(daemon=True, sleep_time=0.001),
564             ])
565         else:
566             self.mock_redis.pubsub_channels.assert_called_once()
567             self.mock_pubsub.run_in_thread.assert_called_once_with(daemon=True, sleep_time=0.001)
568
569     def test_start_event_listener_fail(self):
570         self.mock_pubsub_thread.is_alive.return_value = True
571         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
572             self.db.start_event_listener()
573
574     def test_handle_events_success(self):
575         self.db.handle_events()
576         self.db.handle_events()
577         self.db.handle_events()
578         self.db.handle_events()
579         assert self.mock_pubsub.get_message.call_count == 4
580
581     def test_handle_events_fail_if_subsub_thread_alive(self):
582         self.mock_pubsub_thread.is_alive.return_value = True
583         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
584             self.db.handle_events()
585
586     def test_handle_events_fail_if_event_listener_already_running(self):
587         self.db.start_event_listener()
588         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
589             self.db.handle_events()
590
591     def test_handle_events_ignores_message_handling_redis_runtime_exception(self):
592          self.mock_pubsub.get_message.side_effect = RuntimeError()
593          self.db.handle_events()
594          self.mock_pubsub.get_message.assert_called_once()
595
596     def test_get_redis_connection_function_success(self):
597         ret = self.db.get_redis_connection(self.ns)
598         assert ret is self.mock_redis
599
600     def test_redis_backend_object_string_representation(self):
601         str_out = str(self.db)
602         assert str_out is not None
603
604     def test_namespace_hash_algorithm_stays_unaltered(self):
605         ret_hash = self.db._RedisBackend__get_hash('sdltoolns')
606         assert ret_hash == 2897969051
607
def test_standalone_redis_init_exception_is_mapped_to_sdl_exeception():
    """With the standby config, a ResponseError from Redis() must become RejectedByBackend."""
    mock_cfg = Mock()
    mock_cfg.get_params.return_value = get_test_sdl_standby_config()
    with patch('ricsdl.backend.redis.Redis') as redis_cls:
        # Constructing the patched Redis class raises the redis error.
        redis_cls.side_effect = redis_exceptions.ResponseError('redis error!')
        with pytest.raises(ricsdl.exceptions.RejectedByBackend):
            ricsdl.backend.get_backend_instance(mock_cfg)
616
def test_standalone_pubsub_init_exception_is_mapped_to_sdl_exeception():
    """With the standby config, a ResponseError from PubSub() must become RejectedByBackend."""
    mock_cfg = Mock()
    mock_cfg.get_params.return_value = get_test_sdl_standby_config()
    redis_patch = patch('ricsdl.backend.redis.Redis')
    pubsub_patch = patch('ricsdl.backend.redis.PubSub')
    with redis_patch, pubsub_patch as pubsub_cls:
        # Redis is only patched out; the failure is injected into PubSub construction.
        pubsub_cls.side_effect = redis_exceptions.ResponseError('redis error!')
        with pytest.raises(ricsdl.exceptions.RejectedByBackend):
            ricsdl.backend.get_backend_instance(mock_cfg)
626
def test_sentinel_redis_init_exception_is_mapped_to_sdl_exeception():
    """With the sentinel config, a ResponseError from Sentinel() must become RejectedByBackend."""
    mock_cfg = Mock()
    mock_cfg.get_params.return_value = get_test_sdl_sentinel_config()
    with patch('ricsdl.backend.redis.Sentinel') as sentinel_cls:
        # Constructing the patched Sentinel class raises the redis error.
        sentinel_cls.side_effect = redis_exceptions.ResponseError('redis error!')
        with pytest.raises(ricsdl.exceptions.RejectedByBackend):
            ricsdl.backend.get_backend_instance(mock_cfg)
635
def test_sentinel_pubsub_init_exception_is_mapped_to_sdl_exeception():
    """With the sentinel config, a ResponseError from PubSub() must become RejectedByBackend."""
    mock_cfg = Mock()
    mock_cfg.get_params.return_value = get_test_sdl_sentinel_config()
    sentinel_patch = patch('ricsdl.backend.redis.Sentinel')
    pubsub_patch = patch('ricsdl.backend.redis.PubSub')
    with sentinel_patch, pubsub_patch as pubsub_cls:
        # Sentinel is only patched out; the failure is injected into PubSub construction.
        pubsub_cls.side_effect = redis_exceptions.ResponseError('redis error!')
        with pytest.raises(ricsdl.exceptions.RejectedByBackend):
            ricsdl.backend.get_backend_instance(mock_cfg)
645
def test_sentinel_master_for_exception_is_mapped_to_sdl_exeception():
    """With the sentinel config, a ResponseError from master_for() must become RejectedByBackend."""
    mock_cfg = Mock()
    mock_cfg.get_params.return_value = get_test_sdl_sentinel_config()
    sentinel_patch = patch('ricsdl.backend.redis.Sentinel')
    pubsub_patch = patch('ricsdl.backend.redis.PubSub')
    with sentinel_patch as sentinel_cls, pubsub_patch:
        # The Sentinel instance is created fine; its master_for() call is what fails.
        master_for = sentinel_cls.return_value.master_for
        master_for.side_effect = redis_exceptions.ResponseError('redis error!')
        with pytest.raises(ricsdl.exceptions.RejectedByBackend):
            ricsdl.backend.get_backend_instance(mock_cfg)
655
def test_sentinel_cluster_redis_init_exception_is_mapped_to_sdl_exeception():
    """With the sentinel cluster config, a Sentinel() ResponseError becomes RejectedByBackend."""
    mock_cfg = Mock()
    mock_cfg.get_params.return_value = get_test_sdl_sentinel_cluster_config()
    with patch('ricsdl.backend.redis.Sentinel') as sentinel_cls:
        # Constructing the patched Sentinel class raises the redis error.
        sentinel_cls.side_effect = redis_exceptions.ResponseError('redis error!')
        with pytest.raises(ricsdl.exceptions.RejectedByBackend):
            ricsdl.backend.get_backend_instance(mock_cfg)
664
def test_sentinel_cluster_pubsub_init_exception_is_mapped_to_sdl_exeception():
    """With the sentinel cluster config, a PubSub() ResponseError becomes RejectedByBackend."""
    mock_cfg = Mock()
    mock_cfg.get_params.return_value = get_test_sdl_sentinel_cluster_config()
    sentinel_patch = patch('ricsdl.backend.redis.Sentinel')
    pubsub_patch = patch('ricsdl.backend.redis.PubSub')
    with sentinel_patch, pubsub_patch as pubsub_cls:
        # Sentinel is only patched out; the failure is injected into PubSub construction.
        pubsub_cls.side_effect = redis_exceptions.ResponseError('redis error!')
        with pytest.raises(ricsdl.exceptions.RejectedByBackend):
            ricsdl.backend.get_backend_instance(mock_cfg)
674
def test_sentinel_cluster_master_for_exception_is_mapped_to_sdl_exeception():
    """With the sentinel cluster config, a master_for() ResponseError becomes RejectedByBackend."""
    mock_cfg = Mock()
    mock_cfg.get_params.return_value = get_test_sdl_sentinel_cluster_config()
    sentinel_patch = patch('ricsdl.backend.redis.Sentinel')
    pubsub_patch = patch('ricsdl.backend.redis.PubSub')
    with sentinel_patch as sentinel_cls, pubsub_patch:
        # The Sentinel instance is created fine; its master_for() call is what fails.
        master_for = sentinel_cls.return_value.master_for
        master_for.side_effect = redis_exceptions.ResponseError('redis error!')
        with pytest.raises(ricsdl.exceptions.RejectedByBackend):
            ricsdl.backend.get_backend_instance(mock_cfg)
684
class MockRedisLock:
    """Plain value holder that records the arguments a lock object was constructed with."""

    def __init__(self, redis, name, timeout=None, sleep=0.1,
                 blocking=True, blocking_timeout=None, thread_local=True):
        """Store every argument as an attribute; thread_local is coerced to bool."""
        self.redis, self.name = redis, name
        self.timeout, self.sleep = timeout, sleep
        self.blocking, self.blocking_timeout = blocking, blocking_timeout
        self.thread_local = bool(thread_local)
695
696
@pytest.fixture(scope="module")
def mock_redis_lock():
    """Provide a factory that builds MockRedisLock objects from lock parameters.

    The returned callable accepts (name, timeout, sleep, blocking, blocking_timeout,
    thread_local). It has no redis client argument, so the created object gets ``redis=None``.
    """
    def _mock_redis_lock(name, timeout=None, sleep=0.1,
                         blocking=True, blocking_timeout=None, thread_local=True):
        # Forward by keyword: MockRedisLock takes the redis client as its first positional
        # parameter, so passing ``name`` positionally would shift every value by one slot
        # (name stored as redis, timeout as name, ...) and drop thread_local entirely.
        return MockRedisLock(redis=None, name=name, timeout=timeout, sleep=sleep,
                             blocking=blocking, blocking_timeout=blocking_timeout,
                             thread_local=thread_local)
    return _mock_redis_lock
703
704
@pytest.fixture()
def redis_backend_lock_fixture(request, mock_redis_lock):
    """Attach a backend lock, built against mocked redis objects, to the requesting test class.

    Sets on ``request.cls``: ns, lockname, lockname_redis, expiration, retry_interval,
    retry_timeout, mock_lua_get_validity_time, mock_redis, configuration, mock_redis_lock and
    lock. The ``mock_redis_lock`` fixture argument is requested but not used in the body.
    """
    test_cls = request.cls

    # Plain test data.
    test_cls.ns = 'some-ns'
    test_cls.lockname = 'some-lock-name'
    test_cls.lockname_redis = '{some-ns},some-lock-name'
    test_cls.expiration = 10
    test_cls.retry_interval = 0.1
    test_cls.retry_timeout = 1

    # Stand-in for the registered Lua script; returns 2000 unless a test overrides it.
    lua_script = Mock()
    lua_script.return_value = 2000
    test_cls.mock_lua_get_validity_time = lua_script

    # Mocked redis client whose register_script() hands out the script stand-in.
    redis_client = Mock()
    redis_client.register_script = Mock()
    redis_client.register_script.return_value = lua_script
    test_cls.mock_redis = redis_client

    db_backend = Mock()
    db_backend.get_redis_connection.return_value = redis_client

    configuration = Mock()
    configuration.get_params.return_value = _Configuration.Params(
        db_host=None,
        db_port=None,
        db_sentinel_port=None,
        db_sentinel_master_name=None,
        db_cluster_addr_list=None,
        db_type=DbBackendType.REDIS)
    test_cls.configuration = configuration

    # The lock is created while the redis Lock class is patched; the patch ends before the
    # test body runs, but the test keeps the mocked Lock instance for its assertions.
    with patch('ricsdl.backend.redis.Lock') as patched_lock_cls:
        created_lock = ricsdl.backend.get_backend_lock_instance(
            configuration, test_cls.ns, test_cls.lockname, test_cls.expiration, db_backend)
        test_cls.mock_redis_lock = patched_lock_cls.return_value
        test_cls.lock = created_lock
    yield
    # Teardown: reset the class-level attribute (presumably a cached script handle) so that
    # the next test does not see this test's mock.
    RedisBackendLock.lua_get_validity_time = None
741
742
@pytest.mark.usefixtures('redis_backend_lock_fixture')
class TestRedisBackendLock:
    """Tests of the lock object returned by ricsdl.backend.get_backend_lock_instance().

    The attributes read through ``self`` (lock, mock_redis_lock, mock_redis,
    mock_lua_get_validity_time, ns, lockname, ...) are set on the class by the
    redis_backend_lock_fixture fixture.
    """

    def _set_lock_name_and_token(self, token):
        """Give the mocked redis lock its redis-side name and the given local token."""
        self.mock_redis_lock.name = self.lockname_redis
        self.mock_redis_lock.local.token = token

    def _assert_script_called_once_with_token(self, token):
        """Check the single script invocation: lock name as key, token as argument."""
        self.mock_lua_get_validity_time.assert_called_once_with(
            keys=[self.lockname_redis], args=[token], client=self.mock_redis)

    def test_acquire_function_success(self):
        """acquire() returns True when the underlying redis lock reports success."""
        redis_acquire = self.mock_redis_lock.acquire
        redis_acquire.return_value = True
        acquired = self.lock.acquire(self.retry_interval, self.retry_timeout)
        redis_acquire.assert_called_once_with(blocking_timeout=self.retry_timeout)
        assert acquired is True

    def test_acquire_function_returns_false_if_lock_is_not_acquired(self):
        """acquire() returns False when the underlying redis lock reports failure."""
        redis_acquire = self.mock_redis_lock.acquire
        redis_acquire.return_value = False
        acquired = self.lock.acquire(self.retry_interval, self.retry_timeout)
        redis_acquire.assert_called_once_with(blocking_timeout=self.retry_timeout)
        assert acquired is False

    def test_acquire_function_can_map_redis_exception_to_sdl_exception(self):
        """A redis LockError raised on acquire must surface as BackendError."""
        lock_failure = redis_exceptions.LockError('redis lock error!')
        self.mock_redis_lock.acquire.side_effect = lock_failure
        with pytest.raises(ricsdl.exceptions.BackendError):
            self.lock.acquire(self.retry_interval, self.retry_timeout)

    def test_release_function_success(self):
        """release() must release the underlying redis lock exactly once."""
        redis_release = self.mock_redis_lock.release
        self.lock.release()
        redis_release.assert_called_once()

    def test_release_function_can_map_redis_exception_to_sdl_exception(self):
        """A redis LockError raised on release must surface as BackendError."""
        lock_failure = redis_exceptions.LockError('redis lock error!')
        self.mock_redis_lock.release.side_effect = lock_failure
        with pytest.raises(ricsdl.exceptions.BackendError):
            self.lock.release()

    def test_refresh_function_success(self):
        """refresh() must reacquire the underlying redis lock exactly once."""
        redis_reacquire = self.mock_redis_lock.reacquire
        self.lock.refresh()
        redis_reacquire.assert_called_once()

    def test_refresh_function_can_map_redis_exception_to_sdl_exception(self):
        """A redis LockError raised on reacquire must surface as BackendError."""
        lock_failure = redis_exceptions.LockError('redis lock error!')
        self.mock_redis_lock.reacquire.side_effect = lock_failure
        with pytest.raises(ricsdl.exceptions.BackendError):
            self.lock.refresh()

    def test_get_validity_time_function_success(self):
        """The fixture's default script result of 2000 must be reported as 2."""
        self._set_lock_name_and_token(123)

        validity = self.lock.get_validity_time()
        self._assert_script_called_once_with_token(123)
        assert validity == 2

    def test_get_validity_time_function_second_fraction_success(self):
        """A script result of 234 must be reported as the fraction 0.234."""
        self._set_lock_name_and_token(123)
        self.mock_lua_get_validity_time.return_value = 234

        validity = self.lock.get_validity_time()
        self._assert_script_called_once_with_token(123)
        assert validity == 0.234

    def test_get_validity_time_function_can_raise_exception_if_lock_is_unlocked(self):
        """Without a local token, get_validity_time() must be rejected with a clear message."""
        self._set_lock_name_and_token(None)

        with pytest.raises(ricsdl.exceptions.RejectedByBackend) as excinfo:
            self.lock.get_validity_time()
        error_text = str(excinfo.value)
        assert f"Cannot get validity time of an unlocked lock {self.lockname}" in error_text

    def test_get_validity_time_function_can_raise_exception_if_lua_script_fails(self):
        """A negative script result must be rejected and the error code reported."""
        self._set_lock_name_and_token(123)
        self.mock_lua_get_validity_time.return_value = -10

        with pytest.raises(ricsdl.exceptions.RejectedByBackend) as excinfo:
            self.lock.get_validity_time()
        error_text = str(excinfo.value)
        assert f"Getting validity time of a lock {self.lockname} failed with error code: -10" in error_text

    def test_redis_backend_lock_object_string_representation(self):
        """str(lock) must equal the string form of the expected info dictionary."""
        expected_lock_info = {'lock DB type': 'Redis',
                              'lock namespace': 'some-ns',
                              'lock name': 'some-lock-name',
                              'lock status': 'locked'}
        expected_text = str(expected_lock_info)
        assert str(self.lock) == expected_text

    def test_redis_backend_lock_object_string_representation_can_catch_redis_exception(self):
        """A LockError from owned() must show up in the 'lock status' field, not propagate."""
        lock_failure = redis_exceptions.LockError('redis lock error!')
        self.mock_redis_lock.owned.side_effect = lock_failure
        expected_lock_info = {'lock DB type': 'Redis',
                              'lock namespace': 'some-ns',
                              'lock name': 'some-lock-name',
                              'lock status': 'Error: redis lock error!'}
        expected_text = str(expected_lock_info)
        assert str(self.lock) == expected_text
830
831
def test_redis_response_error_exception_is_mapped_to_rejected_by_backend_sdl_exception():
    """_map_to_sdl_exception() must turn a redis ResponseError into RejectedByBackend."""
    expected_error = ricsdl.exceptions.RejectedByBackend
    with pytest.raises(expected_error) as excinfo, _map_to_sdl_exception():
        raise redis_exceptions.ResponseError('Some redis error!')
    error_text = str(excinfo.value)
    assert "SDL backend rejected the request: Some redis error!" in error_text
837
838
def test_redis_connection_error_exception_is_mapped_to_not_connected_sdl_exception():
    """_map_to_sdl_exception() must turn a redis ConnectionError into NotConnected."""
    expected_error = ricsdl.exceptions.NotConnected
    with pytest.raises(expected_error) as excinfo, _map_to_sdl_exception():
        raise redis_exceptions.ConnectionError('Some redis error!')
    error_text = str(excinfo.value)
    assert "SDL not connected to backend: Some redis error!" in error_text
844
845
def test_rest_redis_exceptions_are_mapped_to_backend_error_sdl_exception():
    """_map_to_sdl_exception() must turn a generic RedisError into BackendError."""
    expected_error = ricsdl.exceptions.BackendError
    with pytest.raises(expected_error) as excinfo, _map_to_sdl_exception():
        raise redis_exceptions.RedisError('Some redis error!')
    error_text = str(excinfo.value)
    assert "SDL backend failed to process the request: Some redis error!" in error_text
851
852
def test_system_error_exceptions_are_not_mapped_to_any_sdl_exception():
    """A SystemExit raised inside _map_to_sdl_exception() must pass through unchanged."""
    with pytest.raises(SystemExit), _map_to_sdl_exception():
        raise SystemExit('Fatal error')
857
858
class TestRedisClient:
    """Tests of message handling in ricsdl.backend.redis.PubSub."""

    @classmethod
    def setup_class(cls):
        """Create one PubSub object with a single mocked handler for '{some-ns},abs'."""
        pubsub = ricsdl.backend.redis.PubSub(Mock())
        pubsub.channels = {b'{some-ns},abs': Mock()}
        cls.pubsub = pubsub

    def test_handle_pubsub_message(self):
        """handle_message() returns ('abs', 'cbn') and calls the channel handler with them."""
        redis_channel = b'{some-ns},abs'
        raw_message = [b'message', redis_channel, b'cbn']
        assert self.pubsub.handle_message(raw_message) == ('abs', 'cbn')
        channel_handler = self.pubsub.channels.get(redis_channel)
        channel_handler.assert_called_once_with('abs', 'cbn')