a621e918c1539d13875531eed698daa2a9f5b139
[ric-app/qp-driver.git] / qpdriver / main.py
1 # ==================================================================================
2 #       Copyright (c) 2020 AT&T Intellectual Property.
3 #
4 #   Licensed under the Apache License, Version 2.0 (the "License");
5 #   you may not use this file except in compliance with the License.
6 #   You may obtain a copy of the License at
7 #
8 #          http://www.apache.org/licenses/LICENSE-2.0
9 #
10 #   Unless required by applicable law or agreed to in writing, software
11 #   distributed under the License is distributed on an "AS IS" BASIS,
12 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 #   See the License for the specific language governing permissions and
14 #   limitations under the License.
15 # ==================================================================================
16 """
17 qpdriver entrypoint module
18
19 RMR Messages
20  #define TS_UE_LIST 30000
21  #define TS_QOE_PRED_REQ 30001
22  #define TS_QOE_PREDICTION 30002
23 30000 is the message type QPD receives; sends out type 30001, which should be routed to QP.
24 """
25
26 import json
27 from os import getenv
28 from ricxappframe.xapp_frame import RMRXapp, rmr
29 from ricxappframe.alarm import alarm
30 from qpdriver import data
31 from qpdriver.exceptions import UENotFound
32
33
34 # pylint: disable=invalid-name
35 rmr_xapp = None
36
37
38 def post_init(self):
39     """
40     Function that runs when xapp initialization is complete
41     """
42     self.def_hand_called = 0
43     self.traffic_steering_requests = 0
44     self.alarm_mgr = alarm.AlarmManager(self._mrc, "ric-xapp", "qp-driver")
45     self.alarm_sdl = None
46
47
48 def default_handler(self, summary, sbuf):
49     """
50     Function that processes messages for which no handler is defined
51     """
52     self.def_hand_called += 1
53     self.logger.warning("default_handler unexpected message type {}".format(summary[rmr.RMR_MS_MSG_TYPE]))
54     self.rmr_free(sbuf)
55
56
57 def steering_req_handler(self, summary, sbuf):
58     """
59     This is the main handler for this xapp, which handles traffic steering requests.
60     Traffic steering requests predictions on a set of UEs.
61     This app fetches a set of data from SDL, merges it together in a deterministic way,
62     then sends a new message to the QP predictor Xapp.
63
64     The incoming message that this function handles looks like:
65         {"UEPredictionSet" : ["UEId1","UEId2","UEId3"]}
66     """
67     self.traffic_steering_requests += 1
68     # we don't use rts here; free the buffer
69     self.rmr_free(sbuf)
70
71     ue_list = []
72     try:
73         req = json.loads(summary[rmr.RMR_MS_PAYLOAD])  # input should be a json encoded as bytes
74         ue_list = req["UEPredictionSet"]
75         self.logger.debug("steering_req_handler processing request for UE list {}".format(ue_list))
76     except (json.decoder.JSONDecodeError, KeyError):
77         self.logger.warning("steering_req_handler failed to parse request: {}".format(summary[rmr.RMR_MS_PAYLOAD]))
78         return
79
80     if self._sdl.healthcheck():
81         # healthy, so clear the alarm if it was raised
82         if self.alarm_sdl:
83             self.logger.debug("steering_req_handler clearing alarm")
84             self.alarm_mgr.clear_alarm(self.alarm_sdl)
85             self.alarm_sdl = None
86     else:
87         # not healthy, so (re-)raise the alarm
88         self.logger.debug("steering_req_handler connection to SDL is not healthy, raising alarm")
89         if self.alarm_sdl:
90             self.alarm_mgr.reraise_alarm(self.alarm_sdl)
91         else:
92             self.alarm_sdl = self.alarm_mgr.create_alarm(1, alarm.AlarmSeverity.CRITICAL, "SDL failure")
93             self.alarm_mgr.raise_alarm(self.alarm_sdl)
94         self.logger.warning("steering_req_handler dropping request!")
95         return
96
97     # iterate over the UEs and send a request for each, if it is a valid UE, to QP
98     for ueid in ue_list:
99         try:
100             to_qpp = data.form_qp_pred_req(self, ueid)
101             payload = json.dumps(to_qpp).encode()
102             success = self.rmr_send(payload, 30001)
103             if not success:
104                 self.logger.warning("steering_req_handler failed to send to QP!")
105         except UENotFound:
106             self.logger.warning("steering_req_handler received a TS Request for a UE that does not exist!")
107
108
109 def start(thread=False):
110     """
111     This is a convenience function that allows this xapp to run in Docker
112     for "real" (no thread, real SDL), but also easily modified for unit testing
113     (e.g., use_fake_sdl). The defaults for this function are for the Dockerized xapp.
114     """
115     global rmr_xapp
116     fake_sdl = getenv("USE_FAKE_SDL", None)
117     rmr_xapp = RMRXapp(default_handler, rmr_port=4560, post_init=post_init, use_fake_sdl=bool(fake_sdl))
118     rmr_xapp.register_callback(steering_req_handler, 30000)
119     rmr_xapp.run(thread)
120
121
122 def stop():
123     """
124     can only be called if thread=True when started
125     TODO: could we register a signal handler for Docker SIGTERM that calls this?
126     """
127     rmr_xapp.stop()
128
129
130 def get_stats():
131     """
132     hacky for now, will evolve
133     """
134     return {"DefCalled": rmr_xapp.def_hand_called,
135             "SteeringRequests": rmr_xapp.traffic_steering_requests}