Read list type of environment variables
[ric-plt/sdlpy.git] / ricsdl-package / tests / backend / test_redis.py
1 # Copyright (c) 2019 AT&T Intellectual Property.
2 # Copyright (c) 2018-2022 Nokia.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 #
17 # This source code is part of the near-RT RIC (RAN Intelligent Controller)
18 # platform project (RICP).
19 #
20
21
22 from unittest.mock import patch, Mock, MagicMock, call, ANY
23 import pytest
24 from redis import exceptions as redis_exceptions
25 import ricsdl.backend
26 from ricsdl.backend.redis import (RedisBackendLock, _map_to_sdl_exception)
27 from ricsdl.configuration import _Configuration
28 from ricsdl.configuration import DbBackendType
29 import ricsdl.exceptions
30
# Separator placed between several events of one channel in the expected Redis arguments
# (e.g. 'ev2' + EVENT_SEPARATOR + 'ev3'); the mocked configuration's get_event_separator()
# is set up to return the same value.
EVENT_SEPARATOR = "___"
32
def get_test_sdl_standby_config():
    """Return SDL configuration parameters with one DB port and no sentinel settings."""
    dbaas_addr = 'service-ricplt-dbaas-tcp-cluster-0.ricplt'
    return _Configuration.Params(
        db_host=dbaas_addr,
        db_ports=['6379'],
        db_sentinel_ports=[],
        db_sentinel_master_names=[],
        db_cluster_addrs=[dbaas_addr],
        db_type=DbBackendType.REDIS,
    )
40
def get_test_sdl_sentinel_config():
    """Return SDL configuration parameters with one DB port, one sentinel port and master name."""
    dbaas_addr = 'service-ricplt-dbaas-tcp-cluster-0.ricplt'
    return _Configuration.Params(
        db_host=dbaas_addr,
        db_ports=['6379'],
        db_sentinel_ports=['26379'],
        db_sentinel_master_names=['dbaasmaster'],
        db_cluster_addrs=[dbaas_addr],
        db_type=DbBackendType.REDIS,
    )
48
def get_test_sdl_sentinel_cluster_config():
    """Return SDL configuration parameters describing two sentinel-managed DB addresses."""
    cluster_addrs = [
        'service-ricplt-dbaas-tcp-cluster-0.ricplt',
        'service-ricplt-dbaas-tcp-cluster-1.ricplt',
    ]
    return _Configuration.Params(
        db_host=cluster_addrs[0],
        db_ports=['6379', '6380'],
        db_sentinel_ports=['26379', '26380'],
        db_sentinel_master_names=['dbaasmaster-cluster-0', 'dbaasmaster-cluster-1'],
        db_cluster_addrs=cluster_addrs,
        db_type=DbBackendType.REDIS,
    )
56
@pytest.fixture()
def redis_backend_common_fixture(request):
    """Attach the shared test data (namespace, keys, values, groups, channels) to the test class.

    Attributes ending in ``_redis`` hold the same data in the form the tests expect to see in
    Redis calls, i.e. with the '{some-ns},' prefix added to keys, groups and channels.
    """
    shared_test_data = {
        'ns': 'some-ns',
        'dl_redis': [b'1', b'2'],
        'dm': {'a': b'1', 'b': b'2'},
        'dm_redis': {'{some-ns},a': b'1', '{some-ns},b': b'2'},
        'dm_redis_flat': ['{some-ns},a', b'1', '{some-ns},b', b'2'],
        'key': 'a',
        'key_redis': '{some-ns},a',
        'keys': ['a', 'b'],
        'keys_redis': ['{some-ns},a', '{some-ns},b'],
        'data': b'123',
        'old_data': b'1',
        'new_data': b'3',
        'keypattern': r'[Aa]bc-\[1\].?-*',
        'keypattern_redis': r'{some-ns},[Aa]bc-\[1\].?-*',
        'matchedkeys': ['Abc-[1].0-def', 'abc-[1].1-ghi'],
        'matchedkeys_redis': [b'{some-ns},Abc-[1].0-def', b'{some-ns},abc-[1].1-ghi'],
        'matcheddata_redis': [b'10', b'11'],
        'matchedkeydata': {'Abc-[1].0-def': b'10', 'abc-[1].1-ghi': b'11'},
        'group': 'some-group',
        'group_redis': '{some-ns},some-group',
        'groupmembers': {b'm1', b'm2'},
        'groupmember': b'm1',
        'channels': ['ch1', 'ch2'],
        'channels_and_events': {'ch1': ['ev1'], 'ch2': ['ev2', 'ev3']},
        'channels_and_events_redis': ['{some-ns},ch1', 'ev1',
                                      '{some-ns},ch2', 'ev2' + EVENT_SEPARATOR + 'ev3'],
    }
    test_cls = request.cls
    for attr_name, attr_value in shared_test_data.items():
        setattr(test_cls, attr_name, attr_value)

    yield
