1 # ==================================================================================
2 # Copyright (c) 2020 AT&T Intellectual Property.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 # ==================================================================================
"""
qpdriver entrypoint module

RMR message types:
    #define TS_UE_LIST 30000
    #define TS_QOE_PRED_REQ 30001
    #define TS_QOE_PREDICTION 30002
QPD receives message type 30000 and sends out message type 30001, which should be routed to QP.
"""
import json
from os import getenv

from ricxappframe.xapp_frame import RMRXapp, rmr
from ricxappframe.alarm import alarm

from qpdriver import data
from qpdriver.exceptions import UENotFound
34 # pylint: disable=invalid-name
def post_init(self):
    """
    Function that runs when xapp initialization is complete.

    Initialises the two message counters, creates the alarm manager, and
    records that no SDL alarm has been created yet.
    """
    self.def_hand_called = 0
    self.traffic_steering_requests = 0
    self.alarm_mgr = alarm.AlarmManager(self._mrc, "ric-xapp", "qp-driver")
    # The steering request handler reads this attribute before it ever
    # assigns it, so it must exist from the start.
    self.alarm_sdl = None
def default_handler(self, summary, sbuf):
    """
    Function that processes messages for which no handler is defined.

    Counts the message, logs a warning with its message type, and releases
    the RMR buffer it arrived in.
    """
    self.def_hand_called += 1
    self.logger.warning("default_handler unexpected message type {}".format(summary[rmr.RMR_MS_MSG_TYPE]))
    # Nothing is sent back for an unexpected message, so the buffer is not
    # reused and has to be freed here.
    self.rmr_free(sbuf)
def steering_req_handler(self, summary, sbuf):
    """
    This is the main handler for this xapp, which handles traffic steering requests.
    Traffic steering requests predictions on a set of UEs.
    This app fetches a set of data from SDL, merges it together in a deterministic way,
    then sends a new message to the QP predictor Xapp.

    The incoming message that this function handles looks like:
        {"UEPredictionSet" : ["UEId1","UEId2","UEId3"]}

    The request is dropped with a warning when the payload cannot be parsed,
    or when the SDL health check fails (in which case an alarm is raised or
    re-raised as well).
    """
    self.traffic_steering_requests += 1
    # we don't use rts here; free the buffer
    self.rmr_free(sbuf)

    raw_request = summary[rmr.RMR_MS_PAYLOAD]
    try:
        req = json.loads(raw_request)  # input should be a json encoded as bytes
        ue_list = req["UEPredictionSet"]
    except (ValueError, KeyError, TypeError):
        # ValueError covers json.JSONDecodeError and undecodable bytes;
        # TypeError covers valid JSON that is not an object (e.g. a bare list).
        ue_list = None
    if not isinstance(ue_list, list):
        self.logger.warning("steering_req_handler failed to parse request: {}".format(raw_request))
        return
    self.logger.debug("steering_req_handler processing request for UE list {}".format(ue_list))

    # Tolerate the attribute being absent so the first request cannot fail
    # just because no alarm has been created yet.
    alarm_sdl = getattr(self, "alarm_sdl", None)
    if not self._sdl.healthcheck():
        # not healthy, so (re-)raise the alarm
        self.logger.debug("steering_req_handler connection to SDL is not healthy, raising alarm")
        if alarm_sdl:
            self.alarm_mgr.reraise_alarm(alarm_sdl)
        else:
            self.alarm_sdl = self.alarm_mgr.create_alarm(1, alarm.AlarmSeverity.CRITICAL, "SDL failure")
            self.alarm_mgr.raise_alarm(self.alarm_sdl)
        self.logger.warning("steering_req_handler dropping request!")
        return

    # healthy, so clear the alarm if it was raised
    if alarm_sdl:
        self.logger.debug("steering_req_handler clearing alarm")
        self.alarm_mgr.clear_alarm(alarm_sdl)

    # iterate over the UEs and send a request for each, if it is a valid UE, to QP
    for ueid in ue_list:
        try:
            to_qpp = data.form_qp_pred_req(self, ueid)
        except UENotFound:
            self.logger.warning("steering_req_handler received a TS Request for a UE that does not exist!")
            continue
        outgoing = json.dumps(to_qpp).encode()
        # 30001 is TS_QOE_PRED_REQ, the message type routed to QP
        if not self.rmr_send(outgoing, 30001):
            self.logger.warning("steering_req_handler failed to send to QP!")
def start(thread=False):
    """
    This is a convenience function that allows this xapp to run in Docker
    for "real" (no thread, real SDL), but also easily modified for unit testing
    (e.g., use_fake_sdl). The defaults for this function are for the Dockerized xapp.

    The created xapp is stored in the module-level ``rmr_xapp`` so the other
    module-level functions can reach it.

    :param thread: passed through to ``RMRXapp.run``.
    """
    global rmr_xapp
    # Any non-empty value of USE_FAKE_SDL enables the fake SDL (bool of a string).
    fake_sdl = getenv("USE_FAKE_SDL", None)
    rmr_xapp = RMRXapp(default_handler, rmr_port=4560, post_init=post_init, use_fake_sdl=bool(fake_sdl))
    # 30000 is TS_UE_LIST, the message type this xapp receives
    rmr_xapp.register_callback(steering_req_handler, 30000)
    rmr_xapp.run(thread)
124 can only be called if thread=True when started
125 TODO: could we register a signal handler for Docker SIGTERM that calls this?
132 hacky for now, will evolve
134 return {"DefCalled": rmr_xapp.def_hand_called,
135 "SteeringRequests": rmr_xapp.traffic_steering_requests}