Merge "Adding a note on helm3 for Near RIC installation procedure"
[it/dep.git] / smo-install / test / pythonsdk / test / test_dmaap.py
1 #!/usr/bin/env python3
2 # SPDX-License-Identifier: Apache-2.0
3 import logging
4 from onapsdk.configuration import settings
5 from oransdk.dmaap.oran_dmaap import OranDmaap
6 from oransdk.a1sim.a1sim import A1sim
7
# Authentication argument passed to every OranDmaap call in this script;
# an empty dict is used here.
BASIC_AUTH = {}

# The empty name selects the root logger, so the DEBUG level and the handler
# installed below apply process-wide, not only to this script's messages.
logger = logging.getLogger("")
logger.setLevel(logging.DEBUG)

# NOTE: despite the "fh" name this is a stream handler, not a file handler;
# with no argument StreamHandler writes to sys.stderr.
fh = logging.StreamHandler()
# Record layout: timestamp, level, "<line>:<file>(<pid>)", then the message.
fh_formatter = logging.Formatter(
    '%(asctime)s %(levelname)s %(lineno)d:%(filename)s(%(process)d) - %(message)s'
)
fh.setFormatter(fh_formatter)
logger.addHandler(fh)

# --- DMaaP topic round trip: list topics, create one, list them again ---
dmaap = OranDmaap()

logger.info("Get all the topics")
topiclist = dmaap.get_all_topics(BASIC_AUTH)
logger.info("response is: %s", topiclist)

logger.info("Create new topic")
# The request body is sent as a pre-serialised JSON string. The adjacent
# literals below concatenate to exactly the same text as a single-line literal.
# NOTE(review): "replicationCnCount" looks like a typo for "replicationCount";
# left unchanged here - confirm against the Message Router topic API.
topic = (
    '{  "topicName": "unauthenticated.SEC_FAULT_OUTPUT",'
    '  "topicDescription": "test topic",'
    '  "partitionCount": 1,'
    '  "replicationCnCount": 1,'
    '  "transactionEnabled": "false"}'
)
response = dmaap.create_topic(topic, BASIC_AUTH)
logger.info("response is: %s", response)

# List the topics a second time so the log shows the state after the create.
logger.info("Get topics again")
topiclist = dmaap.get_all_topics(BASIC_AUTH)
logger.info("response is: %s", topiclist)

# --- A1 simulator: call check_version against each configured simulator URL ---
# Each result is logged in the same "response is" form used for the DMaaP
# calls, so the run leaves a visible record of what every simulator returned.
logger.info("Get ric version for ost")
a1sim = A1sim()
version1 = a1sim.check_version(settings.A1SIM_OST_URL)
logger.info("response is: %s", version1)

logger.info("Get ric version for std1")
version2 = a1sim.check_version(settings.A1SIM_STD1_URL)
logger.info("response is: %s", version2)

logger.info("Get ric version for std2")
version3 = a1sim.check_version(settings.A1SIM_STD2_URL)
logger.info("response is: %s", version3)