89
@pytest.fixture(params=['standalone', 'sentinel', 'sentinel_cluster'])
def redis_backend_fixture(request, redis_backend_common_fixture):
    """Build a Redis backend on top of mocked Redis clients, once per deployment type.

    The fixture is parametrized with 'standalone', 'sentinel' and 'sentinel_cluster'. For each
    variant it patches the Redis (or Sentinel) class, PubSub and threading.Thread, creates the
    backend via ricsdl.backend.get_backend_instance() and stores on the requesting test class:

    * configuration      - Mock configuration handed to the backend
    * test_backend_type  - the active fixture parameter
    * db                 - the backend instance under test
    * mock_redis         - the mocked Redis client (for sentinel variants: master_for() result)
    * mock_pubsub        - the mocked PubSub instance
    * mock_pubsub_thread - the mocked thread instance, reporting is_alive() == False

    It also asserts that the backend constructed its clients with the expected arguments.

    Raises:
        NotImplementedError: if the fixture parameter is not one of the known variants.
    """
    request.cls.configuration = Mock()
    request.cls.configuration.get_event_separator.return_value = EVENT_SEPARATOR
    request.cls.test_backend_type = request.param
    # Response callbacks the backend is expected to register per Redis client.
    expected_callbacks = [call('SETIE', ANY), call('DELIE', ANY)]

    if request.param == 'standalone':
        cfg = get_test_sdl_standby_config()
        request.cls.configuration.get_params.return_value = cfg
        with patch('ricsdl.backend.redis.Redis') as mock_redis, patch(
                   'ricsdl.backend.redis.PubSub') as mock_pubsub, patch(
                   'threading.Thread') as mock_thread:
            db = ricsdl.backend.get_backend_instance(request.cls.configuration)
            request.cls.mock_redis = mock_redis.return_value
            request.cls.mock_pubsub = mock_pubsub.return_value
            request.cls.mock_pubsub_thread = mock_thread.return_value
            request.cls.mock_pubsub_thread.is_alive.return_value = False
        request.cls.db = db

        mock_redis.assert_called_once_with(db=0, host=cfg.db_host, max_connections=20,
                                           port=cfg.db_ports[0])
        mock_pubsub.assert_called_once_with(EVENT_SEPARATOR, request.cls.mock_redis.connection_pool,
                                            ignore_subscribe_messages=True)
        assert request.cls.mock_redis.set_response_callback.call_count == 2
        assert request.cls.mock_redis.set_response_callback.call_args_list == expected_callbacks

    elif request.param == 'sentinel':
        cfg = get_test_sdl_sentinel_config()
        request.cls.configuration.get_params.return_value = cfg
        with patch('ricsdl.backend.redis.Sentinel') as mock_sentinel, patch(
                   'ricsdl.backend.redis.PubSub') as mock_pubsub, patch(
                   'threading.Thread') as mock_thread:
            db = ricsdl.backend.get_backend_instance(request.cls.configuration)
            request.cls.mock_redis = mock_sentinel.return_value.master_for.return_value
            request.cls.mock_pubsub = mock_pubsub.return_value
            request.cls.mock_pubsub_thread = mock_thread.return_value
            request.cls.mock_pubsub_thread.is_alive.return_value = False
        request.cls.db = db

        mock_sentinel.assert_called_once_with([(cfg.db_host, cfg.db_sentinel_ports[0])])
        # master_for() is called on the Sentinel *instance*, and only assert_* methods actually
        # verify anything ("called_once_with" is not a Mock assertion).
        mock_sentinel.return_value.master_for.assert_called_once_with(
            cfg.db_sentinel_master_names[0])
        mock_pubsub.assert_called_once_with(EVENT_SEPARATOR, request.cls.mock_redis.connection_pool,
                                            ignore_subscribe_messages=True)
        assert request.cls.mock_redis.set_response_callback.call_count == 2
        assert request.cls.mock_redis.set_response_callback.call_args_list == expected_callbacks

    elif request.param == 'sentinel_cluster':
        cfg = get_test_sdl_sentinel_cluster_config()
        request.cls.configuration.get_params.return_value = cfg
        with patch('ricsdl.backend.redis.Sentinel') as mock_sentinel, patch(
                   'ricsdl.backend.redis.PubSub') as mock_pubsub, patch(
                   'threading.Thread') as mock_thread:
            db = ricsdl.backend.get_backend_instance(request.cls.configuration)
            request.cls.mock_redis = mock_sentinel.return_value.master_for.return_value
            request.cls.mock_pubsub = mock_pubsub.return_value
            request.cls.mock_pubsub_thread = mock_thread.return_value
            request.cls.mock_pubsub_thread.is_alive.return_value = False
        request.cls.db = db

        # Two DB addresses are configured, so every client-side object is created twice.
        assert mock_sentinel.call_count == 2
        mock_sentinel.assert_has_calls([
            call([('service-ricplt-dbaas-tcp-cluster-0.ricplt', '26379')]),
            call([('service-ricplt-dbaas-tcp-cluster-1.ricplt', '26380')]),
        ], any_order=True)
        assert mock_sentinel.return_value.master_for.call_count == 2
        mock_sentinel.return_value.master_for.assert_has_calls(
            [call('dbaasmaster-cluster-0'), call('dbaasmaster-cluster-1')], any_order=True,
        )
        assert mock_pubsub.call_count == 2
        mock_pubsub.assert_has_calls([
            call(EVENT_SEPARATOR, request.cls.mock_redis.connection_pool,
                 ignore_subscribe_messages=True),
            call(EVENT_SEPARATOR, request.cls.mock_redis.connection_pool,
                 ignore_subscribe_messages=True),
        ])
        assert request.cls.mock_redis.set_response_callback.call_count == 4
        assert (request.cls.mock_redis.set_response_callback.call_args_list
                == expected_callbacks + expected_callbacks)
    else:
        raise NotImplementedError

    yield
