1 # Copyright (c) 2019 AT&T Intellectual Property.
2 # Copyright (c) 2018-2019 Nokia.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 # This source code is part of the near-RT RIC (RAN Intelligent Controller)
18 # platform project (RICP).
23 from unittest.mock import Mock
26 from ricsdl.configuration import _Configuration
27 from ricsdl.configuration import DbBackendType
@pytest.fixture()
def fake_dict_backend_fixture(request):
    """Attach shared test data and a fake-dict backend to the requesting class.

    Every value is stored on ``request.cls`` so that test methods can read it
    through ``self``. The backend instance is obtained from
    ``ricsdl.backend.get_backend_instance`` using a mocked configuration whose
    ``get_params()`` returns parameters with ``DbBackendType.FAKE_DICT``.
    """
    cls = request.cls

    # Key-value test data.
    cls.ns = 'some-ns'
    cls.dm = {'abc': b'1', 'bcd': b'2'}
    cls.new_dm = {'abc': b'3', 'bcd': b'2'}
    cls.dm2 = {'cdf': b'4'}
    cls.remove_dm = {'bcd': b'2'}
    cls.key = 'abc'
    cls.keys = ['abc', 'bcd']
    cls.key2 = ['cdf']
    cls.old_data = b'1'
    cls.new_data = b'3'
    cls.keypattern = r'*bc*'

    # Group test data.
    cls.group = 'some-group'
    cls.groupmember = b'm1'
    cls.groupmembers = {b'm1', b'm2'}
    # A set literal is required here: set(b'm3') iterates over the bytes
    # object and would produce the integers {109, 51} instead of {b'm3'}.
    cls.new_groupmembers = {b'm3'}
    cls.all_groupmembers = cls.groupmembers | cls.new_groupmembers

    # Notification test data.
    cls.channels = ['abs', 'gma']
    cls.channels_and_events = {'abs': ['cbn']}

    cls.configuration = Mock()
    # NOTE(review): db_port is reconstructed -- the visible argument list
    # jumped from db_host straight to db_sentinel_port; confirm against the
    # field list of _Configuration.Params.
    mock_conf_params = _Configuration.Params(db_host=None,
                                             db_port=None,
                                             db_sentinel_port=None,
                                             db_sentinel_master_name=None,
                                             db_type=DbBackendType.FAKE_DICT)
    cls.configuration.get_params.return_value = mock_conf_params
    cls.db = ricsdl.backend.get_backend_instance(cls.configuration)
