Block on RMR read to avoid 100% CPU usage on wait
[ric-plt/xapp-frame-py.git] / ricxappframe / xapp_rmr.py
index d18b4c0..6092e14 100644 (file)
@@ -32,14 +32,16 @@ mdc_logger = Logger(name=__name__)
 
 class RmrLoop:
     """
-    Class represents an rmr loop that constantly reads from rmr
+    Class represents an RMR loop that constantly reads from RMR.
 
-    Note, we use a queue here, and a thread, rather than the xapp frame just looping and calling consume, so that a possibly slow running consume function does not block the reading of new messages
+    Note, we use a queue here, and a thread, rather than the xapp frame just looping
+    and calling consume, so that a possibly slow running consume function does not
+    block the reading of new messages
     """
 
     def __init__(self, port, wait_for_ready=True):
         """
-        sets up rmr, then launches a thread that reads and injects messages into a queue
+        sets up RMR, then launches a thread that reads and injects messages into a queue.
 
         Parameters
         ----------
@@ -47,17 +49,20 @@ class RmrLoop:
             port to listen on
 
         wait_for_ready: bool (optional)
-            if this is True, then this function hangs until rmr is ready to send, which includes having a valid routing file.
-            this can be set to False if the client only wants to *receive only*
+            If True, then this function hangs until RMR is ready to send, which includes
+            having a valid routing file. This can be set to False if the client only wants
+            to *receive only*.
         """
 
         # Public
         # thread safe queue https://docs.python.org/3/library/queue.html
-        # We use a thread and a queue so that a long running consume callback function can never block reads.
-        # IE a consume implementation could take a long time and the ring size for rmr blows up here and messages are lost
+        # We use a thread and a queue so that a long running consume callback function can
+        # never block reads. IE a consume implementation could take a long time and the ring
+        # size for rmr blows up here and messages are lost.
         self.rcv_queue = queue.Queue()
 
-        # rmr context; RMRFL_MTCALL puts RMR into a multithreaded mode, where a thread populates a ring of messages that receive calls read from
+        # RMR context; RMRFL_MTCALL puts RMR into a multithreaded mode, where a thread
+        # populates a ring of messages that receive calls read from
         self.mrc = rmr.rmr_init(str(port).encode(), rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
 
         if wait_for_ready:
@@ -66,30 +71,30 @@ class RmrLoop:
                 time.sleep(0.1)
 
         # Private
-        self._keep_going = True  # used to tell this thread to stop it's work
+        self._keep_going = True  # used to tell this thread to stop
         self._last_ran = time.time()  # used for healthcheck
         self._loop_is_running = False  # used in stop to know when it's safe to kill the mrc
 
-        # start the work loop
-        mdc_logger.debug("Starting loop thread")
-
         def loop():
-            mdc_logger.debug("Work loop starting")
+            mdc_logger.debug("Work loop starts")
             self._loop_is_running = True
             while self._keep_going:
 
                 # read our mailbox
                 # TODO: take a flag as to whether RAW is needed or not
                 # RAW allows for RTS however the caller must free, and the caller may not need RTS.
-                # Currently after consuming, callers should do  rmr.rmr_free_msg(sbuf)
+                # Currently after consuming, callers should do rmr.rmr_free_msg(sbuf)
 
-                for (msg, sbuf) in helpers.rmr_rcvall_msgs_raw(self.mrc):
+                for (msg, sbuf) in helpers.rmr_rcvall_msgs_raw(self.mrc, timeout=1000):
                     self.rcv_queue.put((msg, sbuf))
 
                 self._last_ran = time.time()
 
             self._loop_is_running = False
+            mdc_logger.debug("Work loop ends")
 
+        # start the work loop
+        mdc_logger.debug("Starting loop thread")
         self._thread = Thread(target=loop)
         self._thread.start()
 
@@ -97,13 +102,15 @@ class RmrLoop:
         """
         sets a flag that will cleanly stop the thread
         """
-        mdc_logger.debug("Stopping rmr thread. Waiting for last iteration to finish..")
+        mdc_logger.debug("Stopping RMR thread. Waiting for last iteration to finish..")
         self._keep_going = False
         # wait until the current batch of messages is done, then kill the rmr connection
-        # note; I debated putting this in "loop" however if the while loop was still going setting mrc to close here would blow up any processing still currently happening
-        # probably more polite to at least finish the current batch and then close. So here we wait until the current batch is done, then we kill the mrc
+        # note; I debated putting this in "loop" however if the while loop was still going
+        # setting mrc to close here would blow up any processing still currently happening
+        # probably more polite to at least finish the current batch and then close. So here
+        # we wait until the current batch is done, then we kill the mrc
         while self._loop_is_running:
-            pass
+            time.sleep(0.5)
         mdc_logger.debug("Closing rmr connection")
         rmr.rmr_close(self.mrc)