Block on RMR read to avoid 100% CPU usage on wait 89/3589/2
authorLott, Christopher (cl778h) <cl778h@att.com>
Tue, 5 May 2020 22:31:54 +0000 (18:31 -0400)
committerLott, Christopher (cl778h) <cl778h@att.com>
Wed, 6 May 2020 01:12:30 +0000 (21:12 -0400)
Extend the receive-all helper method to accept a timeout parameter and
pass to rmr_torcv_msg so RMR waits for a notification that a message
has arrived, instead of spinning the processor while waiting.

Issue-ID: RIC-354
Signed-off-by: Lott, Christopher (cl778h) <cl778h@att.com>
Change-Id: I9149dcdad946f8bac1294eed28260b4c38d1056a

.gitignore
docs/release-notes.rst
ricxappframe/rmr/helpers.py
ricxappframe/xapp_frame.py
ricxappframe/xapp_rmr.py
setup.py
tests/test_rmr.py

index ca538b8..c86c4b7 100644 (file)
@@ -112,4 +112,5 @@ coverage-reports
 
 # Eclipse
 .project
+.pydevproject
 .settings/
index 3ae4794..b28a846 100644 (file)
@@ -14,6 +14,11 @@ and this project adheres to `Semantic Versioning <http://semver.org/>`__.
    :depth: 3
    :local:
 
+[1.0.4] - 2020-05-05
+--------------------
+* Use RMR timeout on receive to avoid 100% CPU usage (`RIC-354 <https://jira.o-ran-sc.org/browse/RIC-354>`_)
+
+
 [1.0.3] - 2020-04-29
 --------------------
 * Upgrade to RMR version 4.0.2
index cb58041..339526a 100644 (file)
 # ==================================================================================
 
 #   Mnemonic:   helpers.py
-#   Abstract:   This is a colleciton of extensions to the RMR base package
-#               which are likely to be convenient for python programmes.
+#   Abstract:   This is a collection of extensions to the RMR base package
+#               which are likely to be convenient for Python programs.
 #   Date:       26 September 2019
 # ---------------------------------------------------------------------------
 
 from ricxappframe.rmr import rmr
 
 
-def rmr_rcvall_msgs(mrc, pass_filter=[]):
+def rmr_rcvall_msgs(mrc, pass_filter=[], timeout=0):
     """
-    Assemble an array of all messages which can be received without
-    blocking.  Effectively draining the message queue if RMR is started
-    in mt-call mode, or draining any waiting TCP buffers.  If the
-    pass_filter parameter is supplied it is treated as one or more message
-    types to accept (pass through). Using the default, an empty list, results
-    in messages with any type being captured.
+    Assembles an array of all messages which can be received without blocking
+    (but see the timeout parameter).  Effectively drains the message queue if
+    RMR is started in mt-call mode, or draining any waiting TCP buffers.  If
+    the pass_filter parameter is supplied, it is treated as one or more
+    message types to accept (pass through). Using the default, an empty list,
+    results in capturing all messages. if the timeout parameter is supplied,
+    this call may block up to that number of milliseconds waiting for a
+    message to arrive. Using the default, zero, results in non-blocking
+    no-wait behavior.
 
     Parameters
     ----------
@@ -42,33 +45,35 @@ def rmr_rcvall_msgs(mrc, pass_filter=[]):
         pass_filter: list (optional)
             The message type(s) to capture.
 
+        timeout: int (optional)
+            The number of milliseconds to wait for a message to arrive.
+
     Returns
     -------
-        list of dict
-        List of message summaries, one for each message captured.
+        List of message summaries (dict), one for each message received; may be empty.
     """
 
     new_messages = []
-    mbuf = rmr.rmr_alloc_msg(mrc, 4096)  # allocate buffer to have something for a return status
+    mbuf = rmr.rmr_alloc_msg(mrc, 4096)  # allocate and reuse a single buffer for RMR
 
     while True:
-        mbuf = rmr.rmr_torcv_msg(mrc, mbuf, 0)  # set the timeout to 0 so this doesn't block!!
-
+        mbuf = rmr.rmr_torcv_msg(mrc, mbuf, timeout)  # first call may have non-zero timeout
+        timeout = 0  # reset so subsequent calls do not wait
         summary = rmr.message_summary(mbuf)
