Do not thread by default, but let the user choose.
[ric-plt/xapp-frame-py.git] / ricxappframe / xapp_frame.py
index 94a3a3e..f278cf1 100644 (file)
@@ -18,8 +18,7 @@ Framework here means Xapp classes that can be subclassed
 #   See the License for the specific language governing permissions and
 #   limitations under the License.
 # ==================================================================================
-
-
+from threading import Thread
 from ricxappframe import xapp_rmr
 from ricxappframe.xapp_sdl import SDLWrapper
 from rmr import rmr
@@ -37,7 +36,7 @@ class _BaseXapp:
     Base xapp; not for client use directly
     """
 
-    def __init__(self, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False):
+    def __init__(self, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
         """
         Init
 
@@ -53,6 +52,10 @@ class _BaseXapp:
         use_fake_sdl: bool (optional)
             if this is True, it uses dbaas' "fake dict backend" instead of Redis or other backends.
             Set this to true when developing your xapp or during unit testing to completely avoid needing a dbaas running or any network at all
+
+        post_init: function (optional)
+            runs this user provided function after the base xapp is initialized
+            it's signature should be post_init(self)
         """
 
         # Start rmr rcv thread
@@ -62,6 +65,12 @@ class _BaseXapp:
         # SDL
         self._sdl = SDLWrapper(use_fake_sdl)
 
+        # run the optionally provided user post init
+        if post_init:
+            post_init(self)
+
+    # Public rmr methods
+
     def rmr_get_messages(self):
         """
         returns a generator iterable over all current messages in the queue that have not yet been read by the client xapp
@@ -228,8 +237,7 @@ class _BaseXapp:
 
     def stop(self):
         """
-        cleans up and stops the xapp.
-        Currently this only stops the rmr thread
+        cleans up and stops the xapp rmr thread (currently)
         This is critical for unit testing as pytest will never return if the thread is running.
 
         TODO: can we register a ctrl-c handler so this gets called on ctrl-c? Because currently two ctrl-c are needed to stop
@@ -246,31 +254,89 @@ class RMRXapp(_BaseXapp):
     When run is called, the xapp framework waits for rmr messages, and calls the client provided consume callback on every one
     """
 
-    def consume(self, summary, sbuf):
+    def __init__(self, default_handler, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
+        """
+        Parameters
+        ----------
+        default_handler: function
+            a function with the signature (summary, sbuf) to be called when a message of type message_type is received
+            summary: dict
+                the rmr message summary
+            sbuf: ctypes c_void_p
+                Pointer to an rmr message buffer. The user must call free on this when done.
+
+        post_init: function (optional)
+            optionally runs this function after the app initializes and before the run loop
+            it's signature should be post_init(self)
+
+        For the other parameters, see _BaseXapp
+        """
+        # init base
+        super().__init__(
+            rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl, post_init=post_init
+        )
+
+        # setup callbacks
+        self._default_handler = default_handler
+        self._dispatch = {}
+
+        # used for thread control
+        self._keep_going = True
+
+    def register_callback(self, handler, message_type):
         """
-        This function is to be implemented by the client and is called whenever a new rmr message is received.
-        It is expected to take two parameters (besides self):
+        registers this xapp to call handler(summary, buf) when an rmr message is received of type message_type
 
         Parameters
         ----------
-        summary: dict
-            the rmr message summary
-        sbuf: ctypes c_void_p
-            Pointer to an rmr message buffer. The user must call free on this when done.
+        handler: function
+            a function with the signature (summary, sbuf) to be called when a message of type message_type is received
+            summary: dict
+                the rmr message summary
+            sbuf: ctypes c_void_p
+                Pointer to an rmr message buffer. The user must call free on this when done.
+
+        message:type: int
+            the message type to look for
+
+        Note if this method is called multiple times for a single message type, the "last one wins".
         """
-        raise NotImplementedError()
+        self._dispatch[message_type] = handler
 
-    def run(self):
+    def run(self, thread=False):
         """
-        This function should be called when the client xapp is ready to wait for consume to be called on received messages
+        This function should be called when the client xapp is ready to wait for their handlers to be called on received messages
 
-        TODO: should we run this in a thread too? We can't currently call "stop" on rmr xapps at an arbitrary time because this doesn't return control
-        Running the below in a thread probably makes the most sense.
+        Parameters
+        ----------
+        thread: bool (optional)
+            if thread is True, execution is returned to caller and the queue read loop is executed in a thread.
+            The thread can be stopped using .stop()
+            if False, execution is not returned and the framework loops
         """
-        while True:
-            if not self._rmr_loop.rcv_queue.empty():
-                (summary, sbuf) = self._rmr_loop.rcv_queue.get()
-                self.consume(summary, sbuf)
+
+        def loop():
+            while self._keep_going:
+                if not self._rmr_loop.rcv_queue.empty():
+                    (summary, sbuf) = self._rmr_loop.rcv_queue.get()
+                    # dispatch
+                    func = self._dispatch.get(summary["message type"], None)
+                    if not func:
+                        func = self._default_handler
+                    func(self, summary, sbuf)
+
+        if thread:
+            Thread(target=loop).start()
+        else:
+            loop()
+
+    def stop(self):
+        """
+        stops the rmr xapp completely.
+        """
+        super().stop()
+        mdc_logger.debug("Stopping queue reading thread..")
+        self._keep_going = False
 
 
 class Xapp(_BaseXapp):
@@ -278,15 +344,24 @@ class Xapp(_BaseXapp):
     Represents an xapp where the client provides a generic function to call, which is mostly likely a loop-forever loop
     """
 
-    def loop(self):
+    def __init__(self, entrypoint, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False):
         """
-        This function is to be implemented by the client and is called
+        Parameters
+        ----------
+        entrypoint: function
+            this function is called when the xapp runs; this is the user code
+            it's signature should be function(self)
+
+        For the other parameters, see _BaseXapp
         """
-        raise NotImplementedError()
+        # init base
+        super().__init__(rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl)
+        self._entrypoint = entrypoint
 
     def run(self):
         """
-        This function should be called when the client xapp is ready to start their loop
-        This is simple and the client could just call self.loop(), however this gives a consistent interface as the other xapps
+        This function should be called when the client xapp is ready to start their code
         """
-        self.loop()
+        self._entrypoint(self)
+
+    # there is no need for stop currently here (base has, and nothing special to do here)