@pytest.mark.usefixtures('fake_dict_backend_fixture')
class TestFakeDictBackend:
    """Tests for the backend returned for ``DbBackendType.FAKE_DICT``.

    ``self.db`` and all test data attributes are provided by
    ``fake_dict_backend_fixture``.
    """

    def test_is_connected_function_success(self):
        """is_connected() reports True."""
        ret = self.db.is_connected()
        assert ret is True

    def test_set_function_success(self):
        """Values written with set() are returned by get()."""
        self.db.set(self.ns, self.dm)
        self.db.set(self.ns, self.dm2)
        ret = self.db.get(self.ns, self.keys)
        assert ret == self.dm
        ret = self.db.get(self.ns, self.key2)
        assert ret == self.dm2

    def test_set_if_function_success(self):
        """set_if() replaces the value when the stored value matches."""
        self.db.set(self.ns, self.dm)
        ret = self.db.set_if(self.ns, self.key, self.old_data, self.new_data)
        assert ret is True
        ret = self.db.get(self.ns, self.keys)
        assert ret == self.new_dm

    def test_set_if_function_returns_false_if_existing_key_value_not_expected(self):
        """set_if() returns False when the stored value differs from old_data."""
        self.db.set_if(self.ns, self.key, self.old_data, self.new_data)
        self.db.set(self.ns, self.new_dm)
        ret = self.db.set_if(self.ns, self.key, self.old_data, self.new_data)
        assert ret is False

    def test_set_if_not_exists_function_success(self):
        """set_if_not_exists() stores the value when the key is absent."""
        ret = self.db.set_if_not_exists(self.ns, self.key, self.new_data)
        assert ret is True
        ret = self.db.get(self.ns, self.keys)
        assert ret == {self.key: self.new_data}

    def test_set_if_not_exists_function_returns_false_if_key_already_exists(self):
        """set_if_not_exists() returns False when the key is already stored."""
        self.db.set(self.ns, self.dm)
        ret = self.db.set_if_not_exists(self.ns, self.key, self.new_data)
        assert ret is False

    def test_find_keys_function_success(self):
        """find_keys() returns the stored keys matching the pattern."""
        self.db.set(self.ns, self.dm)
        ret = self.db.find_keys(self.ns, self.keypattern)
        assert ret == self.keys

    def test_find_keys_function_returns_empty_list_when_no_matching_keys_found(self):
        """find_keys() returns an empty list when nothing is stored."""
        ret = self.db.find_keys(self.ns, self.keypattern)
        assert ret == []

    def test_find_and_get_function_success(self):
        """find_and_get() returns the matching keys together with their values."""
        self.db.set(self.ns, self.dm)
        ret = self.db.find_and_get(self.ns, self.keypattern)
        assert ret == self.dm

    def test_find_and_get_function_returns_empty_dict_when_no_matching_keys_exist(self):
        """find_and_get() returns an empty dict when nothing is stored."""
        ret = self.db.find_and_get(self.ns, self.keypattern)
        assert ret == {}

    def test_remove_function_success(self):
        """Keys deleted with remove() are no longer returned by get()."""
        self.db.set(self.ns, self.dm)
        self.db.remove(self.ns, self.keys)
        ret = self.db.get(self.ns, self.keys)
        assert ret == {}

    def test_remove_if_function_success(self):
        """remove_if() deletes the key when the stored value matches."""
        self.db.set(self.ns, self.dm)
        ret = self.db.remove_if(self.ns, self.key, self.old_data)
        assert ret is True
        ret = self.db.get(self.ns, self.keys)
        assert ret == self.remove_dm

    def test_remove_if_function_returns_false_if_data_does_not_match(self):
        """remove_if() returns False for a missing key and for a value mismatch."""
        ret = self.db.remove_if(self.ns, self.key, self.old_data)
        assert ret is False
        self.db.set(self.ns, self.dm)
        ret = self.db.remove_if(self.ns, self.key, self.new_data)
        assert ret is False

    def test_add_member_function_success(self):
        """add_member() accumulates members across successive calls."""
        self.db.add_member(self.ns, self.group, self.groupmembers)
        ret = self.db.get_members(self.ns, self.group)
        assert ret == self.groupmembers

        self.db.add_member(self.ns, self.group, self.new_groupmembers)
        ret = self.db.get_members(self.ns, self.group)
        assert ret == self.all_groupmembers

    def test_remove_member_function_success(self):
        """remove_member() tolerates a missing group and empties an existing one."""
        self.db.remove_member(self.ns, self.group, self.groupmembers)
        self.db.add_member(self.ns, self.group, self.groupmembers)
        self.db.remove_member(self.ns, self.group, self.groupmembers)
        ret = self.db.get_members(self.ns, self.group)
        assert ret == set()

    def test_remove_group_function_success(self):
        """remove_group() tolerates a missing group and deletes an existing one."""
        self.db.remove_group(self.ns, self.group)
        # Create the group first so that the removal is actually exercised.
        self.db.add_member(self.ns, self.group, self.groupmembers)
        self.db.remove_group(self.ns, self.group)
        ret = self.db.get_members(self.ns, self.group)
        assert ret == set()

    def test_is_member_function_success(self):
        """is_member() is True only for members that were added to the group."""
        ret = self.db.is_member(self.ns, self.group, b'not member')
        assert ret is False
        self.db.add_member(self.ns, self.group, self.groupmembers)
        ret = self.db.is_member(self.ns, self.group, self.groupmember)
        assert ret is True
        ret = self.db.is_member(self.ns, self.group, b'not member')
        assert ret is False

    def test_group_size_function_success(self):
        """group_size() is 0 for a missing group and the member count otherwise."""
        ret = self.db.group_size(self.ns, self.group)
        assert ret == 0
        self.db.add_member(self.ns, self.group, self.groupmembers)
        ret = self.db.group_size(self.ns, self.group)
        assert ret == len(self.groupmembers)

    def test_fake_dict_backend_object_string_representation(self):
        """str() of the backend is the string form of its info dict."""
        assert str(self.db) == str({'DB type': 'FAKE DB'})

    def test_set_and_publish_function_success(self):
        """set_and_publish() stores the data and queues one notification."""
        self.db.set_and_publish(self.ns, self.channels_and_events, self.dm)
        ret = self.db.get(self.ns, self.keys)
        assert ret == self.dm
        assert self.db._queue.qsize() == 1

    def test_set_if_and_publish_success(self):
        """set_if_and_publish() updates the value and queues one notification."""
        self.db.set(self.ns, self.dm)
        ret = self.db.set_if_and_publish(self.ns, self.channels_and_events, self.key,
                                         self.old_data, self.new_data)
        assert ret is True
        ret = self.db.get(self.ns, self.keys)
        assert ret == self.new_dm
        assert self.db._queue.qsize() == 1

    def test_set_if_and_publish_returns_false_if_existing_key_value_not_expected(self):
        """set_if_and_publish() returns False and queues nothing on a mismatch."""
        self.db.set_if_and_publish(self.ns, self.channels_and_events, self.key,
                                   self.old_data, self.new_data)
        self.db.set(self.ns, self.new_dm)
        # The publishing variant is the function under test here; calling plain
        # set_if() would leave the no-notification assertion below vacuous.
        ret = self.db.set_if_and_publish(self.ns, self.channels_and_events, self.key,
                                         self.old_data, self.new_data)
        assert ret is False
        assert self.db._queue.qsize() == 0

    def test_set_if_not_exists_and_publish_success(self):
        """set_if_not_exists_and_publish() stores the value and queues one notification."""
        ret = self.db.set_if_not_exists_and_publish(self.ns, self.channels_and_events,
                                                    self.key, self.new_data)
        assert ret is True
        ret = self.db.get(self.ns, self.keys)
        assert ret == {self.key: self.new_data}
        assert self.db._queue.qsize() == 1

    def test_set_if_not_exists_and_publish_returns_false_if_key_already_exists(self):
        """set_if_not_exists_and_publish() returns False and queues nothing if the key exists."""
        self.db.set(self.ns, self.dm)
        ret = self.db.set_if_not_exists_and_publish(self.ns, self.channels_and_events,
                                                    self.key, self.new_data)
        assert ret is False
        assert self.db._queue.qsize() == 0

    def test_remove_and_publish_function_success(self):
        """remove_and_publish() deletes the keys and queues one notification."""
        self.db.set(self.ns, self.dm)
        self.db.remove_and_publish(self.ns, self.channels_and_events, self.keys)
        ret = self.db.get(self.ns, self.keys)
        assert ret == {}
        assert self.db._queue.qsize() == 1

    def test_remove_if_and_publish_success(self):
        """remove_if_and_publish() deletes a matching key and queues one notification."""
        self.db.set(self.ns, self.dm)
        ret = self.db.remove_if_and_publish(self.ns, self.channels_and_events, self.key,
                                            self.old_data)
        assert ret is True
        ret = self.db.get(self.ns, self.keys)
        assert ret == self.remove_dm
        assert self.db._queue.qsize() == 1

    def test_remove_if_and_publish_returns_false_if_data_does_not_match(self):
        """remove_if_and_publish() returns False and queues nothing on a mismatch."""
        ret = self.db.remove_if_and_publish(self.ns, self.channels_and_events, self.key,
                                            self.old_data)
        assert ret is False
        self.db.set(self.ns, self.dm)
        ret = self.db.remove_if_and_publish(self.ns, self.channels_and_events, self.key,
                                            self.new_data)
        assert ret is False
        assert self.db._queue.qsize() == 0

    def test_remove_all_publish_success(self):
        """remove_all_and_publish() empties the namespace and queues one notification."""
        self.db.set(self.ns, self.dm)
        self.db.remove_all_and_publish(self.ns, self.channels_and_events)
        ret = self.db.get(self.ns, self.keys)
        assert ret == {}
        assert self.db._queue.qsize() == 1

    def test_subscribe_channel_success(self):
        """subscribe_channel() registers a callback per channel without starting the listener."""
        cb = Mock()
        self.db.subscribe_channel(self.ns, cb, self.channels)
        for channel in self.channels:
            assert self.db._channel_cbs.get(channel, None)
        assert not self.db._listen_thread.is_alive()

    def test_subscribe_channel_event_loop_success(self):
        """With the event listener started, subscribing leaves the listen thread alive."""
        cb = Mock()
        self.db.start_event_listener()
        self.db.subscribe_channel(self.ns, cb, self.channels)
        for channel in self.channels:
            assert self.db._channel_cbs.get(channel, None)
        assert self.db._listen_thread.is_alive()

    def test_unsubscribe_channel_success(self):
        """unsubscribe_channel() drops only the callbacks of the given channels."""
        self.db.subscribe_channel(self.ns, Mock(), self.channels)
        self.db.unsubscribe_channel(self.ns, [self.channels[0]])
        assert self.db._channel_cbs.get(self.channels[0], None) is None
        assert self.db._channel_cbs.get(self.channels[1], None)

    def test_listen(self):
        """A queued notification is consumed once the listener is running."""
        import time

        cb = Mock()
        self.db.start_event_listener()
        self.db.subscribe_channel(self.ns, cb, self.channels)
        self.db._queue.put(("abs", "cbn"))
        # Consumption happens on another thread; poll with a bounded wait so
        # the assertion below does not race against it.
        deadline = time.monotonic() + 1.0
        while self.db._queue.qsize() and time.monotonic() < deadline:
            time.sleep(0.01)
        assert self.db._queue.qsize() == 0

    def test_start_event_listener_success(self):
        """start_event_listener() sets the run-in-thread flag."""
        self.db.start_event_listener()
        assert self.db._run_in_thread

    def test_start_event_listener_subscribe_first(self):
        """Subscribing before start_event_listener() starts the thread exactly once."""
        self.db._listen_thread.start = Mock()
        mock_cb = Mock()
        self.db._channel_cbs = {'abs': mock_cb}
        self.db.subscribe_channel(self.ns, Mock(), self.channels)
        self.db.start_event_listener()
        self.db._listen_thread.start.assert_called_once()

    def test_start_event_listener_fail(self):
        """start_event_listener() raises when the listen thread is already alive."""
        self.db._listen_thread.is_alive = Mock()
        self.db._listen_thread.is_alive.return_value = True
        with pytest.raises(Exception):
            self.db.start_event_listener()

    def test_handle_events_success(self):
        """handle_events() invokes the channel callback and returns the notification."""
        self.db._queue = Mock()
        self.db._queue.get.return_value = ('abs', 'cbn')
        mock_cb = Mock()
        self.db._channel_cbs = {'abs': mock_cb}
        assert self.db.handle_events() == ('abs', 'cbn')
        mock_cb.assert_called_once_with('abs', 'cbn')

    def test_handle_events_success_no_notification(self):
        """handle_events() returns None when the queue raises queue.Empty."""
        self.db._queue = Mock()
        self.db._queue.get.side_effect = queue.Empty
        assert self.db.handle_events() is None

    def test_handle_events_fail_already_started(self):
        """handle_events() raises when the listen thread is alive."""
        self.db._listen_thread = Mock()
        self.db._listen_thread.is_alive.return_value = True
        with pytest.raises(Exception):
            self.db.handle_events()

    def test_handle_events_fail_already_set(self):
        """handle_events() raises when the run-in-thread flag is set."""
        self.db._run_in_thread = True
        with pytest.raises(Exception):
            self.db.handle_events()