-        if summary["message status"] != "RMR_OK":  # ok indicates msg received, stop on all other states
+        if summary["message status"] != "RMR_OK":  # ok indicates msg received, stop on all other states; e.g., RMR_ERR_TIMEOUT
             break
 
         if len(pass_filter) == 0 or summary["message type"] in pass_filter:  # no filter, or passes; capture it
             new_messages.append(summary)
 
-    rmr.rmr_free_msg(mbuf)  # must free message to avoid leak
+    rmr.rmr_free_msg(mbuf)  # free the single buffer to avoid leak
     return new_messages
 
 
-def rmr_rcvall_msgs_raw(mrc, pass_filter=[]):
+def rmr_rcvall_msgs_raw(mrc, pass_filter=[], timeout=0):
     """
-    Same as rmr_rcvall_msgs, but the raw sbuf is also returned.
-    Useful, for example, if rts is to be used.
+    Same as rmr_rcvall_msgs, but answers tuples with the raw sbuf.
+    Useful if return-to-sender (rts) functions are required.
 
     Parameters
     ----------
@@ -78,25 +83,30 @@ def rmr_rcvall_msgs_raw(mrc, pass_filter=[]):
         pass_filter: list (optional)
             The message type(s) to capture.
 
+        timeout: int (optional)
+            The number of milliseconds to wait for a message to arrive.
+
     Returns
     -------
-    list of tuple:$
-       List of tuples [(S, sbuf),...] where S is a message summary and sbuf is the raw message$
-       the caller is responsible for calling rmr.rmr_free_msg(sbuf) for each sbuf afterwards to prevent memory leaks.
+    list of tuple:
+        List of tuples [(S, sbuf),...] where S is a message summary (dict), and sbuf is the raw message; may be empty.
+        The caller MUST call rmr.rmr_free_msg(sbuf) when finished with each sbuf to prevent memory leaks!
     """
 
     new_messages = []
 
     while True:
-        mbuf = rmr.rmr_alloc_msg(mrc, 4096)  # allocate buffer to have something for a return status
-        mbuf = rmr.rmr_torcv_msg(mrc, mbuf, 0)  # set the timeout to 0 so this doesn't block!!
+        mbuf = rmr.rmr_alloc_msg(mrc, 4096)  # allocate a new buffer for every message
+        mbuf = rmr.rmr_torcv_msg(mrc, mbuf, timeout)  # first call may have non-zero timeout
+        timeout = 0  # reset so subsequent calls do not wait
         summary = rmr.message_summary(mbuf)
-        if summary["message status"] != "RMR_OK":
+        if summary["message status"] != "RMR_OK":  # e.g., RMR_ERR_TIMEOUT
+            rmr.rmr_free_msg(mbuf)  # free the failed-to-receive buffer
             break
 
         if len(pass_filter) == 0 or mbuf.contents.mtype in pass_filter:  # no filter, or passes; capture it
-            new_messages.append((summary, mbuf))
+            new_messages.append((summary, mbuf))  # caller is responsible for freeing the buffer
         else:
-            rmr.rmr_free_msg(mbuf)
+            rmr.rmr_free_msg(mbuf)  # free the filtered-out message buffer
 
     return new_messages
index 95aa122..58596b8 100644 (file)
@@ -76,7 +76,9 @@ class _BaseXapp:
 
     def rmr_get_messages(self):
         """
-        returns a generator iterable over all current messages in the queue that have not yet been read by the client xapp
+        Returns a generator iterable over all items in the queue that have not yet been read by the client xapp.
+        Each item is a tuple (S, sbuf) where S is a message summary dict and sbuf is the raw message.
+        The caller MUST call rmr.rmr_free_msg(sbuf) when finished with each sbuf to prevent memory leaks!
         """
         while not self._rmr_loop.rcv_queue.empty():
             (summary, sbuf) = self._rmr_loop.rcv_queue.get()
