3 # ============LICENSE_START=======================================================
4 # ORAN SMO PACKAGE - PYTHONSDK TESTS
5 # ================================================================================
6 # Copyright (C) 2021-2022 AT&T Intellectual Property. All rights
8 # ================================================================================
9 # Licensed under the Apache License, Version 2.0 (the "License");
10 # you may not use this file except in compliance with the License.
11 # You may obtain a copy of the License at
13 # http://www.apache.org/licenses/LICENSE-2.0
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS,
17 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 # See the License for the specific language governing permissions and
19 # limitations under the License.
20 # ============LICENSE_END============================================
21 # ===================================================================
24 """Oran Dmaap module."""
26 from onapsdk.dmaap.dmaap import Dmaap
27 from oransdk.configuration import settings
29 class OranDmaap(Dmaap):
30 """Dmaap library provides functions for getting events from Dmaap."""
32 base_url = settings.DMAAP_URL
33 get_all_topics_url = f"{base_url}/topics/listAll"
34 header = {"accept": "application/json", "Content-Type": "application/json"}
40 Create topic in Dmaap.
43 topic: the topic to create, in json format
46 url = f"{cls.base_url}/topics/create"
47 cls.send_message('POST',
54 def create_service(cls,
55 service_data) -> None:
57 Create Service to policy agent via Dmaap.
60 service_data: the service data in binary format
63 OranDmaap._send_event("A1-POLICY-AGENT-READ", service_data, "Create Service via Dmaap")
66 def send_link_failure_event(cls,
69 Send link failure event.
72 event: the event to sent, in binary format
75 OranDmaap._send_event("unauthenticated.SEC_FAULT_OUTPUT", event, "Send link failure event")
78 def get_message_from_topic(cls, topic, timeout, dmaap_group, dmaap_user) -> str:
80 Get payload from any topic.
86 url = f"{cls.base_url}/events/{topic}/{dmaap_group}/{dmaap_user}?timeout={timeout}"
87 return cls.send_message('GET', 'Get payload of specific topic', url)
90 def get_result(cls) -> str:
92 Get result from previous request.
98 topic = "A1-POLICY-AGENT-WRITE"
99 url = f"{cls.base_url}/events/{topic}/users/policy-agent?timeout=15000&limit=100"
100 result = cls.send_message('GET', 'Get result from previous request', url)
107 description) -> None:
108 url = f"{cls.base_url}/events/{topic}/"
109 cls.send_message('POST',