@pytest.fixture()
def fake_dict_backend_lock_fixture(request):
    """Attach lock test data and a fake-dict backend lock to the requesting class.

    Every value is stored on ``request.cls`` so that test methods can read it
    through ``self``. The lock is obtained from
    ``ricsdl.backend.get_backend_lock_instance`` using a mocked configuration
    whose ``get_params()`` returns parameters with ``DbBackendType.FAKE_DICT``.
    """
    cls = request.cls
    cls.ns = 'some-ns'
    cls.lockname = 'some-lock-name'
    cls.expiration = 10
    cls.retry_interval = 0.1
    cls.retry_timeout = 1

    cls.configuration = Mock()
    # NOTE(review): db_port is reconstructed -- the visible argument list
    # jumped from db_host straight to db_sentinel_port; confirm against the
    # field list of _Configuration.Params.
    mock_conf_params = _Configuration.Params(db_host=None,
                                             db_port=None,
                                             db_sentinel_port=None,
                                             db_sentinel_master_name=None,
                                             db_type=DbBackendType.FAKE_DICT)
    cls.configuration.get_params.return_value = mock_conf_params
    # NOTE(review): the visible call was cut off after the expiration argument
    # and skipped a line after the configuration argument. The namespace
    # argument and the trailing Mock() (presumably the backend the lock is
    # bound to) are reconstructed; confirm against the signature of
    # ricsdl.backend.get_backend_lock_instance.
    cls.lock = ricsdl.backend.get_backend_lock_instance(cls.configuration,
                                                        cls.ns,
                                                        cls.lockname,
                                                        cls.expiration,
                                                        Mock())
