from mdclogpy import Logger
from ricxappframe.xapp_frame import RMRXapp, rmr
-
+# pylint: disable=invalid-name
qp_xapp = None
logger = Logger(name=__name__)
def post_init(self):
+ """
+ Function that runs when xapp initialization is complete
+ """
self.predict_requests = 0
logger.debug("QP xApp started")
def qp_default_handler(self, summary, sbuf):
+ """
+ Function that processes messages for which no handler is defined
+ """
logger.debug("default handler received message type {}".format(summary[rmr.RMR_MS_MSG_TYPE]))
# we don't use rts here; free this
self.rmr_free(sbuf)
def qp_predict_handler(self, summary, sbuf):
+ """
+ Function that processes messages for type 30001
+ """
logger.debug("predict handler received message type {}".format(summary[rmr.RMR_MS_MSG_TYPE]))
self.predict_requests += 1
# we don't use rts here; free this
self.rmr_free(sbuf)
# send a mock message
- mock_msg = '{ "12345" : { "310-680-200-555001" : [ 2000000 , 1200000 ] , "310-680-200-555002" : [ 800000 , 400000 ] , "310-680-200-555003" : [ 800000 , 400000 ] } }'
- ok = self.rmr_send(mock_msg.encode(), 30002)
- if ok:
+ mock_msg = '{ "12345" : { "310-680-200-555001" : [ 2000000 , 1200000 ], '\
+ ' "310-680-200-555002" : [ 800000 , 400000 ], '\
+ ' "310-680-200-555003" : [ 800000 , 400000 ] } }'
+ success = self.rmr_send(mock_msg.encode(), 30002)
+ if success:
logger.debug("predict handler: sent message successfully")
else:
- logger.warn("predict handler: failed to send message")
+ logger.warning("predict handler: failed to send message")
def start(thread=False):
logger.debug("QP xApp starting")
global qp_xapp
fake_sdl = os.environ.get("USE_FAKE_SDL", None)
- qp_xapp = RMRXapp(qp_default_handler, post_init=post_init, use_fake_sdl=True if fake_sdl else False)
+ qp_xapp = RMRXapp(qp_default_handler, rmr_port=4560, post_init=post_init, use_fake_sdl=bool(fake_sdl))
qp_xapp.register_callback(qp_predict_handler, 30001)
qp_xapp.run(thread)
can only be called if thread=True when started
TODO: could we register a signal handler for Docker SIGTERM that calls this?
"""
+ global qp_xapp
qp_xapp.stop()
def get_stats():
- # hacky for now, will evolve
+ """
+ hacky for now, will evolve
+ """
+ global qp_xapp
return {"PredictRequests": qp_xapp.predict_requests}