Add support for notifications
[ric-plt/sdlpy.git] / ricsdl-package / tests / backend / test_fake_dict_db.py
1 # Copyright (c) 2019 AT&T Intellectual Property.
2 # Copyright (c) 2018-2019 Nokia.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 #
17 # This source code is part of the near-RT RIC (RAN Intelligent Controller)
18 # platform project (RICP).
19 #
20
21 import queue
22 import time
23 from unittest.mock import Mock
24 import pytest
25 import ricsdl.backend
26 from ricsdl.configuration import _Configuration
27 from ricsdl.configuration import DbBackendType
28
29
@pytest.fixture()
def fake_dict_backend_fixture(request):
    """Attach shared test data and a fake dict backend to the requesting test class.

    Every value is stored as an attribute of ``request.cls`` so that the test
    methods can reach it through ``self``. The backend is obtained from
    ``ricsdl.backend.get_backend_instance()`` using a mocked configuration whose
    ``get_params()`` returns parameters with ``db_type=DbBackendType.FAKE_DICT``.
    """
    request.cls.ns = 'some-ns'
    request.cls.dm = {'abc': b'1', 'bcd': b'2'}
    request.cls.new_dm = {'abc': b'3', 'bcd': b'2'}
    request.cls.dm2 = {'cdf': b'4'}
    request.cls.remove_dm = {'bcd': b'2'}
    request.cls.key = 'abc'
    request.cls.keys = ['abc', 'bcd']
    request.cls.key2 = ['cdf']
    request.cls.old_data = b'1'
    request.cls.new_data = b'3'
    request.cls.keypattern = r'*bc*'
    request.cls.group = 'some-group'
    request.cls.groupmember = b'm1'
    request.cls.groupmembers = {b'm1', b'm2'}
    # A set literal is required here: set(b'm3') would iterate over the bytes
    # object and produce the integers {109, 51} instead of the member b'm3'.
    request.cls.new_groupmembers = {b'm3'}
    request.cls.all_groupmembers = request.cls.groupmembers | request.cls.new_groupmembers
    request.cls.channels = ['abs', 'gma']
    request.cls.channels_and_events = {'abs': ['cbn']}

    request.cls.configuration = Mock()
    mock_conf_params = _Configuration.Params(db_host=None,
                                             db_port=None,
                                             db_sentinel_port=None,
                                             db_sentinel_master_name=None,
                                             db_type=DbBackendType.FAKE_DICT)
    request.cls.configuration.get_params.return_value = mock_conf_params
    request.cls.db = ricsdl.backend.get_backend_instance(request.cls.configuration)
