Alarm statements are commented in main.py for ric-app/qp-driver repo
[ric-app/qp-driver.git] / qpdriver / main.py
1 # ==================================================================================
2 #       Copyright (c) 2020 AT&T Intellectual Property.
3 #
4 #   Licensed under the Apache License, Version 2.0 (the "License");
5 #   you may not use this file except in compliance with the License.
6 #   You may obtain a copy of the License at
7 #
8 #          http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #   Unless required by applicable law or agreed to in writing, software
11 #   distributed under the License is distributed on an "AS IS" BASIS,
12 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 #   See the License for the specific language governing permissions and
14 #   limitations under the License.
15 # ==================================================================================
16 """
17 qpdriver entrypoint module
18
19 RMR Messages
20  #define TS_UE_LIST 30000
21  #define TS_QOE_PRED_REQ 30001
22  #define TS_QOE_PREDICTION 30002
23 30000 is the message type QPD receives; sends out type 30001, which should be routed to QP.
24 """
25
26 import json
27 from os import getenv
28 from ricxappframe.xapp_frame import RMRXapp, rmr
29 # from ricxappframe.alarm import alarm
30 from qpdriver import data
31 from qpdriver.exceptions import UENotFound
32
33
34 # pylint: disable=invalid-name
35 rmr_xapp = None
36
37
38 def post_init(self):
39     """
40     Function that runs when xapp initialization is complete
41     """
42     self.def_hand_called = 0
43     self.traffic_steering_requests = 0
44     # self.alarm_mgr = alarm.AlarmManager(self._mrc, "ric-xapp", "qp-driver")
45     # self.alarm_sdl = None
46
47
48 def handle_config_change(self, config):
49     """
50     Function that runs at start and on every configuration file change.
51     """
52     self.logger.debug("handle_config_change: config: {}".format(config))
53
54
55 def default_handler(self, summary, sbuf):
56     """
57     Function that processes messages for which no handler is defined
58     """
59     self.def_hand_called += 1
60     self.logger.warning("default_handler unexpected message type {}".format(summary[rmr.RMR_MS_MSG_TYPE]))
61     self.rmr_free(sbuf)
62
63
64 def steering_req_handler(self, summary, sbuf):
65     """
66     This is the main handler for this xapp, which handles traffic steering requests.
67     Traffic steering requests predictions on a set of UEs.
68     This app fetches a set of data from SDL, merges it together in a deterministic way,
69     then sends a new message to the QP predictor Xapp.
70
71     The incoming message that this function handles looks like:
72         {"UEPredictionSet" : ["UEId1","UEId2","UEId3"]}
73     """
74     self.traffic_steering_requests += 1
75     # we don't use rts here; free the buffer
76     self.rmr_free(sbuf)
77
78     ue_list = []
79     try:
80         req = json.loads(summary[rmr.RMR_MS_PAYLOAD])  # input should be a json encoded as bytes
81         ue_list = req["UEPredictionSet"]
82         self.logger.debug("steering_req_handler processing request for UE list {}".format(ue_list))
83     except (json.decoder.JSONDecodeError, KeyError):
84         self.logger.warning("steering_req_handler failed to parse request: {}".format(summary[rmr.RMR_MS_PAYLOAD]))
85         return
86
87     # if self._sdl.healthcheck():
88         # healthy, so clear the alarm if it was raised
89         # if self.alarm_sdl:
90         #    self.logger.debug("steering_req_handler clearing alarm")
91         #    self.alarm_mgr.clear_alarm(self.alarm_sdl)
92         #    self.alarm_sdl = None
93     # else:
94         # not healthy, so (re-)raise the alarm
95         # self.logger.debug("steering_req_handler connection to SDL is not healthy, raising alarm")
96         # if self.alarm_sdl:
97         #    self.alarm_mgr.reraise_alarm(self.alarm_sdl)
98         # else:
99         #    self.alarm_sdl = self.alarm_mgr.create_alarm(1, alarm.AlarmSeverity.CRITICAL, "SDL failure")
100         #    self.alarm_mgr.raise_alarm(self.alarm_sdl)
101         # self.logger.warning("steering_req_handler dropping request!")
102         # return
103
104     # iterate over the UEs and send a request for each, if it is a valid UE, to QP
105     for ueid in ue_list:
106         try:
107             to_qpp = data.form_qp_pred_req(self, ueid)
108             payload = json.dumps(to_qpp).encode()
109             success = self.rmr_send(payload, 30001)
110             if not success:
111                 self.logger.warning("steering_req_handler failed to send to QP!")
112         except UENotFound:
113             self.logger.warning("steering_req_handler received a TS Request for a UE that does not exist!")
114
115
116 def start(thread=False):
117     """
118     This is a convenience function that allows this xapp to run in Docker
119     for "real" (no thread, real SDL), but also easily modified for unit testing
120     (e.g., use_fake_sdl). The defaults for this function are for the Dockerized xapp.
121     """
122     global rmr_xapp
123     fake_sdl = getenv("USE_FAKE_SDL", None)
124     rmr_xapp = RMRXapp(default_handler,
125                        config_handler=handle_config_change,
126                        rmr_port=4560,
127                        post_init=post_init,
128                        use_fake_sdl=bool(fake_sdl))
129     rmr_xapp.register_callback(steering_req_handler, 30000)
130     rmr_xapp.run(thread)
131
132
133 def stop():
134     """
135     can only be called if thread=True when started
136     TODO: could we register a signal handler for Docker SIGTERM that calls this?
137     """
138     rmr_xapp.stop()
139
140
141 def get_stats():
142     """
143     hacky for now, will evolve
144     """
145     return {"DefCalled": rmr_xapp.def_hand_called,
146             "SteeringRequests": rmr_xapp.traffic_steering_requests}