Add ORAN Python SDK first draft
[it/dep.git] / smo-install / test / pythonsdk / src / oransdk / dmaap / dmaap.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 # SPDX-License-Identifier: Apache-2.0
4 """Oran Dmaap module."""
5
6 from oransdk.configuration import settings
7 from onapsdk.dmaap.dmaap import Dmaap
8
class OranDmaap(Dmaap):
    """Dmaap library provides functions for getting events from Dmaap."""

    # Root URL of the Dmaap endpoint, taken from the SDK settings when the
    # class body is evaluated.
    base_url = settings.DMAAP_URL
    # Built once from ``base_url`` above; a subclass that overrides
    # ``base_url`` must override this attribute as well.
    get_all_topics_url = f"{base_url}/topics/listAll"
    # Headers attached to every POST request issued by this class.
    header = {"accept": "application/json", "Content-Type": "application/json"}

    @classmethod
    def create_topic(cls,
                     topic) -> None:
        """
        Create topic in Dmaap.

        Sends a POST request to ``<base_url>/topics/create`` with the given
        topic as the request body.

        Args:
           topic: the topic to create, in json format

        """
        url = f"{cls.base_url}/topics/create"
        cls.send_message('POST',
                         'Create Dmaap Topic',
                         url,
                         data=topic,
                         headers=cls.header)

    @classmethod
    def create_service(cls,
                       service_data) -> None:
        """
        Create Service to policy agent via Dmaap.

        Posts the service data as an event on the ``A1-POLICY-AGENT-READ``
        topic.

        Args:
           service_data: the service data in binary format

        """
        # Dispatch through ``cls`` (not the hard-coded class name) so that
        # subclasses overriding ``base_url``, ``header`` or ``_send_event``
        # are honoured.
        cls._send_event("A1-POLICY-AGENT-READ", service_data, "Create Service via Dmaap")

    @classmethod
    def send_link_failure_event(cls,
                                event) -> None:
        """
        Send link failure event.

        Posts the event on the ``unauthenticated.SEC_FAULT_OUTPUT`` topic.

        Args:
           event: the event to send, in binary format

        """
        # Dispatch through ``cls`` so subclass overrides are honoured.
        cls._send_event("unauthenticated.SEC_FAULT_OUTPUT", event, "Send link failure event")

    @classmethod
    def get_result(cls) -> str:
        """
        Get result from previous request.

        Issues a GET on the ``A1-POLICY-AGENT-WRITE`` topic for user
        ``policy-agent`` with ``timeout=15000`` and ``limit=100`` as query
        parameters.

        Returns:
            the result, i.e. whatever ``send_message`` returns for the GET
            request

        """
        # NOTE(review): the annotation says ``str`` but the value is passed
        # through from ``send_message`` untouched - confirm the actual type
        # returned by the base class.
        topic = "A1-POLICY-AGENT-WRITE"
        url = f"{cls.base_url}/events/{topic}/users/policy-agent?timeout=15000&limit=100"
        result = cls.send_message('GET',
                                  'Get result from previous request',
                                  url)
        return result

    @classmethod
    def _send_event(cls,
                    topic: str,
                    event_data,
                    description: str) -> None:
        """
        Post an event to a Dmaap topic.

        Sends a POST request to ``<base_url>/events/<topic>/`` with the event
        data as the request body and the class-level ``header``.

        Args:
           topic: name of the Dmaap topic to publish on
           event_data: the payload to publish, passed unchanged as request data
           description: action description forwarded to ``send_message``

        """
        url = f"{cls.base_url}/events/{topic}/"
        cls.send_message('POST',
                         description,
                         url,
                         data=event_data,
                         headers=cls.header)