170
171
172 @pytest.mark.usefixtures('redis_backend_fixture')
173 class TestRedisBackend:
174     def test_is_connected_function_success(self):
175         self.mock_redis.ping.return_value = True
176         ret = self.db.is_connected()
177         if self.test_backend_type == 'sentinel_cluster':
178             assert self.mock_redis.ping.call_count == 2
179         else:
180             assert self.mock_redis.ping.call_count == 1
181         assert ret is True
182
183     def test_is_connected_function_returns_false_if_ping_fails(self):
184         self.mock_redis.ping.return_value = False
185         ret = self.db.is_connected()
186         self.mock_redis.ping.assert_called_once()
187         assert ret is False
188
189     def test_is_connected_function_can_map_redis_exception_to_sdl_exception(self):
190         self.mock_redis.ping.side_effect = redis_exceptions.ResponseError('redis error!')
191         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
192             self.db.is_connected()
193
194     def test_set_function_success(self):
195         self.db.set(self.ns, self.dm)
196         self.mock_redis.mset.assert_called_once_with(self.dm_redis)
197
198     def test_set_function_can_map_redis_exception_to_sdl_exception(self):
199         self.mock_redis.mset.side_effect = redis_exceptions.ResponseError('redis error!')
200         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
201             self.db.set(self.ns, self.dm)
202
203     def test_set_if_function_success(self):
204         self.mock_redis.execute_command.return_value = True
205         ret = self.db.set_if(self.ns, self.key, self.old_data, self.new_data)
206         self.mock_redis.execute_command.assert_called_once_with('SETIE', self.key_redis,
207                                                                 self.new_data, self.old_data)
208         assert ret is True
209
210     def test_set_if_function_returns_false_if_existing_key_value_not_expected(self):
211         self.mock_redis.execute_command.return_value = False
212         ret = self.db.set_if(self.ns, self.key, self.old_data, self.new_data)
213         self.mock_redis.execute_command.assert_called_once_with('SETIE', self.key_redis,
214                                                                 self.new_data, self.old_data)
215         assert ret is False
216
217     def test_set_if_function_can_map_redis_exception_to_sdl_exception(self):
218         self.mock_redis.execute_command.side_effect = redis_exceptions.ResponseError('redis error!')
219         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
220             self.db.set_if(self.ns, self.key, self.old_data, self.new_data)
221
222     def test_set_if_not_exists_function_success(self):
223         self.mock_redis.setnx.return_value = True
224         ret = self.db.set_if_not_exists(self.ns, self.key, self.new_data)
225         self.mock_redis.setnx.assert_called_once_with(self.key_redis, self.new_data)
226         assert ret is True
227
228     def test_set_if_not_exists_function_returns_false_if_key_already_exists(self):
229         self.mock_redis.setnx.return_value = False
230         ret = self.db.set_if_not_exists(self.ns, self.key, self.new_data)
231         self.mock_redis.setnx.assert_called_once_with(self.key_redis, self.new_data)
232         assert ret is False
233
234     def test_set_if_not_exists_function_can_map_redis_exception_to_sdl_exception(self):
235         self.mock_redis.setnx.side_effect = redis_exceptions.ResponseError('redis error!')
236         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
237             self.db.set_if_not_exists(self.ns, self.key, self.new_data)
238
239     def test_get_function_success(self):
240         self.mock_redis.mget.return_value = self.dl_redis
241         ret = self.db.get(self.ns, self.keys)
242         self.mock_redis.mget.assert_called_once_with(self.keys_redis)
243         assert ret == self.dm
244
245     def test_get_function_returns_empty_dict_when_no_key_values_exist(self):
246         self.mock_redis.mget.return_value = [None, None]
247         ret = self.db.get(self.ns, self.keys)
248         self.mock_redis.mget.assert_called_once_with(self.keys_redis)
249         assert ret == dict()
250
251     def test_get_function_returns_dict_only_with_found_key_values_when_some_keys_exist(self):
252         self.mock_redis.mget.return_value = [self.data, None]
253         ret = self.db.get(self.ns, self.keys)
254         self.mock_redis.mget.assert_called_once_with(self.keys_redis)
255         assert ret == {self.key: self.data}
256
257     def test_get_function_can_map_redis_exception_to_sdl_exception(self):
258         self.mock_redis.mget.side_effect = redis_exceptions.ResponseError('redis error!')
259         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
260             self.db.get(self.ns, self.keys)
261
262     def test_find_keys_function_success(self):
263         self.mock_redis.keys.return_value = self.matchedkeys_redis
264         ret = self.db.find_keys(self.ns, self.keypattern)
265         self.mock_redis.keys.assert_called_once_with(self.keypattern_redis)
266         assert ret == self.matchedkeys
267
268     def test_find_keys_function_returns_empty_list_when_no_matching_keys_found(self):
269         self.mock_redis.keys.return_value = []
270         ret = self.db.find_keys(self.ns, self.keypattern)
271         self.mock_redis.keys.assert_called_once_with(self.keypattern_redis)
272         assert ret == []
273
274     def test_find_keys_function_can_map_redis_exception_to_sdl_exception(self):
275         self.mock_redis.keys.side_effect = redis_exceptions.ResponseError('redis error!')
276         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
277             self.db.find_keys(self.ns, self.keypattern)
278
279     def test_find_keys_function_can_raise_exception_when_redis_key_convert_to_string_fails(self):
280         # Redis returns an illegal key, which conversion to string fails
281         corrupt_redis_key = b'\x81'
282         self.mock_redis.keys.return_value = [corrupt_redis_key]
283         with pytest.raises(ricsdl.exceptions.RejectedByBackend) as excinfo:
284             self.db.find_keys(self.ns, self.keypattern)
285         assert f"Namespace {self.ns} key:{corrupt_redis_key} "
286         "has no namespace prefix" in str(excinfo.value)
287
288     def test_find_keys_function_can_raise_exception_when_redis_key_is_without_prefix(self):
289         # Redis returns an illegal key, which doesn't have comma separated namespace prefix
290         corrupt_redis_key = 'some-corrupt-key'
291         self.mock_redis.keys.return_value = [f'{corrupt_redis_key}'.encode()]
292         with pytest.raises(ricsdl.exceptions.RejectedByBackend) as excinfo:
293             self.db.find_keys(self.ns, self.keypattern)
294         assert f"Namespace {self.ns} key:{corrupt_redis_key} "
295         "has no namespace prefix" in str(excinfo.value)
296
297     def test_find_and_get_function_success(self):
298         self.mock_redis.keys.return_value = self.matchedkeys_redis
299         self.mock_redis.mget.return_value = self.matcheddata_redis
300         ret = self.db.find_and_get(self.ns, self.keypattern)
301         self.mock_redis.keys.assert_called_once_with(self.keypattern_redis)
302         self.mock_redis.mget.assert_called_once_with([i.decode() for i in self.matchedkeys_redis])
303         assert ret == self.matchedkeydata
304
305     def test_find_and_get_function_returns_empty_dict_when_no_matching_keys_exist(self):
306         self.mock_redis.keys.return_value = list()
307         ret = self.db.find_and_get(self.ns, self.keypattern)
308         self.mock_redis.keys.assert_called_once_with(self.keypattern_redis)
309         assert not self.mock_redis.mget.called
310         assert ret == dict()
311
312     def test_find_and_get_function_can_map_redis_exception_to_sdl_exception(self):
313         self.mock_redis.keys.side_effect = redis_exceptions.ResponseError('redis error!')
314         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
315             self.db.find_and_get(self.ns, self.keypattern)
316
317     def test_find_and_get_function_can_raise_exception_when_redis_key_convert_to_string_fails(self):
318         # Redis returns an illegal key, which conversion to string fails
319         corrupt_redis_key = b'\x81'
320         self.mock_redis.keys.return_value = [corrupt_redis_key]
321         with pytest.raises(ricsdl.exceptions.RejectedByBackend) as excinfo:
322             self.db.find_and_get(self.ns, self.keypattern)
323         assert f"Namespace {self.ns} key:{corrupt_redis_key} "
324         "has no namespace prefix" in str(excinfo.value)
325
326     def test_find_and_get_function_can_raise_exception_when_redis_key_is_without_prefix(self):
327         # Redis returns an illegal key, which doesn't have comma separated namespace prefix
328         corrupt_redis_key = 'some-corrupt-key'
329         self.mock_redis.keys.return_value = [f'{corrupt_redis_key}'.encode()]
330         with pytest.raises(ricsdl.exceptions.RejectedByBackend) as excinfo:
331             self.db.find_and_get(self.ns, self.keypattern)
332         assert f"Namespace {self.ns} key:{corrupt_redis_key} "
333         "has no namespace prefix" in str(excinfo.value)
334
335     def test_remove_function_success(self):
336         self.db.remove(self.ns, self.keys)
337         self.mock_redis.delete.assert_called_once_with(*self.keys_redis)
338
339     def test_remove_function_can_map_redis_exception_to_sdl_exception(self):
340         self.mock_redis.delete.side_effect = redis_exceptions.ResponseError('redis error!')
341         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
342             self.db.remove(self.ns, self.keys)
343
344     def test_remove_if_function_success(self):
345         self.mock_redis.execute_command.return_value = True
346         ret = self.db.remove_if(self.ns, self.key, self.new_data)
347         self.mock_redis.execute_command.assert_called_once_with('DELIE', self.key_redis,
348                                                                 self.new_data)
349         assert ret is True
350
351     def test_remove_if_function_returns_false_if_data_does_not_match(self):
352         self.mock_redis.execute_command.return_value = False
353         ret = self.db.remove_if(self.ns, self.key, self.new_data)
354         self.mock_redis.execute_command.assert_called_once_with('DELIE', self.key_redis,
355                                                                 self.new_data)
356         assert ret is False
357
358     def test_remove_if_function_can_map_redis_exception_to_sdl_exception(self):
359         self.mock_redis.execute_command.side_effect = redis_exceptions.ResponseError('redis error!')
360         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
361             self.db.remove_if(self.ns, self.key, self.new_data)
362
363     def test_add_member_function_success(self):
364         self.db.add_member(self.ns, self.group, self.groupmembers)
365         self.mock_redis.sadd.assert_called_once_with(self.group_redis, *self.groupmembers)
366
367     def test_add_member_function_can_map_redis_exception_to_sdl_exception(self):
368         self.mock_redis.sadd.side_effect = redis_exceptions.ResponseError('redis error!')
369         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
370             self.db.add_member(self.ns, self.group, self.groupmembers)
371
372     def test_remove_member_function_success(self):
373         self.db.remove_member(self.ns, self.group, self.groupmembers)
374         self.mock_redis.srem.assert_called_once_with(self.group_redis, *self.groupmembers)
375
376     def test_remove_member_function_can_map_redis_exception_to_sdl_exception(self):
377         self.mock_redis.srem.side_effect = redis_exceptions.ResponseError('redis error!')
378         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
379             self.db.remove_member(self.ns, self.group, self.groupmembers)
380
381     def test_remove_group_function_success(self):
382         self.db.remove_group(self.ns, self.group)
383         self.mock_redis.delete.assert_called_once_with(self.group_redis)
384
385     def test_remove_group_function_can_map_redis_exception_to_sdl_exception(self):
386         self.mock_redis.delete.side_effect = redis_exceptions.ResponseError('redis error!')
387         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
388             self.db.remove_group(self.ns, self.group)
389
390     def test_get_members_function_success(self):
391         self.mock_redis.smembers.return_value = self.groupmembers
392         ret = self.db.get_members(self.ns, self.group)
393         self.mock_redis.smembers.assert_called_once_with(self.group_redis)
394         assert ret is self.groupmembers
395
396     def test_get_members_function_can_map_redis_exception_to_sdl_exception(self):
397         self.mock_redis.smembers.side_effect = redis_exceptions.ResponseError('redis error!')
398         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
399             self.db.get_members(self.ns, self.group)
400
401     def test_is_member_function_success(self):
402         self.mock_redis.sismember.return_value = True
403         ret = self.db.is_member(self.ns, self.group, self.groupmember)
404         self.mock_redis.sismember.assert_called_once_with(self.group_redis, self.groupmember)
405         assert ret is True
406
407     def test_is_member_function_can_map_redis_exception_to_sdl_exception(self):
408         self.mock_redis.sismember.side_effect = redis_exceptions.ResponseError('redis error!')
409         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
410             self.db.is_member(self.ns, self.group, self.groupmember)
411
412     def test_group_size_function_success(self):
413         self.mock_redis.scard.return_value = 100
414         ret = self.db.group_size(self.ns, self.group)
415         self.mock_redis.scard.assert_called_once_with(self.group_redis)
416         assert ret == 100
417
418     def test_group_size_function_can_map_redis_exception_to_sdl_exception(self):
419         self.mock_redis.scard.side_effect = redis_exceptions.ResponseError('Some redis error!')
420         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
421             self.db.group_size(self.ns, self.group)
422
423     def test_set_and_publish_success(self):
424         self.db.set_and_publish(self.ns, self.channels_and_events, self.dm)
425         self.mock_redis.execute_command.assert_called_once_with('MSETMPUB', len(self.dm),
426                                                                 len(self.channels_and_events),
427                                                                 *self.dm_redis_flat,
428                                                                 *self.channels_and_events_redis)
429
430     def test_set_and_publish_can_map_redis_exception_to_sdl_exception(self):
431         self.mock_redis.execute_command.side_effect = redis_exceptions.ResponseError('redis error!')
432         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
433             self.db.set_and_publish(self.ns, self.channels_and_events, self.dm)
434
435     def test_set_if_and_publish_success(self):
436         self.mock_redis.execute_command.return_value = b"OK"
437         ret = self.db.set_if_and_publish(self.ns, self.channels_and_events, self.key, self.old_data,
438                                          self.new_data)
439         self.mock_redis.execute_command.assert_called_once_with('SETIEMPUB', self.key_redis,
440                                                                 self.new_data, self.old_data,
441                                                                 *self.channels_and_events_redis)
442         assert ret is True
443
444     def test_set_if_and_publish_returns_false_if_existing_key_value_not_expected(self):
445         self.mock_redis.execute_command.return_value = None
446         ret = self.db.set_if_and_publish(self.ns, self.channels_and_events, self.key, self.old_data,
447                                          self.new_data)
448         self.mock_redis.execute_command.assert_called_once_with('SETIEMPUB', self.key_redis,
449                                                                 self.new_data, self.old_data,
450                                                                 *self.channels_and_events_redis)
451         assert ret is False
452
453     def test_set_if_and_publish_can_map_redis_exception_to_sdl_exception(self):
454         self.mock_redis.execute_command.side_effect = redis_exceptions.ResponseError('redis error!')
455         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
456             self.db.set_if_and_publish(self.ns, self.channels_and_events, self.key, self.old_data,
457                                        self.new_data)
458
459     def test_set_if_not_exists_and_publish_success(self):
460         self.mock_redis.execute_command.return_value = b"OK"
461         ret = self.db.set_if_not_exists_and_publish(self.ns, self.channels_and_events, self.key,
462                                                     self.new_data)
463         self.mock_redis.execute_command.assert_called_once_with('SETNXMPUB', self.key_redis,
464                                                                 self.new_data,
465                                                                 *self.channels_and_events_redis)
466         assert ret is True
467
468     def test_set_if_not_exists_and_publish_returns_false_if_key_already_exists(self):
469         self.mock_redis.execute_command.return_value = None
470         ret = self.db.set_if_not_exists_and_publish(self.ns, self.channels_and_events, self.key,
471                                                     self.new_data)
472         self.mock_redis.execute_command.assert_called_once_with('SETNXMPUB', self.key_redis,
473                                                                 self.new_data,
474                                                                 *self.channels_and_events_redis)
475         assert ret is False
476
477     def set_if_not_exists_and_publish_can_map_redis_exception_to_sdl_exception(self):
478         self.mock_redis.execute_command.side_effect = redis_exceptions.ResponseError('redis error!')
479         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
480             self.db.set_if_not_exists_and_publish(self.ns, self.channels_and_events, self.key,
481                                                   self.new_data)
482
483     def test_remove_and_publish_success(self):
484         self.db.remove_and_publish(self.ns, self.channels_and_events, self.key)
485         self.mock_redis.execute_command.assert_called_once_with('DELMPUB', len(self.key),
486                                                                 len(self.channels_and_events),
487                                                                 self.key_redis,
488                                                                 *self.channels_and_events_redis)
489
490     def test_remove_if_and_publish_success(self):
491         self.mock_redis.execute_command.return_value = 1
492         ret = self.db.remove_if_and_publish(self.ns, self.channels_and_events, self.key,
493                                             self.new_data)
494         self.mock_redis.execute_command.assert_called_once_with('DELIEMPUB', self.key_redis,
495                                                                 self.new_data,
496                                                                 *self.channels_and_events_redis)
497         assert ret is True
498
499     def test_remove_if_and_publish_returns_false_if_data_does_not_match(self):
500         self.mock_redis.execute_command.return_value = 0
501         ret = self.db.remove_if_and_publish(self.ns, self.channels_and_events, self.key,
502                                             self.new_data)
503         self.mock_redis.execute_command.assert_called_once_with('DELIEMPUB', self.key_redis,
504                                                                 self.new_data,
505                                                                 *self.channels_and_events_redis)
506         assert ret is False
507
508     def test_remove_if_and_publish_can_map_redis_exception_to_sdl_exception(self):
509         self.mock_redis.execute_command.side_effect = redis_exceptions.ResponseError('redis error!')
510         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
511             self.db.remove_if_and_publish(self.ns, self.channels_and_events, self.key,
512                                           self.new_data)
513
514     def test_remove_all_and_publish_success(self):
515         self.mock_redis.keys.return_value = ['{some-ns},a']
516         self.db.remove_all_and_publish(self.ns, self.channels_and_events)
517         self.mock_redis.keys.assert_called_once()
518         self.mock_redis.execute_command.assert_called_once_with('DELMPUB', len(self.key),
519                                                                 len(self.channels_and_events),
520                                                                 self.key_redis,
521                                                                 *self.channels_and_events_redis)
522
523     def test_remove_all_and_publish_can_map_redis_exception_to_sdl_exception(self):
524         self.mock_redis.execute_command.side_effect = redis_exceptions.ResponseError('redis error!')
525         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
526             self.db.remove_all_and_publish(self.ns, self.channels_and_events)
527
528     def test_subscribe_channel_success(self):
529         cb = Mock()
530         self.db.subscribe_channel(self.ns, cb, self.channels)
531         for channel in self.channels:
532             self.mock_pubsub.subscribe.assert_any_call(**{f'{{some-ns}},{channel}': cb})
533
534     def test_subscribe_channel_with_thread_success(self):
535         cb = Mock()
536         # Call first start_event_listener() to enable run_in_thread flag. When subscribe_channel()
537         # is called thread is started if the flag is enabled. In real-life scenario it's highly
538         # advisable at first to subscribe to some events by calling subscribe_channel() and only
539         # after it start threads by calling start_event_listener().
540         self.db.start_event_listener()
541         self.db.subscribe_channel(self.ns, cb, self.channels)
542         self.mock_pubsub.run_in_thread.assert_called_once_with(daemon=True, sleep_time=0.001)
543
544     def test_subscribe_can_map_redis_exception_to_sdl_exeception(self):
545         self.mock_pubsub.subscribe.side_effect = redis_exceptions.ResponseError('redis error!')
546         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
547             self.db.subscribe_channel(self.ns, Mock(), self.channels)
548
549     def test_unsubscribe_channel_success(self):
550         self.db.unsubscribe_channel(self.ns, [self.channels[0]])
551         self.mock_pubsub.unsubscribe.assert_called_with('{some-ns},ch1')
552
553     def test_unsubscribe_channel_can_map_redis_exception_to_sdl_exeception(self):
554         self.mock_pubsub.unsubscribe.side_effect = redis_exceptions.ResponseError('redis error!')
555         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
556             self.db.unsubscribe_channel(self.ns, [self.channels[0]])
557
558     def test_subscribe_and_start_event_listener(self):
559         self.mock_redis.pubsub_channels.return_value = self.channels_and_events_redis
560         self.db.subscribe_channel(self.ns, Mock(), self.channels)
561         self.db.start_event_listener()
562
563         if self.test_backend_type == 'sentinel_cluster':
564             assert self.mock_redis.pubsub_channels.call_count == 2
565             assert self.mock_pubsub.run_in_thread.call_count == 2
566             self.mock_pubsub.run_in_thread.assert_has_calls([
567                 call(daemon=True, sleep_time=0.001),
568                 call(daemon=True, sleep_time=0.001),
569             ])
570         else:
571             self.mock_redis.pubsub_channels.assert_called_once()
572             self.mock_pubsub.run_in_thread.assert_called_once_with(daemon=True, sleep_time=0.001)
573
574     def test_start_event_listener_fail(self):
575         self.mock_pubsub_thread.is_alive.return_value = True
576         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
577             self.db.start_event_listener()
578
579     def test_handle_events_success(self):
580         self.db.handle_events()
581         self.db.handle_events()
582         self.db.handle_events()
583         self.db.handle_events()
584         assert self.mock_pubsub.get_message.call_count == 4
585
586     def test_handle_events_fail_if_subsub_thread_alive(self):
587         self.mock_pubsub_thread.is_alive.return_value = True
588         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
589             self.db.handle_events()
590
591     def test_handle_events_fail_if_event_listener_already_running(self):
592         self.db.start_event_listener()
593         with pytest.raises(ricsdl.exceptions.RejectedByBackend):
594             self.db.handle_events()
595
596     def test_handle_events_ignores_message_handling_redis_runtime_exception(self):
597          self.mock_pubsub.get_message.side_effect = RuntimeError()
598          self.db.handle_events()
599          self.mock_pubsub.get_message.assert_called_once()
600
601     def test_get_redis_connection_function_success(self):
602         ret = self.db.get_redis_connection(self.ns)
603         assert ret is self.mock_redis
604
605     def test_redis_backend_object_string_representation(self):
606         str_out = str(self.db)
607         assert str_out is not None
608
609     def test_namespace_hash_algorithm_stays_unaltered(self):
610         ret_hash = self.db._RedisBackend__get_hash('sdltoolns')
611         assert ret_hash == 2897969051
612
def test_standalone_redis_init_exception_is_mapped_to_sdl_exeception():
    """With the standby config, a ResponseError from Redis() must surface as RejectedByBackend."""
    configuration = Mock()
    configuration.get_params.return_value = get_test_sdl_standby_config()
    with patch('ricsdl.backend.redis.Redis') as redis_cls:
        redis_cls.side_effect = redis_exceptions.ResponseError('redis error!')
        with pytest.raises(ricsdl.exceptions.RejectedByBackend):
            ricsdl.backend.get_backend_instance(configuration)