index d18b4c0..6092e14 100644 (file)
@@ -32,14 +32,16 @@ mdc_logger = Logger(name=__name__)
 
 class RmrLoop:
     """
-    Class represents an rmr loop that constantly reads from rmr
+    Class represents an RMR loop that constantly reads from RMR.
 
-    Note, we use a queue here, and a thread, rather than the xapp frame just looping and calling consume, so that a possibly slow running consume function does not block the reading of new messages
+    Note, we use a queue here, and a thread, rather than the xapp frame just looping
+    and calling consume, so that a possibly slow running consume function does not
+    block the reading of new messages
     """
 
     def __init__(self, port, wait_for_ready=True):
         """
-        sets up rmr, then launches a thread that reads and injects messages into a queue
+        sets up RMR, then launches a thread that reads and injects messages into a queue.
 
         Parameters
         ----------
@@ -47,17 +49,20 @@ class RmrLoop:
             port to listen on
 
         wait_for_ready: bool (optional)
-            if this is True, then this function hangs until rmr is ready to send, which includes having a valid routing file.
-            this can be set to False if the client only wants to *receive only*
+            If True, then this function hangs until RMR is ready to send, which includes
+            having a valid routing file. This can be set to False if the client only wants
+            to *receive only*.
         """
 
         # Public
         # thread safe queue https://docs.python.org/3/library/queue.html
-        # We use a thread and a queue so that a long running consume callback function can never block reads.
-        # IE a consume implementation could take a long time and the ring size for rmr blows up here and messages are lost
+        # We use a thread and a queue so that a long running consume callback function can
+        # never block reads. IE a consume implementation could take a long time and the ring
+        # size for rmr blows up here and messages are lost.
         self.rcv_queue = queue.Queue()
 
