X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=smo-install%2Ftest%2Fpythonsdk%2Fsrc%2Forantests%2Ftest_o1.py;h=aabb8d9ea62ac34ee9030aa46d380ddeedffe516;hb=a8414e7e44dea72da6f4f235e14f7fef59b4e266;hp=8506ec46dc4225b5a6e6691a1e5d64b727b9ec69;hpb=6c7e3c3c5e51182a891a2a21676f1cc2646ba484;p=it%2Fdep.git diff --git a/smo-install/test/pythonsdk/src/orantests/test_o1.py b/smo-install/test/pythonsdk/src/orantests/test_o1.py index 8506ec46..aabb8d9e 100644 --- a/smo-install/test/pythonsdk/src/orantests/test_o1.py +++ b/smo-install/test/pythonsdk/src/orantests/test_o1.py @@ -46,6 +46,10 @@ network_simulators = NetworkSimulators("./resources") dmaap = OranDmaap() test_session_timestamp = datetime.datetime.now() +TOPIC_PNFREG = '{"topicName": "unauthenticated.VES_PNFREG_OUTPUT"}' + +TOPIC_FAULT = '{"topicName": "unauthenticated.SEC_FAULT_OUTPUT"}' + @pytest.fixture(scope="module", autouse=True) def setup_simulators(): """Setup the simulators before the executing the tests.""" @@ -54,6 +58,8 @@ def setup_simulators(): # Do a first get to register the o1test/o1test user in DMAAP # all registration messages will then be stored for the registration tests. # If it exists already it clears all cached events. + dmaap.create_topic(TOPIC_PNFREG) + dmaap.create_topic(TOPIC_FAULT) wait(lambda: (dmaap.get_message_from_topic("unauthenticated.VES_PNFREG_OUTPUT", 5000, settings.DMAAP_GROUP, settings.DMAAP_USER).json() == []), sleep_seconds=10, timeout_seconds=60, waiting_for="DMaap topic unauthenticated.VES_PNFREG_OUTPUT to be empty") wait(lambda: (dmaap.get_message_from_topic("unauthenticated.SEC_FAULT_OUTPUT", 5000, settings.DMAAP_GROUP, settings.DMAAP_USER).json() == []), sleep_seconds=10, timeout_seconds=60, waiting_for="DMaap topic unauthenticated.SEC_FAULT_OUTPUT to be empty") network_simulators.start_network_simulators()