Add new tests to validate O1
[it/dep.git] / smo-install / test / pythonsdk / src / oransdk / dmaap / dmaap.py
1 #!/usr/bin/env python3
2 ###
3 # ============LICENSE_START=======================================================
4 # ORAN SMO PACKAGE - PYTHONSDK TESTS
5 # ================================================================================
6 # Copyright (C) 2021-2022 AT&T Intellectual Property. All rights
7 #                             reserved.
8 # ================================================================================
9 # Licensed under the Apache License, Version 2.0 (the "License");
10 # you may not use this file except in compliance with the License.
11 # You may obtain a copy of the License at
12 #
13 # http://www.apache.org/licenses/LICENSE-2.0
14 #
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS,
17 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 # See the License for the specific language governing permissions and
19 # limitations under the License.
20 # ============LICENSE_END============================================
21 # ===================================================================
22 #
23 ###
24 """Oran Dmaap module."""
25
26 from onapsdk.dmaap.dmaap import Dmaap
27 from oransdk.configuration import settings
28
29 class OranDmaap(Dmaap):
30     """Dmaap library provides functions for getting events from Dmaap."""
31
32     base_url = settings.DMAAP_URL
33     get_all_topics_url = f"{base_url}/topics/listAll"
34     header = {"accept": "application/json", "Content-Type": "application/json"}
35
36     @classmethod
37     def create_topic(cls,
38                      topic) -> None:
39         """
40         Create topic in Dmaap.
41
42         Args:
43            topic: the topic to create, in json format
44
45         """
46         url = f"{cls.base_url}/topics/create"
47         cls.send_message('POST',
48                          'Create Dmaap Topic',
49                          url,
50                          data=topic,
51                          headers=cls.header)
52
53     @classmethod
54     def create_service(cls,
55                        service_data) -> None:
56         """
57         Create Service to policy agent via Dmaap.
58
59         Args:
60            service_data: the service data in binary format
61
62         """
63         OranDmaap._send_event("A1-POLICY-AGENT-READ", service_data, "Create Service via Dmaap")
64
65     @classmethod
66     def send_link_failure_event(cls,
67                                 event) -> None:
68         """
69         Send link failure event.
70
71         Args:
72            event: the event to sent, in binary format
73
74         """
75         OranDmaap._send_event("unauthenticated.SEC_FAULT_OUTPUT", event, "Send link failure event")
76
77     @classmethod
78     def get_message_from_topic(cls, topic, timeout, dmaap_group, dmaap_user) -> str:
79         """
80         Get payload from any topic.
81
82         Returns:
83             the result
84
85         """
86         url = f"{cls.base_url}/events/{topic}/{dmaap_group}/{dmaap_user}?timeout={timeout}"
87         return cls.send_message('GET', 'Get payload of specific topic', url)
88
89     @classmethod
90     def get_result(cls) -> str:
91         """
92         Get result from previous request.
93
94         Returns:
95             the result
96
97         """
98         topic = "A1-POLICY-AGENT-WRITE"
99         url = f"{cls.base_url}/events/{topic}/users/policy-agent?timeout=15000&limit=100"
100         result = cls.send_message('GET', 'Get result from previous request', url)
101         return result
102
103     @classmethod
104     def _send_event(cls,
105                     topic,
106                     event_data,
107                     description) -> None:
108         url = f"{cls.base_url}/events/{topic}/"
109         cls.send_message('POST',
110                          description,
111                          url,
112                          data=event_data,
113                          headers=cls.header)