Use blocking get call w/ timeout to read msg queue
[ric-plt/xapp-frame-py.git] / ricxappframe / xapp_frame.py
index b81804e..4321852 100644 (file)
@@ -19,6 +19,7 @@ Framework for python xapps
 Framework here means Xapp classes that can be subclassed
 """
 
+import queue
 from threading import Thread
 from ricxappframe import xapp_rmr
 from ricxappframe.rmr import rmr
@@ -48,16 +49,20 @@ class _BaseXapp:
             port to listen on
 
         rmr_wait_for_ready: bool (optional)
-            if this is True, then init waits until rmr is ready to send, which includes having a valid routing file.
-            this can be set to False if the client only wants to *receive only*
+
+            if this is True, then init waits until rmr is ready to send, which
+            includes having a valid routing file. This can be set to
+            False if the client only wants to *receive only*.
 
         use_fake_sdl: bool (optional)
-            if this is True, it uses dbaas' "fake dict backend" instead of Redis or other backends.
-            Set this to true when developing your xapp or during unit testing to completely avoid needing a dbaas running or any network at all
+            if this is True, it uses dbaas' "fake dict backend" instead
+            of Redis or other backends. Set this to true when developing
+            your xapp or during unit testing to completely avoid needing
+            a dbaas running or any network at all.
 
         post_init: function (optional)
-            runs this user provided function after the base xapp is initialized
-            it's signature should be post_init(self)
+            runs this user provided function after the base xapp is
+            initialized; its signature should be post_init(self)
         """
         # PUBLIC, can be used by xapps using self.(name):
         self.logger = Logger(name=__name__)
@@ -77,9 +82,11 @@ class _BaseXapp:
 
     def rmr_get_messages(self):
         """
-        Returns a generator iterable over all items in the queue that have not yet been read by the client xapp.
-        Each item is a tuple (S, sbuf) where S is a message summary dict and sbuf is the raw message.
-        The caller MUST call rmr.rmr_free_msg(sbuf) when finished with each sbuf to prevent memory leaks!
+        Returns a generator iterable over all items in the queue that
+        have not yet been read by the client xapp. Each item is a tuple
+        (S, sbuf) where S is a message summary dict and sbuf is the raw
+        message. The caller MUST call rmr.rmr_free_msg(sbuf) when
+        finished with each sbuf to prevent memory leaks!
         """
         while not self._rmr_loop.rcv_queue.empty():
             (summary, sbuf) = self._rmr_loop.rcv_queue.get()
@@ -116,10 +123,10 @@ class _BaseXapp:
 
     def rmr_rts(self, sbuf, new_payload=None, new_mtype=None, retries=100):
         """
-        Allows the xapp to return to sender, possibly adjusting the payload and message type before doing so
-
-        This does NOT free the sbuf for the caller as the caller may wish to perform multiple rts per buffer.
-        The client needs to free.
+        Allows the xapp to return to sender, possibly adjusting the
+        payload and message type before doing so.  This does NOT free
+        the sbuf for the caller as the caller may wish to perform
+        multiple rts per buffer. The client needs to free.
 
         Parameters
         ----------
@@ -130,7 +137,8 @@ class _BaseXapp:
         new_mtype: int (optional)
             New message type (replaces the received message)
         retries: int (optional)
-            Number of times to retry at the application level before excepting RMRFailure
+            Number of times to retry at the application level before
+            throwing exception RMRFailure
 
         Returns
         -------
@@ -147,9 +155,12 @@ class _BaseXapp:
 
     def rmr_free(self, sbuf):
         """
-        Free an rmr message buffer after use
+        Frees an rmr message buffer after use
+
+        Note: this does not need to be a class method, self is not
+        used. However if we break it out as a function we need a home
+        for it.
 
-        Note: this does not need to be a class method, self is not used. However if we break it out as a function we need a home for it.
         Parameters
         ----------
         sbuf: ctypes c_void_p
@@ -158,8 +169,10 @@ class _BaseXapp:
         rmr.rmr_free_msg(sbuf)
 
     # SDL
-    # NOTE, even though these are passthroughs, the seperate SDL wrapper is useful for other applications like A1.
-    # Therefore, we don't embed that SDLWrapper functionality here so that it can be instantiated on it's own.
+    # NOTE, even though these are passthroughs, the seperate SDL wrapper
+    # is useful for other applications like A1. Therefore, we don't
+    # embed that SDLWrapper functionality here so that it can be
+    # instantiated on its own.
 
     def sdl_set(self, ns, key, value, usemsgpack=True):
         """
@@ -244,10 +257,12 @@ class _BaseXapp:
 
     def stop(self):
         """
-        cleans up and stops the xapp rmr thread (currently)
-        This is critical for unit testing as pytest will never return if the thread is running.
+        cleans up and stops the xapp rmr thread (currently). This is
+        critical for unit testing as pytest will never return if the
+        thread is running.
 
-        TODO: can we register a ctrl-c handler so this gets called on ctrl-c? Because currently two ctrl-c are needed to stop
+        TODO: can we register a ctrl-c handler so this gets called on
+        ctrl-c? Because currently two ctrl-c are needed to stop.
         """
         self._rmr_loop.stop()
 
@@ -257,8 +272,10 @@ class _BaseXapp:
 
 class RMRXapp(_BaseXapp):
     """
