Use blocking get call w/ timeout to read msg queue
[ric-plt/xapp-frame-py.git] / ricxappframe / xapp_rmr.py
index 93eaeeb..2a35b57 100644 (file)
@@ -34,14 +34,15 @@ class RmrLoop:
     """
     Class represents an RMR loop that constantly reads from RMR.
 
-    Note, we use a queue here, and a thread, rather than the xapp frame just looping
-    and calling consume, so that a possibly slow running consume function does not
-    block the reading of new messages
+    Note, we use a queue here, and a thread, rather than the xapp
+    frame just looping and calling consume, so that a possibly slow
+    running consume function does not block the reading of new messages.
     """
 
     def __init__(self, port, wait_for_ready=True):
         """
-        sets up RMR, then launches a thread that reads and injects messages into a queue.
+        sets up RMR, then launches a thread that reads and injects
+        messages into a queue.
 
         Parameters
         ----------
@@ -49,9 +50,9 @@ class RmrLoop:
             port to listen on
 
         wait_for_ready: bool (optional)
-            If True, then this function hangs until RMR is ready to send, which includes
-            having a valid routing file. This can be set to False if the client only wants
-            to *receive only*.
+            If True, then this function hangs until RMR is ready to
+            send, which includes having a valid routing file. This can
+            be set to False if the client only wants to *receive only*.
         """
 
         # Public
@@ -82,10 +83,13 @@ class RmrLoop:
 
                 # read our mailbox
                 # TODO: take a flag as to whether RAW is needed or not
-                # RAW allows for RTS however the caller must free, and the caller may not need RTS.
-                # Currently after consuming, callers should do rmr.rmr_free_msg(sbuf)
-
-                for (msg, sbuf) in helpers.rmr_rcvall_msgs_raw(self.mrc, timeout=1000):
+                # RAW allows for RTS however the caller must free, and
+                # the caller may not need RTS. Currently after
+                # consuming, callers must call rmr.rmr_free_msg(sbuf)
+                # Use a non-trivial timeout to avoid spinning the CPU.
+                # The function returns if no messages arrive for that
+                # interval, which allows a stop request to be processed.
+                for (msg, sbuf) in helpers.rmr_rcvall_msgs_raw(self.mrc, timeout=5000):
                     self.rcv_queue.put((msg, sbuf))
 
                 self._last_ran = time.time()
@@ -102,16 +106,19 @@ class RmrLoop:
         """
         sets a flag that will cleanly stop the thread
         """
-        mdc_logger.debug("Stopping RMR thread. Waiting for last iteration to finish..")
+        # wait until the current batch of messages is done, then kill
+        # the rmr connection. I debated putting this in "loop" however
+        # if the while loop was still going, setting mrc to close here
+        # would blow up any processing still currently happening.
+        # Probably more polite to at least finish the current batch
+        # and then close. So here we wait until the current batch is
+        # done, then we kill the mrc.
+        mdc_logger.debug("Setting flag to end RMR work loop.")
         self._keep_going = False
-        # wait until the current batch of messages is done, then kill the rmr connection
-        # note; I debated putting this in "loop" however if the while loop was still going
-        # setting mrc to close here would blow up any processing still currently happening
-        # probably more polite to at least finish the current batch and then close. So here
-        # we wait until the current batch is done, then we kill the mrc
         while self._loop_is_running:
-            time.sleep(0.5)
-        mdc_logger.debug("Closing rmr connection")
+            time.sleep(1)
+            mdc_logger.debug("Waiting for RMR work loop to end")
+        mdc_logger.debug("Closing RMR connection")
         rmr.rmr_close(self.mrc)
 
     def healthcheck(self, seconds=30):