2 # -*- coding: utf-8 -*-
3 # SPDX-License-Identifier: Apache-2.0
4 """Oran Dmaap module."""
6 from oransdk.configuration import settings
7 from onapsdk.dmaap.dmaap import Dmaap
9 class OranDmaap(Dmaap):
10 """Dmaap library provides functions for getting events from Dmaap."""
12 base_url = settings.DMAAP_URL
13 get_all_topics_url = f"{base_url}/topics/listAll"
14 header = {"accept": "application/json", "Content-Type": "application/json"}
20 Create topic in Dmaap.
23 topic: the topic to create, in json format
24 basic_auth: (Dict[str, str]) for example:{ 'username': 'bob', 'password': 'secret' }
27 url = f"{cls.base_url}/topics/create"
28 cls.send_message('POST',
def create_service(cls, service_data) -> None:
    """
    Create Service to policy agent via Dmaap.

    Forwards ``service_data`` to the "A1-POLICY-AGENT-READ" topic by
    delegating to ``_send_event``.

    Args:
        service_data: the service data in binary format

    """
    # Dispatch through ``cls`` instead of the hard-coded class name so a
    # subclass that overrides ``_send_event`` is honoured, matching the
    # ``cls.``-based calls used elsewhere in this class.
    cls._send_event(
        "A1-POLICY-AGENT-READ",
        service_data,
        "Create Service via Dmaap",
    )
47 def send_link_failure_event(cls,
50 Send link failure event.
53 event: the event to send, in binary format
56 OranDmaap._send_event("unauthenticated.SEC_FAULT_OUTPUT", event, "Send link failure event")
59 def get_result(cls) -> str:
61 Get result from previous request.
67 topic = "A1-POLICY-AGENT-WRITE"
68 url = f"{cls.base_url}/events/{topic}/users/policy-agent?timeout=15000&limit=100"
69 result = cls.send_message('GET',
70 'Get result from previous request',
79 url = f"{cls.base_url}/events/{topic}/"
80 cls.send_message('POST',