621
def test_standalone_pubsub_init_exception_is_mapped_to_sdl_exeception():
    """With the standby config, a ResponseError from PubSub() must surface as RejectedByBackend."""
    configuration = Mock()
    configuration.get_params.return_value = get_test_sdl_standby_config()
    # Redis is patched as well, with default mock behaviour.
    with patch('ricsdl.backend.redis.Redis'):
        with patch('ricsdl.backend.redis.PubSub') as pubsub_cls:
            pubsub_cls.side_effect = redis_exceptions.ResponseError('redis error!')
            with pytest.raises(ricsdl.exceptions.RejectedByBackend):
                ricsdl.backend.get_backend_instance(configuration)
631
def test_sentinel_redis_init_exception_is_mapped_to_sdl_exeception():
    """With the sentinel config, a ResponseError from Sentinel() must surface as RejectedByBackend."""
    configuration = Mock()
    configuration.get_params.return_value = get_test_sdl_sentinel_config()
    with patch('ricsdl.backend.redis.Sentinel') as sentinel_cls:
        sentinel_cls.side_effect = redis_exceptions.ResponseError('redis error!')
        with pytest.raises(ricsdl.exceptions.RejectedByBackend):
            ricsdl.backend.get_backend_instance(configuration)
640
def test_sentinel_pubsub_init_exception_is_mapped_to_sdl_exeception():
    """With the sentinel config, a ResponseError from PubSub() must surface as RejectedByBackend."""
    configuration = Mock()
    configuration.get_params.return_value = get_test_sdl_sentinel_config()
    # Sentinel is patched as well, with default mock behaviour.
    with patch('ricsdl.backend.redis.Sentinel'):
        with patch('ricsdl.backend.redis.PubSub') as pubsub_cls:
            pubsub_cls.side_effect = redis_exceptions.ResponseError('redis error!')
            with pytest.raises(ricsdl.exceptions.RejectedByBackend):
                ricsdl.backend.get_backend_instance(configuration)
