X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=ricsdl-package%2Ftests%2Ftest_syncstorage.py;fp=ricsdl-package%2Ftests%2Ftest_syncstorage.py;h=839d159eee5f51f37b3af98047cd5e4dbc6f2164;hb=db7753971931247abf7fed67921074e518ec1f6d;hp=332a2fa6b254f580bcba681fbf4e15602ec48144;hpb=8bc881b5681e485add12dbffd5c7ea3f37a04d4f;p=ric-plt%2Fsdlpy.git diff --git a/ricsdl-package/tests/test_syncstorage.py b/ricsdl-package/tests/test_syncstorage.py index 332a2fa..839d159 100755 --- a/ricsdl-package/tests/test_syncstorage.py +++ b/ricsdl-package/tests/test_syncstorage.py @@ -26,6 +26,7 @@ from ricsdl.syncstorage import SyncLock from ricsdl.syncstorage import func_arg_checker from ricsdl.exceptions import (SdlTypeError, NotConnected) +EVENT_SEPARATOR = "___" @pytest.fixture() def sync_storage_fixture(request): @@ -44,7 +45,8 @@ def sync_storage_fixture(request): request.cls.lock_int_expiration = 10 request.cls.lock_float_expiration = 1.1 request.cls.channels = {'abs', 'cbn'} - request.cls.channels_and_events = {'abs': 'cbn'} + request.cls.channels_and_events = {'ch1': 'ev1', 'ch2': ['ev1', 'ev2', 'ev3']} + request.cls.ill_event = "illegal" + EVENT_SEPARATOR + "ev" with patch('ricsdl.backend.get_backend_instance') as mock_db_backend: storage = SyncStorage() @@ -322,13 +324,25 @@ class TestSyncStorage: def test_set_and_publish_can_raise_exception_for_wrong_argument(self): with pytest.raises(SdlTypeError): - self.storage.set_and_publish(123, self.channels_and_events, {'a': b'v1'}) + self.storage.set_and_publish(123, self.channels_and_events, self.dm) with pytest.raises(SdlTypeError): - self.storage.set_and_publish('ns', self.channels_and_events, [1, 2]) + self.storage.set_and_publish(self.ns, None, self.dm) with pytest.raises(SdlTypeError): - self.storage.set_and_publish('ns', self.channels_and_events, {0xbad: b'v1'}) + self.storage.set_and_publish(self.ns, {0xbad: "ev1"}, self.dm) with pytest.raises(SdlTypeError): - self.storage.set_and_publish('ns', self.channels_and_events, {'a': 0xbad}) + self.storage.set_and_publish(self.ns, {"ch1": 0xbad}, self.dm) + with pytest.raises(SdlTypeError): + self.storage.set_and_publish(self.ns, {"ch1": self.ill_event}, self.dm) + with pytest.raises(SdlTypeError): + self.storage.set_and_publish(self.ns, {"ch1": ["ev1", 0xbad]}, self.dm) + with pytest.raises(SdlTypeError): + self.storage.set_and_publish(self.ns, {"ch1": ["ev1", self.ill_event]}, self.dm) + with pytest.raises(SdlTypeError): + self.storage.set_and_publish(self.ns, self.channels_and_events, [1, 2]) + with pytest.raises(SdlTypeError): + self.storage.set_and_publish(self.ns, self.channels_and_events, {0xbad: b'v1'}) + with pytest.raises(SdlTypeError): + self.storage.set_and_publish(self.ns, self.channels_and_events, {'a': 0xbad}) def test_set_if_and_publish_success(self): self.mock_db_backend.set_if_and_publish.return_value = True @@ -348,13 +362,35 @@ class TestSyncStorage: def test_set_if_and_publish_can_raise_exception_for_wrong_argument(self): with pytest.raises(SdlTypeError): - self.storage.set_if_and_publish(0xbad, self.channels_and_events, 'key', b'v1', b'v2') + self.storage.set_if_and_publish(0xbad, self.channels_and_events, self.key, + self.old_data, self.new_data) + with pytest.raises(SdlTypeError): + self.storage.set_if_and_publish(self.ns, None, self.key, self.old_data, + self.new_data) + with pytest.raises(SdlTypeError): + self.storage.set_if_and_publish(self.ns, {0xbad: "ev1"}, self.key, + self.old_data, self.new_data) + with pytest.raises(SdlTypeError): + self.storage.set_if_and_publish(self.ns, {"ch1": 0xbad}, self.key, + self.old_data, self.new_data) + with pytest.raises(SdlTypeError): + self.storage.set_if_and_publish(self.ns, {"ch1": self.ill_event}, self.key, + self.old_data, self.new_data) + with pytest.raises(SdlTypeError): + self.storage.set_if_and_publish(self.ns, {"ch1": ["ev1", 0xbad]}, self.key, + self.old_data, self.new_data) with pytest.raises(SdlTypeError): - self.storage.set_if_and_publish('ns', self.channels_and_events, 0xbad, b'v1', b'v2') + self.storage.set_if_and_publish(self.ns, {"ch1": ["ev1", self.ill_event]}, self.key, + self.old_data, self.new_data) with pytest.raises(SdlTypeError): - self.storage.set_if_and_publish('ns', self.channels_and_events, 'key', 0xbad, b'v2') + self.storage.set_if_and_publish(self.ns, self.channels_and_events, 0xbad, + self.old_data, self.new_data) with pytest.raises(SdlTypeError): - self.storage.set_if_and_publish('ns', self.channels_and_events, 'key', b'v1', 0xbad) + self.storage.set_if_and_publish(self.ns, self.channels_and_events, self.key, + 0xbad, self.new_data) + with pytest.raises(SdlTypeError): + self.storage.set_if_and_publish(self.ns, self.channels_and_events, self.key, + self.old_data, 0xbad) def test_set_if_not_exists_and_publish_success(self): self.mock_db_backend.set_if_not_exists_and_publish.return_value = True @@ -374,12 +410,32 @@ class TestSyncStorage: def test_set_if_not_exists_and_publish_can_raise_exception_for_wrong_argument(self): with pytest.raises(SdlTypeError): - self.storage.set_if_not_exists_and_publish(0xbad, self.channels_and_events, 'key', - b'v1') + self.storage.set_if_not_exists_and_publish(0xbad, self.channels_and_events, + self.key, self.new_data) + with pytest.raises(SdlTypeError): + self.storage.set_if_not_exists_and_publish(self.ns, None, self.key, + self.new_data) + with pytest.raises(SdlTypeError): + self.storage.set_if_not_exists_and_publish(self.ns, {0xbad: "ev1"}, + self.key, self.new_data) + with pytest.raises(SdlTypeError): + self.storage.set_if_not_exists_and_publish(self.ns, {"ch1": 0xbad}, + self.key, self.new_data) + with pytest.raises(SdlTypeError): + self.storage.set_if_not_exists_and_publish(self.ns, {"ch1": self.ill_event}, + self.key, self.new_data) with pytest.raises(SdlTypeError): - self.storage.set_if_not_exists_and_publish('ns', self.channels_and_events, 0xbad, b'v1') + self.storage.set_if_not_exists_and_publish(self.ns, {"ch1": ["ev1", 0xbad]}, + self.key, self.new_data) with pytest.raises(SdlTypeError): - self.storage.set_if_not_exists_and_publish('ns', self.channels_and_events, 'key', 0xbad) + self.storage.set_if_not_exists_and_publish(self.ns, {"ch1": ["ev1", self.ill_event]}, + self.key, self.new_data) + with pytest.raises(SdlTypeError): + self.storage.set_if_not_exists_and_publish(self.ns, self.channels_and_events, + 0xbad, b'v1') + with pytest.raises(SdlTypeError): + self.storage.set_if_not_exists_and_publish(self.ns, self.channels_and_events, + self.key, 0xbad) def test_remove_and_publish_function_success(self): self.storage.remove_and_publish(self.ns, self.channels_and_events, self.keys) @@ -390,7 +446,19 @@ class TestSyncStorage: with pytest.raises(SdlTypeError): self.storage.remove_and_publish(0xbad, self.channels_and_events, self.keys) with pytest.raises(SdlTypeError): - self.storage.remove(self.ns, self.channels_and_events, 0xbad) + self.storage.remove_and_publish(self.ns, None, self.keys) + with pytest.raises(SdlTypeError): + self.storage.remove_and_publish(self.ns, {0xbad: "ev1"}, self.keys) + with pytest.raises(SdlTypeError): + self.storage.remove_and_publish(self.ns, {"ch1": 0xbad}, self.keys) + with pytest.raises(SdlTypeError): + self.storage.remove_and_publish(self.ns, {"ch1": self.ill_event}, self.keys) + with pytest.raises(SdlTypeError): + self.storage.remove_and_publish(self.ns, {"ch1": ["ev1", 0xbad]}, self.keys) + with pytest.raises(SdlTypeError): + self.storage.remove_and_publish(self.ns, {"ch1": ["ev1", self.ill_event]}, self.keys) + with pytest.raises(SdlTypeError): + self.storage.remove_and_publish(self.ns, self.channels_and_events, 0xbad) def test_remove_if_and_publish_success(self): self.mock_db_backend.remove_if_and_publish.return_value = True @@ -412,6 +480,23 @@ class TestSyncStorage: with pytest.raises(SdlTypeError): self.storage.remove_if_and_publish(0xbad, self.channels_and_events, self.keys, self.old_data) + with pytest.raises(SdlTypeError): + self.storage.remove_if_and_publish(self.ns, None, self.keys, self.old_data) + with pytest.raises(SdlTypeError): + self.storage.remove_if_and_publish(self.ns, {0xbad: "ev1"}, self.keys, + self.old_data) + with pytest.raises(SdlTypeError): + self.storage.remove_if_and_publish(self.ns, {"ch1": 0xbad}, self.keys, + self.old_data) + with pytest.raises(SdlTypeError): + self.storage.remove_if_and_publish(self.ns, {"ch1": self.ill_event}, self.keys, + self.old_data) + with pytest.raises(SdlTypeError): + self.storage.remove_if_and_publish(self.ns, {"ch1": ["ev1", 0xbad]}, self.keys, + self.old_data) + with pytest.raises(SdlTypeError): + self.storage.remove_if_and_publish(self.ns, {"ch1": ["ev1", self.ill_event]}, self.keys, + self.old_data) with pytest.raises(SdlTypeError): self.storage.remove_if_and_publish(self.ns, self.channels_and_events, 0xbad, self.old_data) @@ -426,6 +511,18 @@ class TestSyncStorage: def test_remove_all_and_publish_can_raise_exception_for_wrong_argument(self): with pytest.raises(SdlTypeError): self.storage.remove_all_and_publish(0xbad, self.channels_and_events) + with pytest.raises(SdlTypeError): + self.storage.remove_all_and_publish(self.ns, None) + with pytest.raises(SdlTypeError): + self.storage.remove_all_and_publish(self.ns, {0xbad: "ev1"}) + with pytest.raises(SdlTypeError): + self.storage.remove_all_and_publish(self.ns, {"ch1": 0xbad}) + with pytest.raises(SdlTypeError): + self.storage.remove_all_and_publish(self.ns, {"ch1": self.ill_event}) + with pytest.raises(SdlTypeError): + self.storage.remove_all_and_publish(self.ns, {"ch1": ["ev1", 0xbad]}) + with pytest.raises(SdlTypeError): + self.storage.remove_all_and_publish(self.ns, {"ch1": ["ev1", self.ill_event]}) def test_subscribe_function_success(self): def cb(channel, message):