59
60
@pytest.mark.usefixtures('fake_dict_backend_fixture')
class TestFakeDictBackend:
    """Tests for the fake dict backend provided by ``fake_dict_backend_fixture``.

    The fixture supplies ``self.db`` together with the namespace, key, group and
    channel test data that the methods below read through ``self``.
    """

    def test_is_connected_function_success(self):
        """is_connected() returns True."""
        ret = self.db.is_connected()
        assert ret is True

    def test_set_function_success(self):
        """Data written by successive set() calls can be read back with get()."""
        self.db.set(self.ns, self.dm)
        self.db.set(self.ns, self.dm2)
        ret = self.db.get(self.ns, self.keys)
        assert ret == self.dm
        ret = self.db.get(self.ns, self.key2)
        assert ret == self.dm2

    def test_set_if_function_success(self):
        """set_if() returns True and replaces the value when the old value matches."""
        self.db.set(self.ns, self.dm)
        ret = self.db.set_if(self.ns, self.key, self.old_data, self.new_data)
        assert ret is True
        ret = self.db.get(self.ns, self.keys)
        assert ret == self.new_dm

    def test_set_if_function_returns_false_if_existing_key_value_not_expected(self):
        """set_if() returns False when the stored value differs from the expected one."""
        self.db.set_if(self.ns, self.key, self.old_data, self.new_data)
        self.db.set(self.ns, self.new_dm)
        ret = self.db.set_if(self.ns, self.key, self.old_data, self.new_data)
        assert ret is False

    def test_set_if_not_exists_function_success(self):
        """set_if_not_exists() returns True and stores the value for a new key."""
        ret = self.db.set_if_not_exists(self.ns, self.key, self.new_data)
        assert ret is True
        ret = self.db.get(self.ns, self.keys)
        assert ret == {self.key: self.new_data}

    def test_set_if_not_exists_function_returns_false_if_key_already_exists(self):
        """set_if_not_exists() returns False when the key is already stored."""
        self.db.set(self.ns, self.dm)
        ret = self.db.set_if_not_exists(self.ns, self.key, self.new_data)
        assert ret is False

    def test_find_keys_function_success(self):
        """find_keys() returns the stored keys matching the key pattern."""
        self.db.set(self.ns, self.dm)
        ret = self.db.find_keys(self.ns, self.keypattern)
        assert ret == self.keys

    def test_find_keys_function_returns_empty_list_when_no_matching_keys_found(self):
        """find_keys() returns an empty list when nothing has been stored."""
        ret = self.db.find_keys(self.ns, self.keypattern)
        assert ret == []

    def test_find_and_get_function_success(self):
        """find_and_get() returns the key-value pairs matching the key pattern."""
        self.db.set(self.ns, self.dm)
        ret = self.db.find_and_get(self.ns, self.keypattern)
        assert ret == self.dm

    def test_find_and_get_function_returns_empty_dict_when_no_matching_keys_exist(self):
        """find_and_get() returns an empty dict when nothing has been stored."""
        ret = self.db.find_and_get(self.ns, self.keypattern)
        assert ret == {}

    def test_remove_function_success(self):
        """Keys deleted with remove() are no longer returned by get()."""
        self.db.set(self.ns, self.dm)
        self.db.remove(self.ns, self.keys)
        ret = self.db.get(self.ns, self.keys)
        assert ret == {}

    def test_remove_if_function_success(self):
        """remove_if() returns True and deletes the key when the value matches."""
        self.db.set(self.ns, self.dm)
        ret = self.db.remove_if(self.ns, self.key, self.old_data)
        assert ret is True
        ret = self.db.get(self.ns, self.keys)
        assert ret == self.remove_dm

    def test_remove_if_function_returns_false_if_data_does_not_match(self):
        """remove_if() returns False for a missing key and for a non-matching value."""
        ret = self.db.remove_if(self.ns, self.key, self.old_data)
        assert ret is False
        self.db.set(self.ns, self.dm)
        ret = self.db.remove_if(self.ns, self.key, self.new_data)
        assert ret is False

    def test_add_member_function_success(self):
        """add_member() creates a group and later extends it with further members."""
        self.db.add_member(self.ns, self.group, self.groupmembers)
        ret = self.db.get_members(self.ns, self.group)
        assert ret == self.groupmembers

        self.db.add_member(self.ns, self.group, self.new_groupmembers)
        ret = self.db.get_members(self.ns, self.group)
        assert ret == self.all_groupmembers

    def test_remove_member_function_success(self):
        """remove_member() is accepted for a missing group and empties an existing one."""
        self.db.remove_member(self.ns, self.group, self.groupmembers)
        self.db.add_member(self.ns, self.group, self.groupmembers)
        self.db.remove_member(self.ns, self.group, self.groupmembers)
        ret = self.db.get_members(self.ns, self.group)
        assert ret == set()

    def test_remove_group_function_success(self):
        """After remove_group(), get_members() returns an empty set."""
        self.db.remove_group(self.ns, self.group)
        ret = self.db.get_members(self.ns, self.group)
        assert ret == set()

    def test_is_member_function_success(self):
        """is_member() is True only for a member that has been added to the group."""
        ret = self.db.is_member(self.ns, self.group, b'not member')
        assert ret is False
        self.db.add_member(self.ns, self.group, self.groupmembers)
        ret = self.db.is_member(self.ns, self.group, self.groupmember)
        assert ret is True
        ret = self.db.is_member(self.ns, self.group, b'not member')
        assert ret is False

    def test_group_size_function_success(self):
        """group_size() is 0 for a missing group and the member count otherwise."""
        ret = self.db.group_size(self.ns, self.group)
        assert ret == 0
        self.db.add_member(self.ns, self.group, self.groupmembers)
        ret = self.db.group_size(self.ns, self.group)
        assert ret == len(self.groupmembers)

    def test_fake_dict_backend_object_string_representation(self):
        """str() of the backend equals the string form of its DB type dict."""
        assert str(self.db) == str({'DB type': 'FAKE DB'})

    def test_set_and_publish_function_success(self):
        """set_and_publish() stores the data and puts one item on the queue."""
        self.db.set_and_publish(self.ns, self.channels_and_events, self.dm)
        ret = self.db.get(self.ns, self.keys)
        assert ret == self.dm
        assert self.db._queue.qsize() == 1

    def test_set_if_and_publish_success(self):
        """set_if_and_publish() updates the value and queues one item on a match."""
        self.db.set(self.ns, self.dm)
        ret = self.db.set_if_and_publish(self.ns, self.channels_and_events, self.key, self.old_data,
                                         self.new_data)
        assert ret is True
        ret = self.db.get(self.ns, self.keys)
        assert ret == self.new_dm
        assert self.db._queue.qsize() == 1

    def test_set_if_and_publish_returns_false_if_existing_key_value_not_expected(self):
        """set_if_and_publish() returns False and queues nothing on a value mismatch."""
        self.db.set_if_and_publish(self.ns, self.channels_and_events, self.key, self.old_data,
                                   self.new_data)
        self.db.set(self.ns, self.new_dm)
        # Exercise the publishing variant (not plain set_if()) so that the return
        # value asserted below belongs to the function this test is named after.
        ret = self.db.set_if_and_publish(self.ns, self.channels_and_events, self.key,
                                         self.old_data, self.new_data)
        assert ret is False
        assert self.db._queue.qsize() == 0

    def test_set_if_not_exists_and_publish_success(self):
        """set_if_not_exists_and_publish() stores a new key and queues one item."""
        ret = self.db.set_if_not_exists_and_publish(self.ns, self.channels_and_events, self.key,
                                                    self.new_data)
        assert ret is True
        ret = self.db.get(self.ns, self.keys)
        assert ret == {self.key: self.new_data}
        assert self.db._queue.qsize() == 1

    def test_set_if_not_exists_and_publish_returns_false_if_key_already_exists(self):
        """set_if_not_exists_and_publish() returns False and queues nothing for an existing key."""
        self.db.set(self.ns, self.dm)
        ret = self.db.set_if_not_exists_and_publish(self.ns, self.channels_and_events, self.key,
                                                    self.new_data)
        assert ret is False
        assert self.db._queue.qsize() == 0

    def test_remove_and_publish_function_success(self):
        """remove_and_publish() deletes the keys and queues one item."""
        self.db.set(self.ns, self.dm)
        self.db.remove_and_publish(self.ns, self.channels_and_events, self.keys)
        ret = self.db.get(self.ns, self.keys)
        assert ret == {}
        assert self.db._queue.qsize() == 1

    def test_remove_if_and_publish_success(self):
        """remove_if_and_publish() deletes the key and queues one item on a match."""
        self.db.set(self.ns, self.dm)
        ret = self.db.remove_if_and_publish(self.ns, self.channels_and_events, self.key,
                                            self.old_data)
        assert ret is True
        ret = self.db.get(self.ns, self.keys)
        assert ret == self.remove_dm
        assert self.db._queue.qsize() == 1

    def test_remove_if_and_publish_returns_false_if_data_does_not_match(self):
        """remove_if_and_publish() returns False and queues nothing when nothing matches."""
        ret = self.db.remove_if_and_publish(self.ns, self.channels_and_events, self.key,
                                            self.old_data)
        assert ret is False
        self.db.set(self.ns, self.dm)
        ret = self.db.remove_if_and_publish(self.ns, self.channels_and_events, self.key,
                                            self.new_data)
        assert ret is False
        assert self.db._queue.qsize() == 0

    def test_remove_all_publish_success(self):
        """remove_all_and_publish() clears the stored keys and queues one item."""
        self.db.set(self.ns, self.dm)
        self.db.remove_all_and_publish(self.ns, self.channels_and_events)
        ret = self.db.get(self.ns, self.keys)
        assert ret == {}
        assert self.db._queue.qsize() == 1

    def test_subscribe_channel_success(self):
        """subscribe_channel() records a callback per channel without starting the thread."""
        cb = Mock()
        self.db.subscribe_channel(self.ns, cb, self.channels)
        for channel in self.channels:
            assert self.db._channel_cbs.get(channel, None)
        assert not self.db._listen_thread.is_alive()

    def test_subscribe_channel_event_loop_success(self):
        """With the event listener started first, subscribing leaves the thread alive."""
        cb = Mock()
        self.db.start_event_listener()
        self.db.subscribe_channel(self.ns, cb, self.channels)
        for channel in self.channels:
            assert self.db._channel_cbs.get(channel, None)
        assert self.db._listen_thread.is_alive()

    def test_unsubscribe_channel_success(self):
        """unsubscribe_channel() drops only the callback of the given channel."""
        self.db.subscribe_channel(self.ns, Mock(), self.channels)
        self.db.unsubscribe_channel(self.ns, [self.channels[0]])
        assert self.db._channel_cbs.get(self.channels[0], None) is None
        assert self.db._channel_cbs.get(self.channels[1], None)

    def test_listen(self):
        """An item put on the queue is consumed once the listener is running."""
        cb = Mock()
        self.db.start_event_listener()
        self.db.subscribe_channel(self.ns, cb, self.channels)
        self.db._queue.put(("abs", "cbn"))
        # Give the listener thread time to take the item off the queue.
        time.sleep(0.5)
        assert self.db._queue.qsize() == 0

    def test_start_event_listener_success(self):
        """start_event_listener() sets the _run_in_thread flag."""
        self.db.start_event_listener()
        assert self.db._run_in_thread

    def test_start_event_listener_subscribe_first(self):
        """Subscribing before start_event_listener() starts the thread exactly once."""
        # Replace start() with a mock so that no real thread is launched.
        self.db._listen_thread.start = Mock()
        mock_cb = Mock()
        self.db._channel_cbs = {'abs': mock_cb}
        self.db.subscribe_channel(self.ns, Mock(), self.channels)
        self.db.start_event_listener()
        self.db._listen_thread.start.assert_called_once()

    def test_start_event_listener_fail(self):
        """start_event_listener() raises when the listen thread reports being alive."""
        self.db._listen_thread.is_alive = Mock()
        self.db._listen_thread.is_alive.return_value = True
        with pytest.raises(Exception):
            self.db.start_event_listener()

    def test_handle_events_success(self):
        """handle_events() returns the queued item and calls the channel callback with it."""
        self.db._queue = Mock()
        self.db._queue.get.return_value = ('abs', 'cbn')
        mock_cb = Mock()
        self.db._channel_cbs = {'abs': mock_cb}
        assert self.db.handle_events() == ('abs', 'cbn')
        mock_cb.assert_called_once_with('abs', 'cbn')

    def test_handle_events_success_no_notification(self):
        """handle_events() returns None when the queue raises queue.Empty."""
        self.db._queue = Mock()
        self.db._queue.get.side_effect = queue.Empty
        assert self.db.handle_events() is None

    def test_handle_events_fail_already_started(self):
        """handle_events() raises when the listen thread reports being alive."""
        self.db._listen_thread = Mock()
        self.db._listen_thread.is_alive.return_value = True
        with pytest.raises(Exception):
            self.db.handle_events()

    def test_handle_events_fail_already_set(self):
        """handle_events() raises when _run_in_thread is already True."""
        self.db._run_in_thread = True
        with pytest.raises(Exception):
            self.db.handle_events()
