Implement healthcheck handler
[ric-plt/xapp-frame-py.git] / ricxappframe / xapp_frame.py
index 94a3a3e..180c3ae 100644 (file)
@@ -18,15 +18,15 @@ Framework here means Xapp classes that can be subclassed
 #   See the License for the specific language governing permissions and
 #   limitations under the License.
 # ==================================================================================
-
-
+from threading import Thread
 from ricxappframe import xapp_rmr
 from ricxappframe.xapp_sdl import SDLWrapper
 from rmr import rmr
 from mdclogpy import Logger
 
-
-mdc_logger = Logger(name=__name__)
+# constants
+RIC_HEALTH_CHECK_REQ = 100
+RIC_HEALTH_CHECK_RESP = 101
 
 
 # Private base class; not for direct client use
@@ -37,7 +37,7 @@ class _BaseXapp:
     Base xapp; not for client use directly
     """
 
-    def __init__(self, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False):
+    def __init__(self, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
         """
         Init
 
@@ -53,7 +53,13 @@ class _BaseXapp:
         use_fake_sdl: bool (optional)
             if this is True, it uses dbaas' "fake dict backend" instead of Redis or other backends.
             Set this to true when developing your xapp or during unit testing to completely avoid needing a dbaas running or any network at all
+
+        post_init: function (optional)
+            runs this user provided function after the base xapp is initialized
+            it's signature should be post_init(self)
         """
+        # PUBLIC, can be used by xapps using self.(name):
+        self.logger = Logger(name=__name__)
 
         # Start rmr rcv thread
         self._rmr_loop = xapp_rmr.RmrLoop(port=rmr_port, wait_for_ready=rmr_wait_for_ready)
@@ -62,6 +68,12 @@ class _BaseXapp:
         # SDL
         self._sdl = SDLWrapper(use_fake_sdl)
 
+        # run the optionally provided user post init
+        if post_init:
+            post_init(self)
+
+    # Public rmr methods
+
     def rmr_get_messages(self):
         """
         returns a generator iterable over all current messages in the queue that have not yet been read by the client xapp
@@ -127,6 +139,7 @@ class _BaseXapp:
             if sbuf.contents.state == 0:
                 return True
 
+        self.logger.info("RTS Failed! Summary: {}".format(rmr.message_summary(sbuf)))
         return False
 
     def rmr_free(self, sbuf):
@@ -228,8 +241,7 @@ class _BaseXapp:
 
     def stop(self):
         """
-        cleans up and stops the xapp.
-        Currently this only stops the rmr thread
+        cleans up and stops the xapp rmr thread (currently)
         This is critical for unit testing as pytest will never return if the thread is running.
 
         TODO: can we register a ctrl-c handler so this gets called on ctrl-c? Because currently two ctrl-c are needed to stop
@@ -246,31 +258,100 @@ class RMRXapp(_BaseXapp):
     When run is called, the xapp framework waits for rmr messages, and calls the client provided consume callback on every one
     """
 