-        # rmr context; RMRFL_MTCALL puts RMR into a multithreaded mode, where a thread populates a ring of messages that receive calls read from
+        # RMR context; RMRFL_MTCALL puts RMR into a multithreaded mode, where a thread
+        # populates a ring of messages that receive calls read from
         self.mrc = rmr.rmr_init(str(port).encode(), rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
 
         if wait_for_ready:
@@ -66,30 +71,30 @@ class RmrLoop:
                 time.sleep(0.1)
 
         # Private
-        self._keep_going = True  # used to tell this thread to stop it's work
+        self._keep_going = True  # used to tell this thread to stop
         self._last_ran = time.time()  # used for healthcheck
         self._loop_is_running = False  # used in stop to know when it's safe to kill the mrc
 
-        # start the work loop
-        mdc_logger.debug("Starting loop thread")
-
         def loop():
-            mdc_logger.debug("Work loop starting")
+            mdc_logger.debug("Work loop starts")
             self._loop_is_running = True
             while self._keep_going:
 
                 # read our mailbox
                 # TODO: take a flag as to whether RAW is needed or not
                 # RAW allows for RTS however the caller must free, and the caller may not need RTS.
-                # Currently after consuming, callers should do  rmr.rmr_free_msg(sbuf)
+                # Currently after consuming, callers should do rmr.rmr_free_msg(sbuf)
 
-                for (msg, sbuf) in helpers.rmr_rcvall_msgs_raw(self.mrc):
+                for (msg, sbuf) in helpers.rmr_rcvall_msgs_raw(self.mrc, timeout=1000):
                     self.rcv_queue.put((msg, sbuf))
 
                 self._last_ran = time.time()
 
             self._loop_is_running = False
+            mdc_logger.debug("Work loop ends")
 
+        # start the work loop
+        mdc_logger.debug("Starting loop thread")
         self._thread = Thread(target=loop)
         self._thread.start()
 
@@ -97,13 +102,15 @@ class RmrLoop:
         """
         sets a flag that will cleanly stop the thread
         """
-        mdc_logger.debug("Stopping rmr thread. Waiting for last iteration to finish..")
+        mdc_logger.debug("Stopping RMR thread. Waiting for last iteration to finish..")
         self._keep_going = False
         # wait until the current batch of messages is done, then kill the rmr connection
-        # note; I debated putting this in "loop" however if the while loop was still going setting mrc to close here would blow up any processing still currently happening
-        # probably more polite to at least finish the current batch and then close. So here we wait until the current batch is done, then we kill the mrc
+        # note; I debated putting this in "loop" however if the while loop was still going
+        # setting mrc to close here would blow up any processing still currently happening
+        # probably more polite to at least finish the current batch and then close. So here
+        # we wait until the current batch is done, then we kill the mrc
         while self._loop_is_running:
-            pass
+            time.sleep(0.5)
         mdc_logger.debug("Closing rmr connection")
         rmr.rmr_close(self.mrc)
 
index 85e017f..332fde9 100644 (file)
--- a/setup.py
+++ b/setup.py
@@ -32,7 +32,7 @@ def _long_descr():
 
 setup(
     name="ricxappframe",
-    version="1.0.3",
+    version="1.0.4",
     packages=find_packages(exclude=["tests.*", "tests"]),
     author="Tommy Carpenter, E. Scott Daniels",
     description="Xapp and RMR framework for python",
index afd9980..72cdf4b 100644 (file)
@@ -23,6 +23,7 @@ from ricxappframe.rmr import rmr, helpers, exceptions
 SIZE = 256
 MRC_SEND = None
 MRC_RCV = None
+MRC_BUF_RCV = None
 
 
 def setup_module():
@@ -40,7 +41,7 @@ def setup_module():
         time.sleep(1)
 
     global MRC_BUF_RCV
-    MRC_BUF_RCV = rmr.rmr_init(b"3564", rmr.RMR_MAX_RCV_BYTES, 0x02)
+    MRC_BUF_RCV = rmr.rmr_init(b"3564", rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
     while rmr.rmr_ready(MRC_BUF_RCV) == 0:
         time.sleep(1)
 
@@ -51,6 +52,7 @@ def teardown_module():
     """
     rmr.rmr_close(MRC_SEND)
     rmr.rmr_close(MRC_RCV)
+    rmr.rmr_close(MRC_BUF_RCV)
 
 
 def _assert_new_sbuf(sbuf):
@@ -156,10 +158,12 @@ def test_rcv_timeout():
     There is no unit test for rmr_rcv_msg; too dangerous, that is a blocking call that may never return.
     """
     sbuf_rcv = rmr.rmr_alloc_msg(MRC_RCV, SIZE)
-    sbuf_rcv = rmr.rmr_torcv_msg(MRC_RCV, sbuf_rcv, 50)  # should time out after 50ms
+    start_rcv_sec = time.time()
+    sbuf_rcv = rmr.rmr_torcv_msg(MRC_RCV, sbuf_rcv, 500)  # should wait a bit before returning
     summary = rmr.message_summary(sbuf_rcv)
     assert summary["message state"] == 12
     assert summary["message status"] == "RMR_ERR_TIMEOUT"
+    assert(time.time() - start_rcv_sec > 0.5)  # test duration should be longer than the timeout
 
 
 def test_send_rcv():
@@ -304,7 +308,7 @@ def test_rcv_all():
     time.sleep(1)  # ensure underlying transport gets cycles to send/receive
 
     bundle = helpers.rmr_rcvall_msgs_raw(MRC_BUF_RCV, [2])  # receive only message type 2 messages
-    assert len(bundle) == 12  # we should only get the second batch of 12 messages
+    assert len(bundle) == 12  # we should only get the type 2 batch of 12 messages
 
     for i, (ms, sbuf) in enumerate(bundle):  # test the raw version
         test_summary = rmr.message_summary(sbuf)
@@ -315,6 +319,17 @@ def test_rcv_all():
         assert ms["payload"] == expected_pay
         rmr.rmr_free_msg(sbuf)
 
+    # check the timeout scenarios
+    start_rcv_sec = time.time()
+    bundle = helpers.rmr_rcvall_msgs(MRC_RCV, timeout=1001)  # non-zero timeout means wait
+    assert len(bundle) == 0  # we should get none
+    assert(time.time() - start_rcv_sec > 1)  # test duration should be longer than 1 second
+
+    start_rcv_sec = time.time()
+    bundle = helpers.rmr_rcvall_msgs_raw(MRC_RCV, timeout=1002)  # non-zero timeout means wait
+    assert len(bundle) == 0  # we should get none
+    assert(time.time() - start_rcv_sec > 1)  # test duration should be longer than 1 second
+
 
 def test_bad_buffer():
     """test that we get a proper exception when the buffer has a null pointer"""