318
@pytest.fixture()
def fake_dict_backend_lock_fixture(request):
    """Attach lock test data and a fake dict backend lock to the requesting test class.

    All values are stored as attributes of ``request.cls``. The lock is obtained
    from ``ricsdl.backend.get_backend_lock_instance()`` using a mocked
    configuration whose ``get_params()`` returns parameters with
    ``db_type=DbBackendType.FAKE_DICT``.
    """
    test_cls = request.cls

    # Plain test data read by the test methods through ``self``.
    test_cls.ns = 'some-ns'
    test_cls.lockname = 'some-lock-name'
    test_cls.expiration = 10
    test_cls.retry_interval = 0.1
    test_cls.retry_timeout = 1

    fake_params = _Configuration.Params(db_host=None,
                                        db_port=None,
                                        db_sentinel_port=None,
                                        db_sentinel_master_name=None,
                                        db_type=DbBackendType.FAKE_DICT)
    test_cls.configuration = Mock()
    test_cls.configuration.get_params.return_value = fake_params

    # The final argument is a fresh Mock; presumably it stands in for the
    # backend object the lock belongs to - confirm against
    # get_backend_lock_instance().
    test_cls.lock = ricsdl.backend.get_backend_lock_instance(
        test_cls.configuration,
        test_cls.ns,
        test_cls.lockname,
        test_cls.expiration,
        Mock(),
    )
