2 # -*- coding: utf-8 -*-
3 # SPDX-License-Identifier: Apache-2.0
4 """Oran Dmaap module."""
6 from oransdk.configuration import settings
7 from onapsdk.dmaap.dmaap import Dmaap
9 class OranDmaap(Dmaap):
10 """Dmaap library provides functions for getting events from Dmaap."""
12 base_url = settings.DMAAP_URL
13 get_all_topics_url = f"{base_url}/topics/listAll"
14 header = {"accept": "application/json", "Content-Type": "application/json"}
20 Create topic in Dmaap.
23 topic: the topic to create, in json format
24 basic_auth: (Dict[str, str]) for example: {'username': 'bob', 'password': 'secret'}
27 url = f"{cls.base_url}/topics/create"
28 cls.send_message('POST',
def create_service(cls, service_data) -> None:
    """
    Create Service to policy agent via Dmaap.

    The payload is handed to ``OranDmaap._send_event`` together with the
    "A1-POLICY-AGENT-READ" identifier and the label
    "Create Service via Dmaap".

    Args:
        service_data: the service data in binary format

    """
    OranDmaap._send_event(
        "A1-POLICY-AGENT-READ",
        service_data,
        "Create Service via Dmaap",
    )
47 def send_link_failure_event(cls,
50 Send link failure event.
53 event: the event to send, in binary format
56 OranDmaap._send_event("unauthenticated.SEC_FAULT_OUTPUT", event, "Send link failure event")
def get_message_from_topic(cls, topic, timeout, dmaap_group, dmaap_user) -> str:
    """
    Get payload from any topic.

    Builds ``<base_url>/events/<topic>/<dmaap_group>/<dmaap_user>`` with the
    timeout as a query parameter and issues a GET through ``cls.send_message``.

    Args:
        topic: name of the topic, placed right after ``/events/`` in the URL
        timeout: value sent as the ``timeout`` query parameter
        dmaap_group: URL path segment following the topic
            (presumably the consumer group -- confirm against the Dmaap API)
        dmaap_user: URL path segment following the group
            (presumably the consumer id -- confirm against the Dmaap API)

    Returns:
        whatever ``cls.send_message`` returns for the GET request

    """
    events_root = f"{cls.base_url}/events"
    endpoint = f"{events_root}/{topic}/{dmaap_group}/{dmaap_user}?timeout={timeout}"
    return cls.send_message('GET', 'Get payload of specific topic', endpoint)
72 def get_result(cls) -> str:
74 Get result from previous request.
80 topic = "A1-POLICY-AGENT-WRITE"
81 url = f"{cls.base_url}/events/{topic}/users/policy-agent?timeout=15000&limit=100"
82 result = cls.send_message('GET',
83 'Get result from previous request',
92 url = f"{cls.base_url}/events/{topic}/"
93 cls.send_message('POST',