-    Represents an xapp that is purely driven by rmr messages (i.e., when messages are received, the xapp does something
-    When run is called, the xapp framework waits for rmr messages, and calls the client provided consume callback on every one
+    Represents an xapp that is purely driven by RMR messages; i.e., when
+    messages are received, the xapp does something. When run is called,
+    the xapp framework waits for rmr messages, and calls the
+    client-provided consume callback on every one.
     """
 
     def __init__(self, default_handler, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
@@ -266,15 +283,16 @@ class RMRXapp(_BaseXapp):
         Parameters
         ----------
         default_handler: function
-            a function with the signature (summary, sbuf) to be called when a message of type message_type is received
-            summary: dict
-                the rmr message summary
-            sbuf: ctypes c_void_p
-                Pointer to an rmr message buffer. The user must call free on this when done.
-
+            a function with the signature (summary, sbuf) to be called
+            when a message of type message_type is received.
+        summary: dict
+            the rmr message summary
+        sbuf: ctypes c_void_p
+            Pointer to an rmr message buffer. The user must call free on
+            this when done.
         post_init: function (optional)
-            optionally runs this function after the app initializes and before the run loop
-            it's signature should be post_init(self)
+            optionally runs this function after the app initializes and
+        before the run loop; its signature should be post_init(self)
 
         For the other parameters, see _BaseXapp
         """
@@ -292,7 +310,8 @@ class RMRXapp(_BaseXapp):
 
         # register a default healthcheck handler
         # this default checks that rmr is working and SDL is working
-        # the user can override this and register their own handler if they wish since the "last registered callback wins".
+        # the user can override this and register their own handler
+        # if they wish since the "last registered callback wins".
         def handle_healthcheck(self, summary, sbuf):
             ok = self.healthcheck()
             payload = b"OK\n" if ok else b"ERROR [RMR or SDL is unhealthy]\n"
@@ -308,11 +327,12 @@ class RMRXapp(_BaseXapp):
         Parameters
         ----------
         handler: function
-            a function with the signature (summary, sbuf) to be called when a message of type message_type is received
-            summary: dict
-                the rmr message summary
-            sbuf: ctypes c_void_p
-                Pointer to an rmr message buffer. The user must call free on this when done.
+            a function with the signature (summary, sbuf) to be called
+            when a message of type message_type is received
+        summary: dict
+            the rmr message summary
+        sbuf: ctypes c_void_p
+            Pointer to an rmr message buffer. The user must call free on this when done.
 
         message:type: int
             the message type to look for
@@ -323,25 +343,30 @@ class RMRXapp(_BaseXapp):
 
     def run(self, thread=False):
         """
-        This function should be called when the client xapp is ready to wait for their handlers to be called on received messages
+        This function should be called when the client xapp is ready to
+        wait for its handlers to be called on received messages.
 
         Parameters
         ----------
         thread: bool (optional)
-            if thread is True, execution is returned to caller and the queue read loop is executed in a thread.
-            The thread can be stopped using .stop()
-            if False, execution is not returned and the framework loops
+            If True, a thread is started to run the queue read/dispatch loop
+            and execution is returned to caller; the thread can be stopped
+            by calling .stop(). If False (the default), execution is not
+            returned and the framework loops forever.
         """
 
         def loop():
             while self._keep_going:
-                if not self._rmr_loop.rcv_queue.empty():
-                    (summary, sbuf) = self._rmr_loop.rcv_queue.get()
+                try:
+                    (summary, sbuf) = self._rmr_loop.rcv_queue.get(block=True, timeout=5)
                     # dispatch
                     func = self._dispatch.get(summary[rmr.RMR_MS_MSG_TYPE], None)
                     if not func:
                         func = self._default_handler
                     func(self, summary, sbuf)
+                except queue.Empty:
+                    # the get timed out
+                    pass
 
         if thread:
             Thread(target=loop).start()
@@ -350,16 +375,17 @@ class RMRXapp(_BaseXapp):
 
     def stop(self):
         """
-        stops the rmr xapp completely.
+        Sets the flag to end the dispatch loop.
         """
         super().stop()
-        self.logger.debug("Stopping queue reading thread..")
+        self.logger.debug("Setting flag to end framework work loop.")
         self._keep_going = False
 
 
 class Xapp(_BaseXapp):
     """
-    Represents an xapp where the client provides a generic function to call, which is mostly likely a loop-forever loop
+    Represents an xapp where the client provides a generic function to
+    call, which is mostly likely a loop-forever loop.
     """
 
     def __init__(self, entrypoint, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False):
@@ -367,8 +393,8 @@ class Xapp(_BaseXapp):
         Parameters
         ----------
         entrypoint: function
-            this function is called when the xapp runs; this is the user code
-            it's signature should be function(self)
+            this function is called when the xapp runs; this is the user code.
+            its signature should be function(self)
 
         For the other parameters, see _BaseXapp
         """
@@ -378,8 +404,10 @@ class Xapp(_BaseXapp):
 
     def run(self):
         """
-        This function should be called when the client xapp is ready to start their code
+        This function should be called when the client xapp is ready to
+        start their code.
         """
         self._entrypoint(self)
 
-    # there is no need for stop currently here (base has, and nothing special to do here)
+    # there is no need for stop currently here (base has, and nothing
+    # special to do here)