650
def test_sentinel_master_for_exception_is_mapped_to_sdl_exeception():
    """With the sentinel config, a ResponseError from master_for() must surface as RejectedByBackend."""
    configuration = Mock()
    configuration.get_params.return_value = get_test_sdl_sentinel_config()
    with patch('ricsdl.backend.redis.Sentinel') as sentinel_cls:
        # PubSub is patched as well, with default mock behaviour.
        with patch('ricsdl.backend.redis.PubSub'):
            sentinel_instance = sentinel_cls.return_value
            sentinel_instance.master_for.side_effect = redis_exceptions.ResponseError('redis error!')
            with pytest.raises(ricsdl.exceptions.RejectedByBackend):
                ricsdl.backend.get_backend_instance(configuration)
660
def test_sentinel_cluster_redis_init_exception_is_mapped_to_sdl_exeception():
    """With the sentinel cluster config, a ResponseError from Sentinel() maps to RejectedByBackend."""
    configuration = Mock()
    configuration.get_params.return_value = get_test_sdl_sentinel_cluster_config()
    with patch('ricsdl.backend.redis.Sentinel') as sentinel_cls:
        sentinel_cls.side_effect = redis_exceptions.ResponseError('redis error!')
        with pytest.raises(ricsdl.exceptions.RejectedByBackend):
            ricsdl.backend.get_backend_instance(configuration)
