1 # Copyright (c) 2019 AT&T Intellectual Property.
2 # Copyright (c) 2018-2019 Nokia.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 # This source code is part of the near-RT RIC (RAN Intelligent Controller)
18 # platform project (RICP).
23 from unittest.mock import Mock
26 from ricsdl.configuration import _Configuration
27 from ricsdl.configuration import DbBackendType
31 def fake_dict_backend_fixture(request):
32 request.cls.ns = 'some-ns'
33 request.cls.dm = {'abc': b'1', 'bcd': b'2'}
34 request.cls.new_dm = {'abc': b'3', 'bcd': b'2'}
35 request.cls.dm2 = {'cdf': b'4'}
36 request.cls.remove_dm = {'bcd': b'2'}
37 request.cls.key = 'abc'
38 request.cls.keys = ['abc', 'bcd']
39 request.cls.key2 = ['cdf']
40 request.cls.old_data = b'1'
41 request.cls.new_data = b'3'
42 request.cls.keypattern = r'*bc*'
43 request.cls.group = 'some-group'
44 request.cls.groupmember = b'm1'
45 request.cls.groupmembers = set([b'm1', b'm2'])
46 request.cls.new_groupmembers = set(b'm3')
47 request.cls.all_groupmembers = request.cls.groupmembers | request.cls.new_groupmembers
48 request.cls.channels = ['abs', 'gma']
49 request.cls.channels_and_events = {'abs': ['cbn']}
51 request.cls.configuration = Mock()
52 mock_conf_params = _Configuration.Params(db_host=None,
54 db_sentinel_port=None,
55 db_sentinel_master_name=None,
56 db_cluster_addr_list=None,
57 db_type=DbBackendType.FAKE_DICT)
58 request.cls.configuration.get_params.return_value = mock_conf_params
59 request.cls.db = ricsdl.backend.get_backend_instance(request.cls.configuration)
62 @pytest.mark.usefixtures('fake_dict_backend_fixture')
63 class TestFakeDictBackend:
64 def test_is_connected_function_success(self):
65 ret = self.db.is_connected()
68 def test_set_function_success(self):
69 self.db.set(self.ns, self.dm)
70 self.db.set(self.ns, self.dm2)
71 ret = self.db.get(self.ns, self.keys)
73 ret = self.db.get(self.ns, self.key2)
74 assert ret == self.dm2
76 def test_set_if_function_success(self):
77 self.db.set(self.ns, self.dm)
78 ret = self.db.set_if(self.ns, self.key, self.old_data, self.new_data)
80 ret = self.db.get(self.ns, self.keys)
81 assert ret == self.new_dm
83 def test_set_if_function_returns_false_if_existing_key_value_not_expected(self):
84 self.db.set_if(self.ns, self.key, self.old_data, self.new_data)
85 self.db.set(self.ns, self.new_dm)
86 ret = self.db.set_if(self.ns, self.key, self.old_data, self.new_data)
89 def test_set_if_not_exists_function_success(self):
90 ret = self.db.set_if_not_exists(self.ns, self.key, self.new_data)
92 ret = self.db.get(self.ns, self.keys)
93 assert ret == {self.key: self.new_data}
95 def test_set_if_not_exists_function_returns_false_if_key_already_exists(self):
96 self.db.set(self.ns, self.dm)
97 ret = self.db.set_if_not_exists(self.ns, self.key, self.new_data)
100 def test_find_keys_function_success(self):
101 self.db.set(self.ns, self.dm)
102 ret = self.db.find_keys(self.ns, self.keypattern)
103 assert ret == self.keys
105 def test_find_keys_function_returns_empty_list_when_no_matching_keys_found(self):
106 ret = self.db.find_keys(self.ns, self.keypattern)
109 def test_find_and_get_function_success(self):
110 self.db.set(self.ns, self.dm)
111 ret = self.db.find_and_get(self.ns, self.keypattern)
112 assert ret == self.dm
114 def test_find_and_get_function_returns_empty_dict_when_no_matching_keys_exist(self):
115 ret = self.db.find_and_get(self.ns, self.keypattern)
118 def test_remove_function_success(self):
119 self.db.set(self.ns, self.dm)
120 self.db.remove(self.ns, self.keys)
121 ret = self.db.get(self.ns, self.keys)
124 def test_remove_if_function_success(self):
125 self.db.set(self.ns, self.dm)
126 ret = self.db.remove_if(self.ns, self.key, self.old_data)
128 ret = self.db.get(self.ns, self.keys)
129 assert ret == self.remove_dm
131 def test_remove_if_function_returns_false_if_data_does_not_match(self):
132 ret = self.db.remove_if(self.ns, self.key, self.old_data)
134 self.db.set(self.ns, self.dm)
135 ret = self.db.remove_if(self.ns, self.key, self.new_data)
138 def test_add_member_function_success(self):
139 self.db.add_member(self.ns, self.group, self.groupmembers)
140 ret = self.db.get_members(self.ns, self.group)
141 assert ret == self.groupmembers
143 self.db.add_member(self.ns, self.group, self.new_groupmembers)
144 ret = self.db.get_members(self.ns, self.group)
145 assert ret == self.all_groupmembers
147 def test_remove_member_function_success(self):
148 self.db.remove_member(self.ns, self.group, self.groupmembers)
149 self.db.add_member(self.ns, self.group, self.groupmembers)
150 self.db.remove_member(self.ns, self.group, self.groupmembers)
151 ret = self.db.get_members(self.ns, self.group)
154 def test_remove_group_function_success(self):
155 self.db.remove_group(self.ns, self.group)
156 ret = self.db.get_members(self.ns, self.group)
159 def test_is_member_function_success(self):
160 ret = self.db.is_member(self.ns, self.group, b'not member')
162 self.db.add_member(self.ns, self.group, self.groupmembers)
163 ret = self.db.is_member(self.ns, self.group, self.groupmember)
165 ret = self.db.is_member(self.ns, self.group, b'not member')
168 def test_group_size_function_success(self):
169 ret = self.db.group_size(self.ns, self.group)
171 self.db.add_member(self.ns, self.group, self.groupmembers)
172 ret = self.db.group_size(self.ns, self.group)
173 assert ret == len(self.groupmembers)
175 def test_fake_dict_backend_object_string_representation(self):
176 assert str(self.db) == str({'DB type': 'FAKE DB'})
178 def test_set_and_publish_function_success(self):
179 self.db.set_and_publish(self.ns, self.channels_and_events, self.dm)
180 ret = self.db.get(self.ns, self.keys)
181 assert ret == self.dm
182 assert self.db._queue.qsize() == 1
184 def test_set_if_and_publish_success(self):
185 self.db.set(self.ns, self.dm)
186 ret = self.db.set_if_and_publish(self.ns, self.channels_and_events, self.key, self.old_data,
189 ret = self.db.get(self.ns, self.keys)
190 assert ret == self.new_dm
191 assert self.db._queue.qsize() == 1
193 def test_set_if_and_publish_returns_false_if_existing_key_value_not_expected(self):
194 self.db.set_if_and_publish(self.ns, self.channels_and_events, self.key, self.old_data,
196 self.db.set(self.ns, self.new_dm)
197 ret = self.db.set_if(self.ns, self.key, self.old_data, self.new_data)
199 assert self.db._queue.qsize() == 0
201 def test_set_if_not_exists_and_publish_success(self):
202 ret = self.db.set_if_not_exists_and_publish(self.ns, self.channels_and_events, self.key,
205 ret = self.db.get(self.ns, self.keys)
206 assert ret == {self.key: self.new_data}
207 assert self.db._queue.qsize() == 1
209 def test_set_if_not_exists_and_publish_returns_false_if_key_already_exists(self):
210 self.db.set(self.ns, self.dm)
211 ret = self.db.set_if_not_exists_and_publish(self.ns, self.channels_and_events, self.key,
214 assert self.db._queue.qsize() == 0
216 def test_remove_and_publish_function_success(self):
217 self.db.set(self.ns, self.dm)
218 self.db.remove_and_publish(self.ns, self.channels_and_events, self.keys)
219 ret = self.db.get(self.ns, self.keys)
221 assert self.db._queue.qsize() == 1
223 def test_remove_if_and_publish_success(self):
224 self.db.set(self.ns, self.dm)
225 ret = self.db.remove_if_and_publish(self.ns, self.channels_and_events, self.key,
228 ret = self.db.get(self.ns, self.keys)
229 assert ret == self.remove_dm
230 assert self.db._queue.qsize() == 1
232 def test_remove_if_and_publish_returns_false_if_data_does_not_match(self):
233 ret = self.db.remove_if_and_publish(self.ns, self.channels_and_events, self.key,
236 self.db.set(self.ns, self.dm)
237 ret = self.db.remove_if_and_publish(self.ns, self.channels_and_events, self.key,
240 assert self.db._queue.qsize() == 0
242 def test_remove_all_publish_success(self):
243 self.db.set(self.ns, self.dm)
244 self.db.remove_all_and_publish(self.ns, self.channels_and_events)
245 ret = self.db.get(self.ns, self.keys)
247 assert self.db._queue.qsize() == 1
249 def test_subscribe_channel_success(self):
251 self.db.subscribe_channel(self.ns, cb, self.channels)
252 for channel in self.channels:
253 assert self.db._channel_cbs.get(channel, None)
254 assert not self.db._listen_thread.is_alive()
256 def test_subscribe_channel_event_loop_success(self):
258 self.db.start_event_listener()
259 self.db.subscribe_channel(self.ns, cb, self.channels)
260 for channel in self.channels:
261 assert self.db._channel_cbs.get(channel, None)
262 assert self.db._listen_thread.is_alive()
264 def test_unsubscribe_channel_success(self):
265 self.db.subscribe_channel(self.ns, Mock(), self.channels)
266 self.db.unsubscribe_channel(self.ns, [self.channels[0]])
267 assert self.db._channel_cbs.get(self.channels[0], None) is None
268 assert self.db._channel_cbs.get(self.channels[1], None)
270 def test_listen(self):
272 self.db.start_event_listener()
273 self.db.subscribe_channel(self.ns, cb, self.channels)
274 self.db._queue.put(("abs", "cbn"))
276 assert self.db._queue.qsize() == 0
278 def test_start_event_listener_success(self):
279 self.db.start_event_listener()
280 assert self.db._run_in_thread
282 def test_start_event_listener_subscribe_first(self):
283 self.db._listen_thread.start = Mock()
285 self.db._channel_cbs = {'abs': mock_cb}
286 self.db.subscribe_channel(self.ns, Mock(), self.channels)
287 self.db.start_event_listener()
288 self.db._listen_thread.start.assert_called_once()
290 def test_start_event_listener_fail(self):
291 self.db._listen_thread.is_alive = Mock()
292 self.db._listen_thread.is_alive.return_value = True
293 with pytest.raises(Exception):
294 self.db.start_event_listener()
296 def test_handle_events_success(self):
297 self.db._queue = Mock()
298 self.db._queue.get.return_value = ('abs', 'cbn')
300 self.db._channel_cbs = {'abs': mock_cb}
301 assert self.db.handle_events() == ('abs', 'cbn')
302 mock_cb.assert_called_once_with('abs', 'cbn')
304 def test_handle_events_success_no_notification(self):
305 self.db._queue = Mock()
306 self.db._queue.get.side_effect = queue.Empty
307 assert self.db.handle_events() is None
309 def test_handle_events_fail_already_started(self):
310 self.db._listen_thread = Mock()
311 self.db._listen_thread.is_alive.return_value = True
312 with pytest.raises(Exception):
313 self.db.handle_events()
315 def test_handle_events_fail_already_set(self):
316 self.db._run_in_thread = True
317 with pytest.raises(Exception):
318 self.db.handle_events()
321 def fake_dict_backend_lock_fixture(request):
322 request.cls.ns = 'some-ns'
323 request.cls.lockname = 'some-lock-name'
324 request.cls.expiration = 10
325 request.cls.retry_interval = 0.1
326 request.cls.retry_timeout = 1
328 request.cls.configuration = Mock()
329 mock_conf_params = _Configuration.Params(db_host=None,
331 db_sentinel_port=None,
332 db_sentinel_master_name=None,
333 db_cluster_addr_list=None,
334 db_type=DbBackendType.FAKE_DICT)
335 request.cls.configuration.get_params.return_value = mock_conf_params
336 request.cls.lock = ricsdl.backend.get_backend_lock_instance(request.cls.configuration,
338 request.cls.lockname,
339 request.cls.expiration,
343 @pytest.mark.usefixtures('fake_dict_backend_lock_fixture')
344 class TestFakeDictBackendLock:
345 def test_acquire_function_success(self):
346 ret = self.lock.acquire(self.retry_interval, self.retry_timeout)
349 def test_acquire_function_returns_false_if_lock_is_not_acquired(self):
350 self.lock.acquire(self.retry_interval, self.retry_timeout)
351 ret = self.lock.acquire(self.retry_interval, self.retry_timeout)
354 def test_release_function_success(self):
355 self.lock.acquire(self.retry_interval, self.retry_timeout)
356 ret = self.lock.acquire(self.retry_interval, self.retry_timeout)
359 ret = self.lock.acquire(self.retry_interval, self.retry_timeout)
362 def test_get_validity_time_function_success(self):
363 ret = self.lock.get_validity_time()
364 assert ret == self.expiration
366 def test_fake_dict_backend_lock_object_string_representation(self):
367 expected_lock_info = {'lock DB type': 'FAKE DB',
368 'lock namespace': 'some-ns',
369 'lock name': 'some-lock-name',
370 'lock status': 'unlocked'}
371 assert str(self.lock) == str(expected_lock_info)