Changes to framework usage:
[ric-plt/xapp-frame-py.git] / ricxappframe / xapp_frame.py
index dba5ad9..6b70bb6 100644 (file)
@@ -18,8 +18,7 @@ Framework here means Xapp classes that can be subclassed
 #   See the License for the specific language governing permissions and
 #   limitations under the License.
 # ==================================================================================
-
-
+from threading import Thread
 from ricxappframe import xapp_rmr
 from ricxappframe.xapp_sdl import SDLWrapper
 from rmr import rmr
@@ -37,7 +36,7 @@ class _BaseXapp:
     Base xapp; not for client use directly
     """
 
-    def __init__(self, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False):
+    def __init__(self, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
         """
         Init
 
@@ -53,6 +52,10 @@ class _BaseXapp:
         use_fake_sdl: bool (optional)
             if this is True, it uses dbaas' "fake dict backend" instead of Redis or other backends.
             Set this to true when developing your xapp or during unit testing to completely avoid needing a dbaas running or any network at all
+
+        post_init: function (optional)
+            runs this user provided function after the base xapp is initialized
+            it's signature should be post_init(self)
         """
 
         # Start rmr rcv thread
@@ -63,15 +66,8 @@ class _BaseXapp:
         self._sdl = SDLWrapper(use_fake_sdl)
 
         # run the optionally provided user post init
-        self.post_init()
-
-    # Public methods to be implemented by the client
-    def post_init(self):
-        """
-        this method can optionally be implemented by the client to run code immediately after the xall initialized (but before the xapp starts it's processing loop)
-        the base method here does nothing (ie nothing is executed prior to starting if the client does not implement this)
-        """
-        pass
+        if post_init:
+            post_init(self)
 
     # Public rmr methods
 
@@ -241,8 +237,7 @@ class _BaseXapp:
 
     def stop(self):
         """
-        cleans up and stops the xapp.
-        Currently this only stops the rmr thread
+        cleans up and stops the xapp rmr thread (currently)
         This is critical for unit testing as pytest will never return if the thread is running.
 
         TODO: can we register a ctrl-c handler so this gets called on ctrl-c? Because currently two ctrl-c are needed to stop
@@ -259,32 +254,81 @@ class RMRXapp(_BaseXapp):
     When run is called, the xapp framework waits for rmr messages, and calls the client provided consume callback on every one
     """
 
-    def consume(self, summary, sbuf):
+    def __init__(self, default_handler, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
+        """
+        Parameters
+        ----------
+        default_handler: function
+            a function with the signature (summary, sbuf) to be called when a message of type message_type is received
+            summary: dict
+                the rmr message summary
+            sbuf: ctypes c_void_p
+                Pointer to an rmr message buffer. The user must call free on this when done.
+
+        post_init: function (optional)
+            optionally runs this function after the app initializes and before the run loop
+            it's signature should be post_init(self)
+
+        For the other parameters, see _BaseXapp
+        """
+        # init base
+        super().__init__(
+            rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl, post_init=post_init
+        )
+
+        # setup callbacks
+        self._default_handler = default_handler
+        self._dispatch = {}
+
+        # used for thread control
+        self._keep_going = True
+
+    def register_callback(self, handler, message_type):
         """
-        This function is to be implemented by the client and is called whenever a new rmr message is received.
-        It is expected to take two parameters (besides self):
+        registers this xapp to call handler(summary, buf) when an rmr message is received of type message_type
 
         Parameters
         ----------
-        summary: dict
-            the rmr message summary
-        sbuf: ctypes c_void_p
-            Pointer to an rmr message buffer. The user must call free on this when done.
+        handler: function
+            a function with the signature (summary, sbuf) to be called when a message of type message_type is received
+            summary: dict
+                the rmr message summary
+            sbuf: ctypes c_void_p
+                Pointer to an rmr message buffer. The user must call free on this when done.
+
+        message:type: int
+            the message type to look for
+
+        Note if this method is called multiple times for a single message type, the "last one wins".
         """
-        self.stop()
-        raise NotImplementedError()
+        self._dispatch[message_type] = handler
 
     def run(self):
         """
-        This function should be called when the client xapp is ready to wait for consume to be called on received messages
+        This function should be called when the client xapp is ready to wait for their handlers to be called on received messages
+
+        execution is returned to caller
+        """
+
+        def loop():
+            while self._keep_going:
+                if not self._rmr_loop.rcv_queue.empty():
+                    (summary, sbuf) = self._rmr_loop.rcv_queue.get()
+                    # dispatch
+                    func = self._dispatch.get(summary["message type"], None)
+                    if not func:
+                        func = self._default_handler
+                    func(self, summary, sbuf)
 
-        TODO: should we run this in a thread too? We can't currently call "stop" on rmr xapps at an arbitrary time because this doesn't return control
-        Running the below in a thread probably makes the most sense.
+        Thread(target=loop).start()
+
+    def stop(self):
         """
-        while True:
-            if not self._rmr_loop.rcv_queue.empty():
-                (summary, sbuf) = self._rmr_loop.rcv_queue.get()
-                self.consume(summary, sbuf)
+        stops the rmr xapp completely.
+        """
+        super().stop()
+        mdc_logger.debug("Stopping queue reading thread..")
+        self._keep_going = False
 
 
 class Xapp(_BaseXapp):
@@ -292,15 +336,24 @@ class Xapp(_BaseXapp):
     Represents an xapp where the client provides a generic function to call, which is mostly likely a loop-forever loop
     """
 
-    def entrypoint(self):
+    def __init__(self, entrypoint, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False):
         """
-        This function is to be implemented by the client and is called after post_init
+        Parameters
+        ----------
+        entrypoint: function
+            this function is called when the xapp runs; this is the user code
+            it's signature should be function(self)
+
+        For the other parameters, see _BaseXapp
         """
-        self.stop()
-        raise NotImplementedError()
+        # init base
+        super().__init__(rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl)
+        self._entrypoint = entrypoint
 
     def run(self):
         """
         This function should be called when the client xapp is ready to start their code
         """
-        self.entrypoint()
+        self._entrypoint(self)
+
+    # there is no need for stop currently here (base has, and nothing special to do here)