"""
-
+import json
import os
from mdclogpy import Logger
from ricxappframe.xapp_frame import RMRXapp, rmr
-
+# pylint: disable=invalid-name
qp_xapp = None
logger = Logger(name=__name__)
def post_init(self):
+ """
+ Function that runs when xapp initialization is complete
+ """
self.predict_requests = 0
logger.debug("QP xApp started")
def qp_default_handler(self, summary, sbuf):
+ """
+ Function that processes messages for which no handler is defined
+ """
logger.debug("default handler received message type {}".format(summary[rmr.RMR_MS_MSG_TYPE]))
# we don't use rts here; free this
self.rmr_free(sbuf)
def qp_predict_handler(self, summary, sbuf):
+ """
+ Function that processes messages for type 30001
+ """
logger.debug("predict handler received message type {}".format(summary[rmr.RMR_MS_MSG_TYPE]))
+ logger.debug("adding somethign")
+ logger.debug("message is " + summary[rmr.RMR_MS_PAYLOAD].decode())
+ pred_req_msg = json.loads(summary[rmr.RMR_MS_PAYLOAD].decode())
+ all_cells = {}
+ ind = 0
+ for ncell in pred_req_msg["CellMeasurements"]:
+ if (ind == 0):
+ all_cells[ncell["CellID"]] = [50000, 20000]
+ else:
+ all_cells[ncell["CellID"]] = [20000, 10000]
+ ind += 1
+
+ pred_msg = {}
+ pred_msg[pred_req_msg["PredictionUE"]] = all_cells
self.predict_requests += 1
# we don't use rts here; free this
self.rmr_free(sbuf)
- # send a mock message
- mock_msg = '{ "12345" : { "310-680-200-555001" : [ 2000000 , 1200000 ] , "310-680-200-555002" : [ 800000 , 400000 ] , "310-680-200-555003" : [ 800000 , 400000 ] } }'
- ok = self.rmr_send(mock_msg.encode(), 30002)
- if ok:
+ # send a mock message based on input
+ success = self.rmr_send(json.dumps(pred_msg).encode(), 30002)
+ if success:
logger.debug("predict handler: sent message successfully")
else:
- logger.warn("predict handler: failed to send message")
+ logger.warning("predict handler: failed to send message")
def start(thread=False):
logger.debug("QP xApp starting")
global qp_xapp
fake_sdl = os.environ.get("USE_FAKE_SDL", None)
- qp_xapp = RMRXapp(qp_default_handler, post_init=post_init, use_fake_sdl=True if fake_sdl else False)
+ qp_xapp = RMRXapp(qp_default_handler, rmr_port=4560, post_init=post_init, use_fake_sdl=bool(fake_sdl))
qp_xapp.register_callback(qp_predict_handler, 30001)
qp_xapp.run(thread)
can only be called if thread=True when started
TODO: could we register a signal handler for Docker SIGTERM that calls this?
"""
+ global qp_xapp
qp_xapp.stop()
def get_stats():
- # hacky for now, will evolve
+ """
+ hacky for now, will evolve
+ """
+ global qp_xapp
return {"PredictRequests": qp_xapp.predict_requests}