339
340
@pytest.mark.usefixtures('fake_dict_backend_lock_fixture')
class TestFakeDictBackendLock:
    """Tests for the fake dict backend lock provided by ``fake_dict_backend_lock_fixture``."""

    def _try_acquire(self):
        """Call acquire() on the lock with the fixture's retry interval and timeout."""
        return self.lock.acquire(self.retry_interval, self.retry_timeout)

    def test_acquire_function_success(self):
        """The first acquire() returns True."""
        assert self._try_acquire() is True

    def test_acquire_function_returns_false_if_lock_is_not_acquired(self):
        """A second acquire() without a release in between returns False."""
        self._try_acquire()
        assert self._try_acquire() is False

    def test_release_function_success(self):
        """After release(), acquire() returns True again."""
        self._try_acquire()
        second_attempt = self._try_acquire()
        assert second_attempt is False
        self.lock.release()
        after_release = self._try_acquire()
        assert after_release is True

    def test_get_validity_time_function_success(self):
        """get_validity_time() returns the expiration the lock was created with."""
        assert self.lock.get_validity_time() == self.expiration

    def test_fake_dict_backend_lock_object_string_representation(self):
        """str() of the lock equals the string form of its lock info dict."""
        lock_info = {
            'lock DB type': 'FAKE DB',
            'lock namespace': 'some-ns',
            'lock name': 'some-lock-name',
            'lock status': 'unlocked',
        }
        assert str(self.lock) == str(lock_info)