Restore Apex tests
[it/dep.git] / smo-install / test / pythonsdk / src / oransdk / sdnc / sdnc.py
1 #!/usr/bin/env python3
2 ###
3 # ============LICENSE_START=======================================================
4 # ORAN SMO PACKAGE - PYTHONSDK TESTS
5 # ================================================================================
6 # Copyright (C) 2021-2022 AT&T Intellectual Property. All rights
7 #                             reserved.
8 # ================================================================================
9 # Licensed under the Apache License, Version 2.0 (the "License");
10 # you may not use this file except in compliance with the License.
11 # You may obtain a copy of the License at
12 #
13 # http://www.apache.org/licenses/LICENSE-2.0
14 #
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS,
17 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 # See the License for the specific language governing permissions and
19 # limitations under the License.
20 # ============LICENSE_END============================================
21 # ===================================================================
22 #
23 ###
24 """Onap Sdnc module."""
25
26 from typing import Dict
27 from onapsdk.sdnc.sdnc_element import SdncElement
28 from oransdk.configuration import settings
29
30 class OranSdnc(SdncElement):
31     """SDNC library."""
32
33     base_url = settings.SDNC_URL
34     header = {"Accept": "application/json", "Content-Type": "application/json"}
35
36     @classmethod
37     def get_status(cls) -> str:
38         """
39         Get status of SDNC component.
40
41         Returns:
42            the status of the SDNC component
43
44         """
45         url = f"{cls.base_url}/apidoc/explorer/"
46         status = cls.send_message('GET',
47                                   'Get status of SDNC component',
48                                   url)
49         return status
50
51     @classmethod
52     def get_odu_oru_status(cls,
53                            odu_node,
54                            radio_unit,
55                            basic_auth: Dict[str, str]) -> dict:
56         """
57         Get status of SDNC component.
58
59         Args:
60            basic_auth: (Dict[str, str]) for example:{ 'username': 'bob', 'password': 'secret' }
61
62         Returns:
63            the status of the SDNC component
64
65         """
66         url = f"{cls.base_url}/rests/data/network-topology:network-topology/topology=topology-netconf/node={odu_node}/yang-ext:mount/o-ran-sc-du-hello-world:network-function/distributed-unit-functions={odu_node}/radio-resource-management-policy-ratio={radio_unit}"
67         status = cls.send_message_json('GET',
68                                        'Get status of Odu connectivity',
69                                        url,
70                                        basic_auth=basic_auth)
71         return status
72
73     @classmethod
74     def get_devices(cls, device_node, basic_auth: Dict[str, str]) -> int:
75         """
76         Get Devices on SDNC.
77
78         Returns:
79            the status of the sdnc component
80         """
81         url = f"{cls.base_url}/rests/data/network-topology:network-topology/topology=topology-netconf/node={device_node}"
82         status = cls.send_message('GET', 'Get status of Device connectivity', url, basic_auth=basic_auth)
83         return status.status_code
84
85     @classmethod
86     def get_events(cls, basic_auth: Dict[str, str], device):
87         """
88         Create device events in Sdnc.
89
90         Args:
91            topic: the event to create, in json format
92            :param basic_auth: (Dict[str, str]) for example:{ 'username': 'bob', 'password': 'secret' }
93            :param device:
94
95         """
96         url = f"{cls.base_url}/rests/operations/data-provider:read-faultlog-list"
97         return cls.send_message('POST', 'Get SDNC events', url, data='{"input": {"filter": [ {"property": "node-id", "filtervalue": "' + device + '"}],"sortorder":[{"property": "timestamp","sortorder": "descending"}],"pagination": {"size": 10,"page": 1}}}', headers=cls.header, basic_auth=basic_auth)