@pytest.mark.usefixtures('fake_dict_backend_lock_fixture')
class TestFakeDictBackendLock:
    """Tests for the lock object returned for ``DbBackendType.FAKE_DICT``.

    ``self.lock`` and all test data attributes are provided by
    ``fake_dict_backend_lock_fixture``.
    """

    def test_acquire_function_success(self):
        """acquire() succeeds on a lock that is not held."""
        ret = self.lock.acquire(self.retry_interval, self.retry_timeout)
        assert ret is True

    def test_acquire_function_returns_false_if_lock_is_not_acquired(self):
        """A second acquire() on a held lock returns False."""
        self.lock.acquire(self.retry_interval, self.retry_timeout)
        ret = self.lock.acquire(self.retry_interval, self.retry_timeout)
        assert ret is False

    def test_release_function_success(self):
        """After release(), the lock can be acquired again."""
        self.lock.acquire(self.retry_interval, self.retry_timeout)
        ret = self.lock.acquire(self.retry_interval, self.retry_timeout)
        assert ret is False
        # Without this call the test would never exercise release() at all.
        self.lock.release()
        ret = self.lock.acquire(self.retry_interval, self.retry_timeout)
        assert ret is True

    def test_get_validity_time_function_success(self):
        """get_validity_time() returns the configured expiration."""
        ret = self.lock.get_validity_time()
        assert ret == self.expiration

    def test_fake_dict_backend_lock_object_string_representation(self):
        """str() of the lock is the string form of its info dict."""
        expected_lock_info = {'lock DB type': 'FAKE DB',
                              'lock namespace': 'some-ns',
                              'lock name': 'some-lock-name',
                              'lock status': 'unlocked'}
        assert str(self.lock) == str(expected_lock_info)