request.cls.lock_name = 'some-lock-name'
request.cls.lock_int_expiration = 10
request.cls.lock_float_expiration = 1.1
+ request.cls.channels = {'abs', 'cbn'}
+ request.cls.channels_and_events = {'abs': 'cbn'}
with patch('ricsdl.backend.get_backend_instance') as mock_db_backend:
storage = SyncStorage()
with pytest.raises(SdlTypeError):
self.storage.group_size(self.ns, 0xbad)
+ def test_set_and_publish_function_success(self):
+ self.storage.set_and_publish(self.ns, self.channels_and_events, self.dm)
+ self.mock_db_backend.set_and_publish.assert_called_once_with(self.ns,
+ self.channels_and_events,
+ self.dm)
+
+ def test_set_and_publish_can_raise_exception_for_wrong_argument(self):
+ with pytest.raises(SdlTypeError):
+ self.storage.set_and_publish(123, self.channels_and_events, {'a': b'v1'})
+ with pytest.raises(SdlTypeError):
+ self.storage.set_and_publish('ns', self.channels_and_events, [1, 2])
+ with pytest.raises(SdlTypeError):
+ self.storage.set_and_publish('ns', self.channels_and_events, {0xbad: b'v1'})
+ with pytest.raises(SdlTypeError):
+ self.storage.set_and_publish('ns', self.channels_and_events, {'a': 0xbad})
+
+ def test_set_if_and_publish_success(self):
+ self.mock_db_backend.set_if_and_publish.return_value = True
+ ret = self.storage.set_if_and_publish(self.ns, self.channels_and_events, self.key,
+ self.old_data, self.new_data)
+ self.mock_db_backend.set_if_and_publish.assert_called_once_with(
+ self.ns, self.channels_and_events, self.key, self.old_data, self.new_data)
+ assert ret is True
+
+ def test_set_if_and_publish_can_return_false_if_same_data_already_exists(self):
+ self.mock_db_backend.set_if_and_publish.return_value = False
+ ret = self.storage.set_if_and_publish(self.ns, self.channels_and_events, self.key,
+ self.old_data, self.new_data)
+ self.mock_db_backend.set_if_and_publish.assert_called_once_with(
+ self.ns, self.channels_and_events, self.key, self.old_data, self.new_data)
+ assert ret is False
+
+ def test_set_if_and_publish_can_raise_exception_for_wrong_argument(self):
+ with pytest.raises(SdlTypeError):
+ self.storage.set_if_and_publish(0xbad, self.channels_and_events, 'key', b'v1', b'v2')
+ with pytest.raises(SdlTypeError):
+ self.storage.set_if_and_publish('ns', self.channels_and_events, 0xbad, b'v1', b'v2')
+ with pytest.raises(SdlTypeError):
+ self.storage.set_if_and_publish('ns', self.channels_and_events, 'key', 0xbad, b'v2')
+ with pytest.raises(SdlTypeError):
+ self.storage.set_if_and_publish('ns', self.channels_and_events, 'key', b'v1', 0xbad)
+
+ def test_set_if_not_exists_and_publish_success(self):
+ self.mock_db_backend.set_if_not_exists_and_publish.return_value = True
+ ret = self.storage.set_if_not_exists_and_publish(self.ns, self.channels_and_events,
+ self.key, self.new_data)
+ self.mock_db_backend.set_if_not_exists_and_publish.assert_called_once_with(
+ self.ns, self.channels_and_events, self.key, self.new_data)
+ assert ret is True
+
+ def test_set_if_not_exists_and_publish_function_can_return_false_if_key_already_exists(self):
+ self.mock_db_backend.set_if_not_exists_and_publish.return_value = False
+ ret = self.storage.set_if_not_exists_and_publish(self.ns, self.channels_and_events,
+ self.key, self.new_data)
+ self.mock_db_backend.set_if_not_exists_and_publish.assert_called_once_with(
+ self.ns, self.channels_and_events, self.key, self.new_data)
+ assert ret is False
+
+ def test_set_if_not_exists_and_publish_can_raise_exception_for_wrong_argument(self):
+ with pytest.raises(SdlTypeError):
+ self.storage.set_if_not_exists_and_publish(0xbad, self.channels_and_events, 'key',
+ b'v1')
+ with pytest.raises(SdlTypeError):
+ self.storage.set_if_not_exists_and_publish('ns', self.channels_and_events, 0xbad, b'v1')
+ with pytest.raises(SdlTypeError):
+ self.storage.set_if_not_exists_and_publish('ns', self.channels_and_events, 'key', 0xbad)
+
+ def test_remove_and_publish_function_success(self):
+ self.storage.remove_and_publish(self.ns, self.channels_and_events, self.keys)
+ self.mock_db_backend.remove_and_publish.assert_called_once_with(
+ self.ns, self.channels_and_events, list(self.keys))
+
+ def test_remove_and_publish_can_raise_exception_for_wrong_argument(self):
+ with pytest.raises(SdlTypeError):
+ self.storage.remove_and_publish(0xbad, self.channels_and_events, self.keys)
+ with pytest.raises(SdlTypeError):
+ self.storage.remove(self.ns, self.channels_and_events, 0xbad)
+
+ def test_remove_if_and_publish_success(self):
+ self.mock_db_backend.remove_if_and_publish.return_value = True
+ ret = self.storage.remove_if_and_publish(self.ns, self.channels_and_events, self.key,
+ self.new_data)
+ self.mock_db_backend.remove_if_and_publish.assert_called_once_with(
+ self.ns, self.channels_and_events, self.key, self.new_data)
+ assert ret is True
+
+ def test_remove_if_remove_and_publish_can_return_false_if_data_does_not_match(self):
+ self.mock_db_backend.remove_if_and_publish.return_value = False
+ ret = self.storage.remove_if_and_publish(self.ns, self.channels_and_events, self.key,
+ self.old_data)
+ self.mock_db_backend.remove_if_and_publish.assert_called_once_with(
+ self.ns, self.channels_and_events, self.key, self.old_data)
+ assert ret is False
+
+ def test_remove_if_remove_and_publish_can_raise_exception_for_wrong_argument(self):
+ with pytest.raises(SdlTypeError):
+ self.storage.remove_if_and_publish(0xbad, self.channels_and_events, self.keys,
+ self.old_data)
+ with pytest.raises(SdlTypeError):
+ self.storage.remove_if_and_publish(self.ns, self.channels_and_events, 0xbad,
+ self.old_data)
+ with pytest.raises(SdlTypeError):
+ self.storage.remove_if_and_publish(self.ns, self.channels_and_events, self.keys, 0xbad)
+
+ def test_remove_all_and_publish_success(self):
+ self.storage.remove_all_and_publish(self.ns, self.channels_and_events)
+ self.mock_db_backend.remove_all_and_publish.assert_called_once_with(
+ self.ns, self.channels_and_events)
+
+ def test_remove_all_and_publish_can_raise_exception_for_wrong_argument(self):
+ with pytest.raises(SdlTypeError):
+ self.storage.remove_all_and_publish(0xbad, self.channels_and_events)
+
+ def test_subscribe_function_success(self):
+ def cb(channel, message):
+ pass
+ self.storage.subscribe_channel(self.ns, cb, self.channels)
+ self.mock_db_backend.subscribe_channel.assert_called_once_with(
+ self.ns, cb, list(self.channels))
+
+ def test_subscribe_can_raise_exception_for_wrong_argument(self):
+ def cb3(channel, message, extra):
+ pass
+ def cb1(channel):
+ pass
+ with pytest.raises(SdlTypeError):
+ self.storage.subscribe_channel(self.ns, cb3, self.channels)
+ with pytest.raises(SdlTypeError):
+ self.storage.subscribe_channel(self.ns, cb1, self.channels)
+
+ def test_unsubscribe_function_success(self):
+ self.storage.unsubscribe_channel(self.ns, self.channels)
+ self.mock_db_backend.unsubscribe_channel.assert_called_once_with(
+ self.ns, list(self.channels))
+
+ def test_start_event_listener_success(self):
+ self.storage.start_event_listener()
+ self.mock_db_backend.start_event_listener.assert_called()
+
+ def test_handle_events_success(self):
+ self.storage.handle_events()
+ self.mock_db_backend.handle_events.assert_called()
+
@patch('ricsdl.syncstorage.SyncLock')
def test_get_lock_resource_function_success_when_expiration_time_is_integer(self, mock_db_lock):
ret = self.storage.get_lock_resource(self.ns, self.lock_name, self.lock_int_expiration)