from threading import Thread
from ricxappframe import xapp_rmr
from ricxappframe.xapp_sdl import SDLWrapper
-from rmr import rmr
+from ricxappframe.rmr import rmr
from mdclogpy import Logger
+# constants
+RIC_HEALTH_CHECK_REQ = 100
+RIC_HEALTH_CHECK_RESP = 101
+
# Private base class; not for direct client use
if sbuf.contents.state == 0:
return True
+ self.logger.info("RTS Failed! Summary: {}".format(rmr.message_summary(sbuf)))
return False
def rmr_free(self, sbuf):
# used for thread control
self._keep_going = True
+ # register a default healthcheck handler
+ # this default checks that rmr is working and SDL is working
+ # the user can override this and register their own handler if they wish since the "last registered callback wins".
+ def handle_healthcheck(self, summary, sbuf):
+ ok = self.healthcheck()
+ payload = b"OK\n" if ok else b"ERROR [RMR or SDL is unhealthy]\n"
+ self.rmr_rts(sbuf, new_payload=payload, new_mtype=RIC_HEALTH_CHECK_RESP)
+ self.rmr_free(sbuf)
+
+ self.register_callback(handle_healthcheck, RIC_HEALTH_CHECK_REQ)
+
def register_callback(self, handler, message_type):
"""
registers this xapp to call handler(summary, buf) when an rmr message is received of type message_type