From: Lott, Christopher (cl778h) Date: Tue, 5 May 2020 22:31:54 +0000 (-0400) Subject: Block on RMR read to avoid 100% CPU usage on wait X-Git-Tag: 1.1.0~2 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=666e8319bd0e618576be79a14208d7eaf0de99f2;p=ric-plt%2Fxapp-frame-py.git Block on RMR read to avoid 100% CPU usage on wait Extend the receive-all helper method to accept a timeout parameter and pass to rmr_torcv_msg so RMR waits for a notification that a message has arrived, instead of spinning the processor while waiting. Issue-ID: RIC-354 Signed-off-by: Lott, Christopher (cl778h) Change-Id: I9149dcdad946f8bac1294eed28260b4c38d1056a --- diff --git a/.gitignore b/.gitignore index ca538b8..c86c4b7 100644 --- a/.gitignore +++ b/.gitignore @@ -112,4 +112,5 @@ coverage-reports # Eclipse .project +.pydevproject .settings/ diff --git a/docs/release-notes.rst b/docs/release-notes.rst index 3ae4794..b28a846 100644 --- a/docs/release-notes.rst +++ b/docs/release-notes.rst @@ -14,6 +14,11 @@ and this project adheres to `Semantic Versioning `__. :depth: 3 :local: +[1.0.4] - 2020-05-05 +-------------------- +* Use RMR timeout on receive to avoid 100% CPU usage (`RIC-354 `_) + + [1.0.3] - 2020-04-29 -------------------- * Upgrade to RMR version 4.0.2 diff --git a/ricxappframe/rmr/helpers.py b/ricxappframe/rmr/helpers.py index cb58041..339526a 100644 --- a/ricxappframe/rmr/helpers.py +++ b/ricxappframe/rmr/helpers.py @@ -17,22 +17,25 @@ # ================================================================================== # Mnemonic: helpers.py -# Abstract: This is a colleciton of extensions to the RMR base package -# which are likely to be convenient for python programmes. +# Abstract: This is a collection of extensions to the RMR base package +# which are likely to be convenient for Python programs. # Date: 26 September 2019 # --------------------------------------------------------------------------- from ricxappframe.rmr import rmr -def rmr_rcvall_msgs(mrc, pass_filter=[]): +def rmr_rcvall_msgs(mrc, pass_filter=[], timeout=0): """ - Assemble an array of all messages which can be received without - blocking. Effectively draining the message queue if RMR is started - in mt-call mode, or draining any waiting TCP buffers. If the - pass_filter parameter is supplied it is treated as one or more message - types to accept (pass through). Using the default, an empty list, results - in messages with any type being captured. + Assembles an array of all messages which can be received without blocking + (but see the timeout parameter). Effectively drains the message queue if + RMR is started in mt-call mode, or draining any waiting TCP buffers. If + the pass_filter parameter is supplied, it is treated as one or more + message types to accept (pass through). Using the default, an empty list, + results in capturing all messages. if the timeout parameter is supplied, + this call may block up to that number of milliseconds waiting for a + message to arrive. Using the default, zero, results in non-blocking + no-wait behavior. Parameters ---------- @@ -42,33 +45,35 @@ def rmr_rcvall_msgs(mrc, pass_filter=[]): pass_filter: list (optional) The message type(s) to capture. + timeout: int (optional) + The number of milliseconds to wait for a message to arrive. + Returns ------- - list of dict - List of message summaries, one for each message captured. + List of message summaries (dict), one for each message received; may be empty. """ new_messages = [] - mbuf = rmr.rmr_alloc_msg(mrc, 4096) # allocate buffer to have something for a return status + mbuf = rmr.rmr_alloc_msg(mrc, 4096) # allocate and reuse a single buffer for RMR while True: - mbuf = rmr.rmr_torcv_msg(mrc, mbuf, 0) # set the timeout to 0 so this doesn't block!! - + mbuf = rmr.rmr_torcv_msg(mrc, mbuf, timeout) # first call may have non-zero timeout + timeout = 0 # reset so subsequent calls do not wait summary = rmr.message_summary(mbuf) - if summary["message status"] != "RMR_OK": # ok indicates msg received, stop on all other states + if summary["message status"] != "RMR_OK": # ok indicates msg received, stop on all other states; e.g., RMR_ERR_TIMEOUT break if len(pass_filter) == 0 or summary["message type"] in pass_filter: # no filter, or passes; capture it new_messages.append(summary) - rmr.rmr_free_msg(mbuf) # must free message to avoid leak + rmr.rmr_free_msg(mbuf) # free the single buffer to avoid leak return new_messages -def rmr_rcvall_msgs_raw(mrc, pass_filter=[]): +def rmr_rcvall_msgs_raw(mrc, pass_filter=[], timeout=0): """ - Same as rmr_rcvall_msgs, but the raw sbuf is also returned. - Useful, for example, if rts is to be used. + Same as rmr_rcvall_msgs, but answers tuples with the raw sbuf. + Useful if return-to-sender (rts) functions are required. Parameters ---------- @@ -78,25 +83,30 @@ def rmr_rcvall_msgs_raw(mrc, pass_filter=[]): pass_filter: list (optional) The message type(s) to capture. + timeout: int (optional) + The number of milliseconds to wait for a message to arrive. + Returns ------- - list of tuple:$ - List of tuples [(S, sbuf),...] where S is a message summary and sbuf is the raw message$ - the caller is responsible for calling rmr.rmr_free_msg(sbuf) for each sbuf afterwards to prevent memory leaks. + list of tuple: + List of tuples [(S, sbuf),...] where S is a message summary (dict), and sbuf is the raw message; may be empty. + The caller MUST call rmr.rmr_free_msg(sbuf) when finished with each sbuf to prevent memory leaks! """ new_messages = [] while True: - mbuf = rmr.rmr_alloc_msg(mrc, 4096) # allocate buffer to have something for a return status - mbuf = rmr.rmr_torcv_msg(mrc, mbuf, 0) # set the timeout to 0 so this doesn't block!! + mbuf = rmr.rmr_alloc_msg(mrc, 4096) # allocate a new buffer for every message + mbuf = rmr.rmr_torcv_msg(mrc, mbuf, timeout) # first call may have non-zero timeout + timeout = 0 # reset so subsequent calls do not wait summary = rmr.message_summary(mbuf) - if summary["message status"] != "RMR_OK": + if summary["message status"] != "RMR_OK": # e.g., RMR_ERR_TIMEOUT + rmr.rmr_free_msg(mbuf) # free the failed-to-receive buffer break if len(pass_filter) == 0 or mbuf.contents.mtype in pass_filter: # no filter, or passes; capture it - new_messages.append((summary, mbuf)) + new_messages.append((summary, mbuf)) # caller is responsible for freeing the buffer else: - rmr.rmr_free_msg(mbuf) + rmr.rmr_free_msg(mbuf) # free the filtered-out message buffer return new_messages diff --git a/ricxappframe/xapp_frame.py b/ricxappframe/xapp_frame.py index 95aa122..58596b8 100644 --- a/ricxappframe/xapp_frame.py +++ b/ricxappframe/xapp_frame.py @@ -76,7 +76,9 @@ class _BaseXapp: def rmr_get_messages(self): """ - returns a generator iterable over all current messages in the queue that have not yet been read by the client xapp + Returns a generator iterable over all items in the queue that have not yet been read by the client xapp. + Each item is a tuple (S, sbuf) where S is a message summary dict and sbuf is the raw message. + The caller MUST call rmr.rmr_free_msg(sbuf) when finished with each sbuf to prevent memory leaks! """ while not self._rmr_loop.rcv_queue.empty(): (summary, sbuf) = self._rmr_loop.rcv_queue.get() diff --git a/ricxappframe/xapp_rmr.py b/ricxappframe/xapp_rmr.py index d18b4c0..6092e14 100644 --- a/ricxappframe/xapp_rmr.py +++ b/ricxappframe/xapp_rmr.py @@ -32,14 +32,16 @@ mdc_logger = Logger(name=__name__) class RmrLoop: """ - Class represents an rmr loop that constantly reads from rmr + Class represents an RMR loop that constantly reads from RMR. - Note, we use a queue here, and a thread, rather than the xapp frame just looping and calling consume, so that a possibly slow running consume function does not block the reading of new messages + Note, we use a queue here, and a thread, rather than the xapp frame just looping + and calling consume, so that a possibly slow running consume function does not + block the reading of new messages """ def __init__(self, port, wait_for_ready=True): """ - sets up rmr, then launches a thread that reads and injects messages into a queue + sets up RMR, then launches a thread that reads and injects messages into a queue. Parameters ---------- @@ -47,17 +49,20 @@ class RmrLoop: port to listen on wait_for_ready: bool (optional) - if this is True, then this function hangs until rmr is ready to send, which includes having a valid routing file. - this can be set to False if the client only wants to *receive only* + If True, then this function hangs until RMR is ready to send, which includes + having a valid routing file. This can be set to False if the client only wants + to *receive only*. """ # Public # thread safe queue https://docs.python.org/3/library/queue.html - # We use a thread and a queue so that a long running consume callback function can never block reads. - # IE a consume implementation could take a long time and the ring size for rmr blows up here and messages are lost + # We use a thread and a queue so that a long running consume callback function can + # never block reads. IE a consume implementation could take a long time and the ring + # size for rmr blows up here and messages are lost. self.rcv_queue = queue.Queue() - # rmr context; RMRFL_MTCALL puts RMR into a multithreaded mode, where a thread populates a ring of messages that receive calls read from + # RMR context; RMRFL_MTCALL puts RMR into a multithreaded mode, where a thread + # populates a ring of messages that receive calls read from self.mrc = rmr.rmr_init(str(port).encode(), rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL) if wait_for_ready: @@ -66,30 +71,30 @@ class RmrLoop: time.sleep(0.1) # Private - self._keep_going = True # used to tell this thread to stop it's work + self._keep_going = True # used to tell this thread to stop self._last_ran = time.time() # used for healthcheck self._loop_is_running = False # used in stop to know when it's safe to kill the mrc - # start the work loop - mdc_logger.debug("Starting loop thread") - def loop(): - mdc_logger.debug("Work loop starting") + mdc_logger.debug("Work loop starts") self._loop_is_running = True while self._keep_going: # read our mailbox # TODO: take a flag as to whether RAW is needed or not # RAW allows for RTS however the caller must free, and the caller may not need RTS. - # Currently after consuming, callers should do rmr.rmr_free_msg(sbuf) + # Currently after consuming, callers should do rmr.rmr_free_msg(sbuf) - for (msg, sbuf) in helpers.rmr_rcvall_msgs_raw(self.mrc): + for (msg, sbuf) in helpers.rmr_rcvall_msgs_raw(self.mrc, timeout=1000): self.rcv_queue.put((msg, sbuf)) self._last_ran = time.time() self._loop_is_running = False + mdc_logger.debug("Work loop ends") + # start the work loop + mdc_logger.debug("Starting loop thread") self._thread = Thread(target=loop) self._thread.start() @@ -97,13 +102,15 @@ class RmrLoop: """ sets a flag that will cleanly stop the thread """ - mdc_logger.debug("Stopping rmr thread. Waiting for last iteration to finish..") + mdc_logger.debug("Stopping RMR thread. Waiting for last iteration to finish..") self._keep_going = False # wait until the current batch of messages is done, then kill the rmr connection - # note; I debated putting this in "loop" however if the while loop was still going setting mrc to close here would blow up any processing still currently happening - # probably more polite to at least finish the current batch and then close. So here we wait until the current batch is done, then we kill the mrc + # note; I debated putting this in "loop" however if the while loop was still going + # setting mrc to close here would blow up any processing still currently happening + # probably more polite to at least finish the current batch and then close. So here + # we wait until the current batch is done, then we kill the mrc while self._loop_is_running: - pass + time.sleep(0.5) mdc_logger.debug("Closing rmr connection") rmr.rmr_close(self.mrc) diff --git a/setup.py b/setup.py index 85e017f..332fde9 100644 --- a/setup.py +++ b/setup.py @@ -32,7 +32,7 @@ def _long_descr(): setup( name="ricxappframe", - version="1.0.3", + version="1.0.4", packages=find_packages(exclude=["tests.*", "tests"]), author="Tommy Carpenter, E. Scott Daniels", description="Xapp and RMR framework for python", diff --git a/tests/test_rmr.py b/tests/test_rmr.py index afd9980..72cdf4b 100644 --- a/tests/test_rmr.py +++ b/tests/test_rmr.py @@ -23,6 +23,7 @@ from ricxappframe.rmr import rmr, helpers, exceptions SIZE = 256 MRC_SEND = None MRC_RCV = None +MRC_BUF_RCV = None def setup_module(): @@ -40,7 +41,7 @@ def setup_module(): time.sleep(1) global MRC_BUF_RCV - MRC_BUF_RCV = rmr.rmr_init(b"3564", rmr.RMR_MAX_RCV_BYTES, 0x02) + MRC_BUF_RCV = rmr.rmr_init(b"3564", rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL) while rmr.rmr_ready(MRC_BUF_RCV) == 0: time.sleep(1) @@ -51,6 +52,7 @@ def teardown_module(): """ rmr.rmr_close(MRC_SEND) rmr.rmr_close(MRC_RCV) + rmr.rmr_close(MRC_BUF_RCV) def _assert_new_sbuf(sbuf): @@ -156,10 +158,12 @@ def test_rcv_timeout(): There is no unit test for rmr_rcv_msg; too dangerous, that is a blocking call that may never return. """ sbuf_rcv = rmr.rmr_alloc_msg(MRC_RCV, SIZE) - sbuf_rcv = rmr.rmr_torcv_msg(MRC_RCV, sbuf_rcv, 50) # should time out after 50ms + start_rcv_sec = time.time() + sbuf_rcv = rmr.rmr_torcv_msg(MRC_RCV, sbuf_rcv, 500) # should wait a bit before returning summary = rmr.message_summary(sbuf_rcv) assert summary["message state"] == 12 assert summary["message status"] == "RMR_ERR_TIMEOUT" + assert(time.time() - start_rcv_sec > 0.5) # test duration should be longer than the timeout def test_send_rcv(): @@ -304,7 +308,7 @@ def test_rcv_all(): time.sleep(1) # ensure underlying transport gets cycles to send/receive bundle = helpers.rmr_rcvall_msgs_raw(MRC_BUF_RCV, [2]) # receive only message type 2 messages - assert len(bundle) == 12 # we should only get the second batch of 12 messages + assert len(bundle) == 12 # we should only get the type 2 batch of 12 messages for i, (ms, sbuf) in enumerate(bundle): # test the raw version test_summary = rmr.message_summary(sbuf) @@ -315,6 +319,17 @@ def test_rcv_all(): assert ms["payload"] == expected_pay rmr.rmr_free_msg(sbuf) + # check the timeout scenarios + start_rcv_sec = time.time() + bundle = helpers.rmr_rcvall_msgs(MRC_RCV, timeout=1001) # non-zero timeout means wait + assert len(bundle) == 0 # we should get none + assert(time.time() - start_rcv_sec > 1) # test duration should be longer than 1 second + + start_rcv_sec = time.time() + bundle = helpers.rmr_rcvall_msgs_raw(MRC_RCV, timeout=1002) # non-zero timeout means wait + assert len(bundle) == 0 # we should get none + assert(time.time() - start_rcv_sec > 1) # test duration should be longer than 1 second + def test_bad_buffer(): """test that we get a proper exception when the buffer has a null pointer"""