Block on RMR read to avoid 100% CPU usage on wait
[ric-plt/xapp-frame-py.git] / ricxappframe / rmr / helpers.py
index cb58041..339526a 100644 (file)
 # ==================================================================================
 
 #   Mnemonic:   helpers.py
-#   Abstract:   This is a colleciton of extensions to the RMR base package
-#               which are likely to be convenient for python programmes.
+#   Abstract:   This is a collection of extensions to the RMR base package
+#               which are likely to be convenient for Python programs.
 #   Date:       26 September 2019
 # ---------------------------------------------------------------------------
 
 from ricxappframe.rmr import rmr
 
 
-def rmr_rcvall_msgs(mrc, pass_filter=[]):
+def rmr_rcvall_msgs(mrc, pass_filter=[], timeout=0):
     """
-    Assemble an array of all messages which can be received without
-    blocking.  Effectively draining the message queue if RMR is started
-    in mt-call mode, or draining any waiting TCP buffers.  If the
-    pass_filter parameter is supplied it is treated as one or more message
-    types to accept (pass through). Using the default, an empty list, results
-    in messages with any type being captured.
+    Assembles an array of all messages which can be received without blocking
+    (but see the timeout parameter).  Effectively drains the message queue if
+    RMR is started in mt-call mode, or draining any waiting TCP buffers.  If
+    the pass_filter parameter is supplied, it is treated as one or more
+    message types to accept (pass through). Using the default, an empty list,
+    results in capturing all messages. if the timeout parameter is supplied,
+    this call may block up to that number of milliseconds waiting for a
+    message to arrive. Using the default, zero, results in non-blocking
+    no-wait behavior.
 
     Parameters
     ----------
@@ -42,33 +45,35 @@ def rmr_rcvall_msgs(mrc, pass_filter=[]):
         pass_filter: list (optional)
             The message type(s) to capture.
 
+        timeout: int (optional)
+            The number of milliseconds to wait for a message to arrive.
+
     Returns
     -------
-        list of dict
-        List of message summaries, one for each message captured.
+        List of message summaries (dict), one for each message received; may be empty.
     """
 
     new_messages = []
-    mbuf = rmr.rmr_alloc_msg(mrc, 4096)  # allocate buffer to have something for a return status
+    mbuf = rmr.rmr_alloc_msg(mrc, 4096)  # allocate and reuse a single buffer for RMR
 
     while True:
-        mbuf = rmr.rmr_torcv_msg(mrc, mbuf, 0)  # set the timeout to 0 so this doesn't block!!
-
+        mbuf = rmr.rmr_torcv_msg(mrc, mbuf, timeout)  # first call may have non-zero timeout
+        timeout = 0  # reset so subsequent calls do not wait
         summary = rmr.message_summary(mbuf)
-        if summary["message status"] != "RMR_OK":  # ok indicates msg received, stop on all other states
+        if summary["message status"] != "RMR_OK":  # ok indicates msg received, stop on all other states; e.g., RMR_ERR_TIMEOUT
             break
 
         if len(pass_filter) == 0 or summary["message type"] in pass_filter:  # no filter, or passes; capture it
             new_messages.append(summary)
 
-    rmr.rmr_free_msg(mbuf)  # must free message to avoid leak
+    rmr.rmr_free_msg(mbuf)  # free the single buffer to avoid leak
     return new_messages
 
 
-def rmr_rcvall_msgs_raw(mrc, pass_filter=[]):
+def rmr_rcvall_msgs_raw(mrc, pass_filter=[], timeout=0):
     """
-    Same as rmr_rcvall_msgs, but the raw sbuf is also returned.
-    Useful, for example, if rts is to be used.
+    Same as rmr_rcvall_msgs, but answers tuples with the raw sbuf.
+    Useful if return-to-sender (rts) functions are required.
 
     Parameters
     ----------
@@ -78,25 +83,30 @@ def rmr_rcvall_msgs_raw(mrc, pass_filter=[]):
         pass_filter: list (optional)
             The message type(s) to capture.
 
+        timeout: int (optional)
+            The number of milliseconds to wait for a message to arrive.
+
     Returns
     -------
-    list of tuple:$
-       List of tuples [(S, sbuf),...] where S is a message summary and sbuf is the raw message$
-       the caller is responsible for calling rmr.rmr_free_msg(sbuf) for each sbuf afterwards to prevent memory leaks.
+    list of tuple:
+        List of tuples [(S, sbuf),...] where S is a message summary (dict), and sbuf is the raw message; may be empty.
+        The caller MUST call rmr.rmr_free_msg(sbuf) when finished with each sbuf to prevent memory leaks!
     """
 
     new_messages = []
 
     while True:
-        mbuf = rmr.rmr_alloc_msg(mrc, 4096)  # allocate buffer to have something for a return status
-        mbuf = rmr.rmr_torcv_msg(mrc, mbuf, 0)  # set the timeout to 0 so this doesn't block!!
+        mbuf = rmr.rmr_alloc_msg(mrc, 4096)  # allocate a new buffer for every message
+        mbuf = rmr.rmr_torcv_msg(mrc, mbuf, timeout)  # first call may have non-zero timeout
+        timeout = 0  # reset so subsequent calls do not wait
         summary = rmr.message_summary(mbuf)
-        if summary["message status"] != "RMR_OK":
+        if summary["message status"] != "RMR_OK":  # e.g., RMR_ERR_TIMEOUT
+            rmr.rmr_free_msg(mbuf)  # free the failed-to-receive buffer
             break
 
         if len(pass_filter) == 0 or mbuf.contents.mtype in pass_filter:  # no filter, or passes; capture it
-            new_messages.append((summary, mbuf))
+            new_messages.append((summary, mbuf))  # caller is responsible for freeing the buffer
         else:
-            rmr.rmr_free_msg(mbuf)
+            rmr.rmr_free_msg(mbuf)  # free the filtered-out message buffer
 
     return new_messages