-    def consume(self, summary, sbuf):
+    def __init__(self, default_handler, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
         """
-        This function is to be implemented by the client and is called whenever a new rmr message is received.
-        It is expected to take two parameters (besides self):
+        Parameters
+        ----------
+        default_handler: function
+            a function with the signature (summary, sbuf) to be called when a message of type message_type is received
+            summary: dict
+                the rmr message summary
+            sbuf: ctypes c_void_p
+                Pointer to an rmr message buffer. The user must call free on this when done.
+
+        post_init: function (optional)
+            optionally runs this function after the app initializes and before the run loop
+            it's signature should be post_init(self)
+
+        For the other parameters, see _BaseXapp
+        """
+        # init base
+        super().__init__(
+            rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl, post_init=post_init
+        )
+
+        # setup callbacks
+        self._default_handler = default_handler
+        self._dispatch = {}
+
+        # used for thread control
+        self._keep_going = True
+
+        # register a default healthcheck handler
+        # this default checks that rmr is working and SDL is working
+        # the user can override this and register their own handler if they wish since the "last registered callback wins".
+        def handle_healthcheck(self, summary, sbuf):
+            ok = self.healthcheck()
+            payload = b"OK\n" if ok else b"ERROR [RMR or SDL is unhealthy]\n"
+            self.rmr_rts(sbuf, new_payload=payload, new_mtype=RIC_HEALTH_CHECK_RESP)
+            self.rmr_free(sbuf)
+
+        self.register_callback(handle_healthcheck, RIC_HEALTH_CHECK_REQ)
+
+    def register_callback(self, handler, message_type):
+        """
+        registers this xapp to call handler(summary, buf) when an rmr message is received of type message_type
 
         Parameters
         ----------
-        summary: dict
-            the rmr message summary
-        sbuf: ctypes c_void_p
-            Pointer to an rmr message buffer. The user must call free on this when done.
+        handler: function
+            a function with the signature (summary, sbuf) to be called when a message of type message_type is received
+            summary: dict
+                the rmr message summary
+            sbuf: ctypes c_void_p
+                Pointer to an rmr message buffer. The user must call free on this when done.
+
+        message:type: int
+            the message type to look for
+
+        Note if this method is called multiple times for a single message type, the "last one wins".
         """
-        raise NotImplementedError()
+        self._dispatch[message_type] = handler
 
-    def run(self):
+    def run(self, thread=False):
+        """
+        This function should be called when the client xapp is ready to wait for their handlers to be called on received messages
+
+        Parameters
+        ----------
+        thread: bool (optional)
+            if thread is True, execution is returned to caller and the queue read loop is executed in a thread.
+            The thread can be stopped using .stop()
+            if False, execution is not returned and the framework loops
         """
-        This function should be called when the client xapp is ready to wait for consume to be called on received messages
 
-        TODO: should we run this in a thread too? We can't currently call "stop" on rmr xapps at an arbitrary time because this doesn't return control
-        Running the below in a thread probably makes the most sense.
+        def loop():
+            while self._keep_going:
+                if not self._rmr_loop.rcv_queue.empty():
+                    (summary, sbuf) = self._rmr_loop.rcv_queue.get()
+                    # dispatch
+                    func = self._dispatch.get(summary["message type"], None)
+                    if not func:
+                        func = self._default_handler
+                    func(self, summary, sbuf)
+
+        if thread:
+            Thread(target=loop).start()
+        else:
+            loop()
+
+    def stop(self):
+        """
+        stops the rmr xapp completely.
         """
-        while True:
-            if not self._rmr_loop.rcv_queue.empty():
-                (summary, sbuf) = self._rmr_loop.rcv_queue.get()
-                self.consume(summary, sbuf)
+        super().stop()
+        self.logger.debug("Stopping queue reading thread..")
+        self._keep_going = False
 
 
 class Xapp(_BaseXapp):
@@ -278,15 +359,24 @@ class Xapp(_BaseXapp):
     Represents an xapp where the client provides a generic function to call, which is mostly likely a loop-forever loop
     """
 
-    def loop(self):
+    def __init__(self, entrypoint, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False):
         """
-        This function is to be implemented by the client and is called
+        Parameters
+        ----------
+        entrypoint: function
+            this function is called when the xapp runs; this is the user code
+            it's signature should be function(self)
+
+        For the other parameters, see _BaseXapp
         """
-        raise NotImplementedError()
+        # init base
+        super().__init__(rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl)
+        self._entrypoint = entrypoint
 
     def run(self):
         """
-        This function should be called when the client xapp is ready to start their loop
-        This is simple and the client could just call self.loop(), however this gives a consistent interface as the other xapps
+        This function should be called when the client xapp is ready to start their code
         """
-        self.loop()
+        self._entrypoint(self)
+
+    # there is no need for stop currently here (base has, and nothing special to do here)