cce640d33dc55e2070bfc660b92b09e3303cc1b9
[ric-app/hw-python.git] / src / manager / SubscriptionManager.py
1 # ==================================================================================
2 #
3 #       Copyright (c) 2021 Samsung Electronics Co., Ltd. All Rights Reserved.
4 #
5 #   Licensed under the Apache License, Version 2.0 (the "License");
6 #   you may not use this file except in compliance with the License.
7 #   You may obtain a copy of the License at
8 #
9 #          http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #   Unless required by applicable law or agreed to in writing, software
12 #   distributed under the License is distributed on an "AS IS" BASIS,
13 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #   See the License for the specific language governing permissions and
15 #   limitations under the License.
16 #
17 # ==================================================================================
18 """
19
20 """
21
22 import requests
23 from ricxappframe.xapp_frame import RMRXapp
24 import json
25 from ..utils.constants import Constants
26 from ._BaseManager import _BaseManager
27 from swagger_client import api_client
28
29 class SubscriptionManager(_BaseManager):
30
31     __namespace = "e2Manager"
32
33     def __init__(self, rmr_xapp: RMRXapp):
34         super().__init__(rmr_xapp)
35         _client = api_client()
36
37     def get_gnb_list(self):
38         gnblist = self._rmr_xapp.get_list_gnb_ids()   # yet to come in library
39         self.logger.info("SubscriptionManager.getGnbList:: Processed request: {}".format(json.dumps(gnblist)))
40         return gnblist
41
42     def get_enb_list(self):
43         enblist = self._rmr_xapp.get_list_enb_ids()   # yet to come in library
44         self.logger.info("SubscriptionManager.sdlGetGnbList:: Handler processed request: {}".format(json.dumps(enblist)))
45         return enblist
46
47     def send_subscription_request(self,xnb_id):
48         subscription_request = {"xnb_id": xnb_id, "action_type": Constants.ACTION_TYPE}
49         try:
50             json_object = json.dumps(subscription_request,indent=4)
51         except TypeError:
52             print("Unable to serialize the object")
53         url = Constants.SUBSCRIPTION_PATH.format(Constants.PLT_NAMESPACE,
54                                                  Constants.SUBSCRIPTION_SERVICE,
55                                                  Constants.SUBSCRIPTION_PORT)
56         try:
57             response = requests.post(url , json=json_object)
58             response.raise_for_status()
59         except requests.exceptions.HTTPError as err_h:
60             return "An Http Error occurred:" + repr(err_h)
61         except requests.exceptions.ConnectionError as err_c:
62             return "An Error Connecting to the API occurred:" + repr(err_c)
63         except requests.exceptions.Timeout as err_t:
64             return "A Timeout Error occurred:" + repr(err_t)
65         except requests.exceptions.RequestException as err:
66             return "An Unknown Error occurred" + repr(err)
67
68
69
70
71
72
73