669
def test_sentinel_cluster_pubsub_init_exception_is_mapped_to_sdl_exeception():
    """With the sentinel cluster config, a ResponseError from PubSub() maps to RejectedByBackend."""
    configuration = Mock()
    configuration.get_params.return_value = get_test_sdl_sentinel_cluster_config()
    # Sentinel is patched as well, with default mock behaviour.
    with patch('ricsdl.backend.redis.Sentinel'):
        with patch('ricsdl.backend.redis.PubSub') as pubsub_cls:
            pubsub_cls.side_effect = redis_exceptions.ResponseError('redis error!')
            with pytest.raises(ricsdl.exceptions.RejectedByBackend):
                ricsdl.backend.get_backend_instance(configuration)
679
def test_sentinel_cluster_master_for_exception_is_mapped_to_sdl_exeception():
    """With the sentinel cluster config, a ResponseError from master_for() maps to RejectedByBackend."""
    configuration = Mock()
    configuration.get_params.return_value = get_test_sdl_sentinel_cluster_config()
    with patch('ricsdl.backend.redis.Sentinel') as sentinel_cls:
        # PubSub is patched as well, with default mock behaviour.
        with patch('ricsdl.backend.redis.PubSub'):
            sentinel_instance = sentinel_cls.return_value
            sentinel_instance.master_for.side_effect = redis_exceptions.ResponseError('redis error!')
            with pytest.raises(ricsdl.exceptions.RejectedByBackend):
                ricsdl.backend.get_backend_instance(configuration)
