2 # -*- coding: utf-8 -*-
3 # SPDX-License-Identifier: Apache-2.0
4 """Oran Dmaap module."""
5 from typing import Dict
6 from onapsdk.dmaap.dmaap import Dmaap
7 from onapsdk.dmaap.dmaap_service import DmaapService
# OranDmaap extends the Dmaap class imported from onapsdk.dmaap.dmaap.
9 class OranDmaap(Dmaap):
10 """Dmaap library provides functions for getting events from Dmaap."""
# URL for listing all topics: DmaapService._url with "/topics/listAll" appended.
# NOTE(review): this reads the underscore-prefixed `_url` attribute of DmaapService.
12 get_all_topics_url = f"{DmaapService._url}/topics/listAll"
# NOTE(review): the `def` line and leading parameters of the topic-creation
# method are not visible in this view; only the tail of its signature follows.
17 basic_auth: Dict[str, str]) -> dict:
19 Create topic in Dmaap.
22 topic: the topic to create, in json format
23 basic_auth: (Dict[str, str]) for example:{ 'username': 'bob', 'password': 'secret' }
# Topic-creation endpoint: DmaapService._url with "/topics/create" appended.
26 create_events_url = f"{DmaapService._url}/topics/create"
# Issues a POST through cls.send_message; the remaining arguments of this call
# and the method's return statement are not visible in this view.
27 instance_details = cls.send_message('POST',
# Lists topics by delegating to the parent class implementation (see the return below).
33 def get_all_topics(cls,
34 basic_auth: Dict[str, str]) -> dict:
36 Get all topics stored in Dmaap.
39 basic_auth: (Dict[str, str]) for example:{ 'username': 'bob', 'password': 'secret' }
42 (dict) Topics from Dmaap
# Pass-through: basic_auth is forwarded unchanged to the parent's get_all_topics.
45 return super().get_all_topics(basic_auth)