a3079090805c88cb95e7557afc4b7df941329d98
[it/dep.git] / smo-install / test / pythonsdk / src / oransdk / dmaap / oran_dmaap.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 # SPDX-License-Identifier: Apache-2.0
4 """Oran Dmaap module."""
5 from typing import Dict
6 from onapsdk.dmaap.dmaap import Dmaap
7 from onapsdk.dmaap.dmaap_service import DmaapService
8
9 class OranDmaap(Dmaap):
10     """Dmaap library provides functions for getting events from Dmaap."""
11
12     get_all_topics_url = f"{DmaapService._url}/topics/listAll"
13
14     @classmethod
15     def create_topic(cls,
16                     topic,
17                     basic_auth: Dict[str, str]) -> dict:
18         """
19         Create topic in Dmaap.
20
21         Args:
22            topic: the topic to create, in json format
23            basic_auth: (Dict[str, str]) for example:{ 'username': 'bob', 'password': 'secret' }
24
25         """
26         create_events_url = f"{DmaapService._url}/topics/create"
27         instance_details = cls.send_message('POST',
28                                             'Create Dmaap Topic',
29                                             create_events_url,
30                                             data=topic)
31
32     @classmethod
33     def get_all_topics(cls,
34                        basic_auth: Dict[str, str]) -> dict:
35         """
36         Get all topics stored in Dmaap.
37
38         Args:
39            basic_auth: (Dict[str, str]) for example:{ 'username': 'bob', 'password': 'secret' }
40
41         Returns:
42             (dict) Topics from Dmaap
43
44         """
45         return super().get_all_topics(basic_auth)