689
class MockRedisLock:
    """Plain stand-in for a Redis lock object that only records its constructor arguments."""

    def __init__(self, redis, name, timeout=None, sleep=0.1,
                 blocking=True, blocking_timeout=None, thread_local=True):
        """Store each argument on a same-named attribute; thread_local is coerced to bool."""
        self.redis, self.name = redis, name
        self.timeout, self.sleep = timeout, sleep
        self.blocking, self.blocking_timeout = blocking, blocking_timeout
        self.thread_local = True if thread_local else False
700
701
@pytest.fixture(scope="module")
def mock_redis_lock():
    """Provide a factory that builds MockRedisLock objects from lock-creation arguments.

    The returned callable accepts a lock name plus the optional lock settings
    and returns a MockRedisLock carrying those values.
    """
    def _mock_redis_lock(name, timeout=None, sleep=0.1,
                         blocking=True, blocking_timeout=None, thread_local=True):
        # MockRedisLock takes the Redis client as its first positional argument.
        # Supply a placeholder client and forward the rest by keyword so that
        # every value lands on the parameter of the same name.
        return MockRedisLock(Mock(), name, timeout=timeout, sleep=sleep, blocking=blocking,
                             blocking_timeout=blocking_timeout, thread_local=thread_local)
    return _mock_redis_lock
708
709
@pytest.fixture()
def redis_backend_lock_fixture(request, mock_redis_lock):
    """Attach a backend lock under test, and the mocks it talks to, to the requesting test class.

    Sets the lock identity and timing values, the mocked Redis client, the mocked
    validity-time script, the configuration mock, the lock created through
    ricsdl.backend.get_backend_lock_instance() and the mocked Redis lock instance
    as class attributes. The value of the mock_redis_lock fixture argument is not
    used here. After the test, RedisBackendLock.lua_get_validity_time is reset to None.
    """
    test_cls = request.cls

    # Lock identity and timing values shared by the tests.
    test_cls.ns = 'some-ns'
    test_cls.lockname = 'some-lock-name'
    test_cls.lockname_redis = '{some-ns},some-lock-name'
    test_cls.expiration = 10
    test_cls.retry_interval = 0.1
    test_cls.retry_timeout = 1

    # Callable handed out by register_script(); returns 2000 unless a test overrides it.
    lua_script = Mock(return_value=2000)
    test_cls.mock_lua_get_validity_time = lua_script

    redis_client = Mock()
    redis_client.register_script = Mock(return_value=lua_script)
    test_cls.mock_redis = redis_client

    db_backend = Mock()
    db_backend.get_redis_connection.return_value = redis_client

    test_cls.configuration = Mock()
    test_cls.configuration.get_params.return_value = _Configuration.Params(
        db_host=None,
        db_ports=None,
        db_sentinel_ports=None,
        db_sentinel_master_names=None,
        db_cluster_addrs=None,
        db_type=DbBackendType.REDIS)

    # The Lock class is only patched while the lock under test is being created.
    with patch('ricsdl.backend.redis.Lock') as lock_cls_mock:
        test_cls.lock = ricsdl.backend.get_backend_lock_instance(test_cls.configuration,
                                                                 test_cls.ns, test_cls.lockname,
                                                                 test_cls.expiration, db_backend)
        test_cls.mock_redis_lock = lock_cls_mock.return_value
    yield
    # Teardown: clear the class-level script reference
    # (presumably so the next lock registers the script afresh - confirm).
    RedisBackendLock.lua_get_validity_time = None
