import json
from os import getenv
from ricxappframe.xapp_frame import RMRXapp, rmr
+from ricxappframe.alarm import alarm
from qpdriver import data
from qpdriver.exceptions import UENotFound
"""
self.def_hand_called = 0
self.traffic_steering_requests = 0
+ self.alarm_mgr = alarm.AlarmManager(self._mrc, "ric-xapp", "qp-driver")
+ self.alarm_sdl = None
def default_handler(self, summary, sbuf):
Function that processes messages for which no handler is defined
"""
self.def_hand_called += 1
- self.logger.warning("QP Driver received an unexpected message of type: {}".format(summary[rmr.RMR_MS_MSG_TYPE]))
+ self.logger.warning("default_handler unexpected message type {}".format(summary[rmr.RMR_MS_MSG_TYPE]))
self.rmr_free(sbuf)
"""
This is the main handler for this xapp, which handles traffic steering requests.
Traffic steering requests predictions on a set of UEs.
- This app fetches a set of data, merges it together in a deterministic way,
- then sends a new message out to the QP predictor Xapp.
+ This app fetches a set of data from SDL, merges it together in a deterministic way,
+ then sends a new message to the QP predictor Xapp.
The incoming message that this function handles looks like:
- {“UEPredictionSet” : [“UEId1”,”UEId2”,”UEId3”]}
+ {"UEPredictionSet" : ["UEId1","UEId2","UEId3"]}
"""
self.traffic_steering_requests += 1
+ # we don't use rts here; free the buffer
+ self.rmr_free(sbuf)
+
ue_list = []
try:
req = json.loads(summary[rmr.RMR_MS_PAYLOAD]) # input should be a json encoded as bytes
ue_list = req["UEPredictionSet"]
+ self.logger.debug("steering_req_handler processing request for UE list {}".format(ue_list))
except (json.decoder.JSONDecodeError, KeyError):
- self.logger.debug("Received a TS Request but it was malformed!")
-
- # we don't use rts here; free this
- self.rmr_free(sbuf)
+ self.logger.warning("steering_req_handler failed to parse request: {}".format(summary[rmr.RMR_MS_PAYLOAD]))
+ return
+
+ if self._sdl.healthcheck():
+ # healthy, so clear the alarm if it was raised
+ if self.alarm_sdl:
+ self.logger.debug("steering_req_handler clearing alarm")
+ self.alarm_mgr.clear_alarm(self.alarm_sdl)
+ self.alarm_sdl = None
+ else:
+ # not healthy, so (re-)raise the alarm
+ self.logger.debug("steering_req_handler connection to SDL is not healthy, raising alarm")
+ if self.alarm_sdl:
+ self.alarm_mgr.reraise_alarm(self.alarm_sdl)
+ else:
+ self.alarm_sdl = self.alarm_mgr.create_alarm(1, alarm.AlarmSeverity.CRITICAL, "SDL failure")
+ self.alarm_mgr.raise_alarm(self.alarm_sdl)
+ self.logger.warning("steering_req_handler dropping request!")
+ return
# iterate over the UEs and send a request for each, if it is a valid UE, to QP
for ueid in ue_list:
payload = json.dumps(to_qpp).encode()
success = self.rmr_send(payload, 30001)
if not success:
- self.logger.debug("QP Driver was unable to send to QP!")
+ self.logger.warning("steering_req_handler failed to send to QP!")
except UENotFound:
- self.logger.debug("Received a TS Request for a UE that does not exist!")
+ self.logger.warning("steering_req_handler received a TS Request for a UE that does not exist!")
def start(thread=False):