746
747
@pytest.mark.usefixtures('redis_backend_lock_fixture')
class TestRedisBackendLock:
    """Tests for the lock object returned by ricsdl.backend.get_backend_lock_instance().

    The redis_backend_lock_fixture fixture provides the attributes used below:
    self.lock (the lock under test), self.mock_redis_lock (the mocked Redis lock
    instance), self.mock_redis (the mocked Redis client),
    self.mock_lua_get_validity_time (the mocked script callable) and the lock
    name, namespace and retry settings.
    """

    def test_acquire_function_success(self):
        """acquire() returns True when the Redis lock reports a successful acquire."""
        self.mock_redis_lock.acquire.return_value = True
        ret = self.lock.acquire(self.retry_interval, self.retry_timeout)
        self.mock_redis_lock.acquire.assert_called_once_with(blocking_timeout=self.retry_timeout)
        assert ret is True

    def test_acquire_function_returns_false_if_lock_is_not_acquired(self):
        """acquire() returns False when the Redis lock could not be acquired."""
        self.mock_redis_lock.acquire.return_value = False
        ret = self.lock.acquire(self.retry_interval, self.retry_timeout)
        self.mock_redis_lock.acquire.assert_called_once_with(blocking_timeout=self.retry_timeout)
        assert ret is False

    def test_acquire_function_can_map_redis_exception_to_sdl_exception(self):
        """A Redis LockError raised by acquire() surfaces as BackendError."""
        self.mock_redis_lock.acquire.side_effect = redis_exceptions.LockError('redis lock error!')
        with pytest.raises(ricsdl.exceptions.BackendError):
            self.lock.acquire(self.retry_interval, self.retry_timeout)

    def test_release_function_success(self):
        """release() calls release() on the Redis lock exactly once."""
        self.lock.release()
        self.mock_redis_lock.release.assert_called_once()

    def test_release_function_can_map_redis_exception_to_sdl_exception(self):
        """A Redis LockError raised by release() surfaces as BackendError."""
        self.mock_redis_lock.release.side_effect = redis_exceptions.LockError('redis lock error!')
        with pytest.raises(ricsdl.exceptions.BackendError):
            self.lock.release()

    def test_refresh_function_success(self):
        """refresh() calls reacquire() on the Redis lock exactly once."""
        self.lock.refresh()
        self.mock_redis_lock.reacquire.assert_called_once()

    def test_refresh_function_can_map_redis_exception_to_sdl_exception(self):
        """A Redis LockError raised by reacquire() surfaces as BackendError."""
        self.mock_redis_lock.reacquire.side_effect = redis_exceptions.LockError('redis lock error!')
        with pytest.raises(ricsdl.exceptions.BackendError):
            self.lock.refresh()

    def test_get_validity_time_function_success(self):
        """get_validity_time() runs the script with the lock name and token and returns 2.

        The script mock returns 2000 (the fixture default); the expected result is 2.
        """
        self.mock_redis_lock.name = self.lockname_redis
        self.mock_redis_lock.local.token = 123

        ret = self.lock.get_validity_time()
        self.mock_lua_get_validity_time.assert_called_once_with(
            keys=[self.lockname_redis], args=[123], client=self.mock_redis)
        assert ret == 2

    def test_get_validity_time_function_second_fraction_success(self):
        """get_validity_time() returns a fractional value (0.234) for a script result of 234."""
        self.mock_redis_lock.name = self.lockname_redis
        self.mock_redis_lock.local.token = 123
        self.mock_lua_get_validity_time.return_value = 234

        ret = self.lock.get_validity_time()
        self.mock_lua_get_validity_time.assert_called_once_with(
            keys=[self.lockname_redis], args=[123], client=self.mock_redis)
        # Compare with a tolerance: the result is a float, and exact equality
        # would depend on how the conversion is computed.
        assert ret == pytest.approx(0.234)

    def test_get_validity_time_function_can_raise_exception_if_lock_is_unlocked(self):
        """get_validity_time() raises RejectedByBackend when the lock holds no token."""
        self.mock_redis_lock.name = self.lockname_redis
        self.mock_redis_lock.local.token = None

        with pytest.raises(ricsdl.exceptions.RejectedByBackend) as excinfo:
            self.lock.get_validity_time()
        assert f"Cannot get validity time of an unlocked lock {self.lockname}" in str(excinfo.value)

    def test_get_validity_time_function_can_raise_exception_if_lua_script_fails(self):
        """get_validity_time() raises RejectedByBackend when the script returns -10."""
        self.mock_redis_lock.name = self.lockname_redis
        self.mock_redis_lock.local.token = 123
        self.mock_lua_get_validity_time.return_value = -10

        with pytest.raises(ricsdl.exceptions.RejectedByBackend) as excinfo:
            self.lock.get_validity_time()
        assert f"Getting validity time of a lock {self.lockname} failed with error code: -10" in str(excinfo.value)

    def test_redis_backend_lock_object_string_representation(self):
        """str(lock) renders the lock type, namespace, name and a 'locked' status."""
        expected_lock_info = {'lock DB type': 'Redis',
                              'lock namespace': 'some-ns',
                              'lock name': 'some-lock-name',
                              'lock status': 'locked'}
        assert str(self.lock) == str(expected_lock_info)

    def test_redis_backend_lock_object_string_representation_can_catch_redis_exception(self):
        """str(lock) reports the error text as status when owned() raises a Redis LockError."""
        self.mock_redis_lock.owned.side_effect = redis_exceptions.LockError('redis lock error!')
        expected_lock_info = {'lock DB type': 'Redis',
                              'lock namespace': 'some-ns',
                              'lock name': 'some-lock-name',
                              'lock status': 'Error: redis lock error!'}
        assert str(self.lock) == str(expected_lock_info)
835
836
def test_redis_response_error_exception_is_mapped_to_rejected_by_backend_sdl_exception():
    """A Redis ResponseError raised inside _map_to_sdl_exception() becomes RejectedByBackend."""
    expected_text = "SDL backend rejected the request: Some redis error!"
    with pytest.raises(ricsdl.exceptions.RejectedByBackend) as raised, _map_to_sdl_exception():
        raise redis_exceptions.ResponseError('Some redis error!')
    assert expected_text in str(raised.value)
842
843
def test_redis_connection_error_exception_is_mapped_to_not_connected_sdl_exception():
    """A Redis ConnectionError raised inside _map_to_sdl_exception() becomes NotConnected."""
    expected_text = "SDL not connected to backend: Some redis error!"
    with pytest.raises(ricsdl.exceptions.NotConnected) as raised, _map_to_sdl_exception():
        raise redis_exceptions.ConnectionError('Some redis error!')
    assert expected_text in str(raised.value)
849
850
def test_rest_redis_exceptions_are_mapped_to_backend_error_sdl_exception():
    """A generic RedisError raised inside _map_to_sdl_exception() becomes BackendError."""
    expected_text = "SDL backend failed to process the request: Some redis error!"
    with pytest.raises(ricsdl.exceptions.BackendError) as raised, _map_to_sdl_exception():
        raise redis_exceptions.RedisError('Some redis error!')
    assert expected_text in str(raised.value)
856
857
def test_system_error_exceptions_are_not_mapped_to_any_sdl_exception():
    """A SystemExit raised inside _map_to_sdl_exception() passes through unchanged."""
    with pytest.raises(SystemExit), _map_to_sdl_exception():
        raise SystemExit('Fatal error')
862
863
class TestRedisClient:
    """Tests for the PubSub class of ricsdl.backend.redis."""

    @classmethod
    def setup_class(cls):
        """Create one PubSub object for the class with a mocked handler on '{some-ns},ch1'."""
        pubsub = ricsdl.backend.redis.PubSub(EVENT_SEPARATOR, Mock())
        pubsub.channels = {b'{some-ns},ch1': Mock()}
        cls.pubsub = pubsub

    def test_handle_pubsub_message(self):
        """handle_message() returns ('ch1', ['cbn']) and invokes the channel handler with it."""
        redis_channel = b'{some-ns},ch1'
        raw_message = [b'message', redis_channel, b'cbn']
        result = self.pubsub.handle_message(raw_message)
        assert result == ('ch1', ['cbn'])
        handler = self.pubsub.channels.get(redis_channel)
        handler.assert_called_once_with('ch1', ['cbn'])