# Eclipse
.project
+.pydevproject
.settings/
:depth: 3
:local:
+[1.0.4] - 2020-05-05
+--------------------
+* Use RMR timeout on receive to avoid 100% CPU usage (`RIC-354 <https://jira.o-ran-sc.org/browse/RIC-354>`_)
+
+
[1.0.3] - 2020-04-29
--------------------
* Upgrade to RMR version 4.0.2
# ==================================================================================
# Mnemonic: helpers.py
-# Abstract: This is a colleciton of extensions to the RMR base package
-# which are likely to be convenient for python programmes.
+# Abstract: This is a collection of extensions to the RMR base package
+# which are likely to be convenient for Python programs.
# Date: 26 September 2019
# ---------------------------------------------------------------------------
from ricxappframe.rmr import rmr
-def rmr_rcvall_msgs(mrc, pass_filter=[]):
+def rmr_rcvall_msgs(mrc, pass_filter=[], timeout=0):
"""
- Assemble an array of all messages which can be received without
- blocking. Effectively draining the message queue if RMR is started
- in mt-call mode, or draining any waiting TCP buffers. If the
- pass_filter parameter is supplied it is treated as one or more message
- types to accept (pass through). Using the default, an empty list, results
- in messages with any type being captured.
+ Assembles an array of all messages which can be received without blocking
+ (but see the timeout parameter). Effectively drains the message queue if
+ RMR is started in mt-call mode, or draining any waiting TCP buffers. If
+ the pass_filter parameter is supplied, it is treated as one or more
+ message types to accept (pass through). Using the default, an empty list,
+ results in capturing all messages. if the timeout parameter is supplied,
+ this call may block up to that number of milliseconds waiting for a
+ message to arrive. Using the default, zero, results in non-blocking
+ no-wait behavior.
Parameters
----------
pass_filter: list (optional)
The message type(s) to capture.
+ timeout: int (optional)
+ The number of milliseconds to wait for a message to arrive.
+
Returns
-------
- list of dict
- List of message summaries, one for each message captured.
+ List of message summaries (dict), one for each message received; may be empty.
"""
new_messages = []
- mbuf = rmr.rmr_alloc_msg(mrc, 4096) # allocate buffer to have something for a return status
+ mbuf = rmr.rmr_alloc_msg(mrc, 4096) # allocate and reuse a single buffer for RMR
while True:
- mbuf = rmr.rmr_torcv_msg(mrc, mbuf, 0) # set the timeout to 0 so this doesn't block!!
-
+ mbuf = rmr.rmr_torcv_msg(mrc, mbuf, timeout) # first call may have non-zero timeout
+ timeout = 0 # reset so subsequent calls do not wait
summary = rmr.message_summary(mbuf)
- if summary["message status"] != "RMR_OK": # ok indicates msg received, stop on all other states
+ if summary["message status"] != "RMR_OK": # ok indicates msg received, stop on all other states; e.g., RMR_ERR_TIMEOUT
break
if len(pass_filter) == 0 or summary["message type"] in pass_filter: # no filter, or passes; capture it
new_messages.append(summary)
- rmr.rmr_free_msg(mbuf) # must free message to avoid leak
+ rmr.rmr_free_msg(mbuf) # free the single buffer to avoid leak
return new_messages
-def rmr_rcvall_msgs_raw(mrc, pass_filter=[]):
+def rmr_rcvall_msgs_raw(mrc, pass_filter=[], timeout=0):
"""
- Same as rmr_rcvall_msgs, but the raw sbuf is also returned.
- Useful, for example, if rts is to be used.
+ Same as rmr_rcvall_msgs, but answers tuples with the raw sbuf.
+ Useful if return-to-sender (rts) functions are required.
Parameters
----------
pass_filter: list (optional)
The message type(s) to capture.
+ timeout: int (optional)
+ The number of milliseconds to wait for a message to arrive.
+
Returns
-------
- list of tuple:$
- List of tuples [(S, sbuf),...] where S is a message summary and sbuf is the raw message$
- the caller is responsible for calling rmr.rmr_free_msg(sbuf) for each sbuf afterwards to prevent memory leaks.
+ list of tuple:
+ List of tuples [(S, sbuf),...] where S is a message summary (dict), and sbuf is the raw message; may be empty.
+ The caller MUST call rmr.rmr_free_msg(sbuf) when finished with each sbuf to prevent memory leaks!
"""
new_messages = []
while True:
- mbuf = rmr.rmr_alloc_msg(mrc, 4096) # allocate buffer to have something for a return status
- mbuf = rmr.rmr_torcv_msg(mrc, mbuf, 0) # set the timeout to 0 so this doesn't block!!
+ mbuf = rmr.rmr_alloc_msg(mrc, 4096) # allocate a new buffer for every message
+ mbuf = rmr.rmr_torcv_msg(mrc, mbuf, timeout) # first call may have non-zero timeout
+ timeout = 0 # reset so subsequent calls do not wait
summary = rmr.message_summary(mbuf)
- if summary["message status"] != "RMR_OK":
+ if summary["message status"] != "RMR_OK": # e.g., RMR_ERR_TIMEOUT
+ rmr.rmr_free_msg(mbuf) # free the failed-to-receive buffer
break
if len(pass_filter) == 0 or mbuf.contents.mtype in pass_filter: # no filter, or passes; capture it
- new_messages.append((summary, mbuf))
+ new_messages.append((summary, mbuf)) # caller is responsible for freeing the buffer
else:
- rmr.rmr_free_msg(mbuf)
+ rmr.rmr_free_msg(mbuf) # free the filtered-out message buffer
return new_messages
def rmr_get_messages(self):
"""
- returns a generator iterable over all current messages in the queue that have not yet been read by the client xapp
+ Returns a generator iterable over all items in the queue that have not yet been read by the client xapp.
+ Each item is a tuple (S, sbuf) where S is a message summary dict and sbuf is the raw message.
+ The caller MUST call rmr.rmr_free_msg(sbuf) when finished with each sbuf to prevent memory leaks!
"""
while not self._rmr_loop.rcv_queue.empty():
(summary, sbuf) = self._rmr_loop.rcv_queue.get()
class RmrLoop:
"""
- Class represents an rmr loop that constantly reads from rmr
+ Class represents an RMR loop that constantly reads from RMR.
- Note, we use a queue here, and a thread, rather than the xapp frame just looping and calling consume, so that a possibly slow running consume function does not block the reading of new messages
+ Note, we use a queue here, and a thread, rather than the xapp frame just looping
+ and calling consume, so that a possibly slow running consume function does not
+ block the reading of new messages
"""
def __init__(self, port, wait_for_ready=True):
"""
- sets up rmr, then launches a thread that reads and injects messages into a queue
+ sets up RMR, then launches a thread that reads and injects messages into a queue.
Parameters
----------
port to listen on
wait_for_ready: bool (optional)
- if this is True, then this function hangs until rmr is ready to send, which includes having a valid routing file.
- this can be set to False if the client only wants to *receive only*
+ If True, then this function hangs until RMR is ready to send, which includes
+ having a valid routing file. This can be set to False if the client only wants
+ to *receive only*.
"""
# Public
# thread safe queue https://docs.python.org/3/library/queue.html
- # We use a thread and a queue so that a long running consume callback function can never block reads.
- # IE a consume implementation could take a long time and the ring size for rmr blows up here and messages are lost
+ # We use a thread and a queue so that a long running consume callback function can
+ # never block reads. IE a consume implementation could take a long time and the ring
+ # size for rmr blows up here and messages are lost.
self.rcv_queue = queue.Queue()
- # rmr context; RMRFL_MTCALL puts RMR into a multithreaded mode, where a thread populates a ring of messages that receive calls read from
+ # RMR context; RMRFL_MTCALL puts RMR into a multithreaded mode, where a thread
+ # populates a ring of messages that receive calls read from
self.mrc = rmr.rmr_init(str(port).encode(), rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
if wait_for_ready:
time.sleep(0.1)
# Private
- self._keep_going = True # used to tell this thread to stop it's work
+ self._keep_going = True # used to tell this thread to stop
self._last_ran = time.time() # used for healthcheck
self._loop_is_running = False # used in stop to know when it's safe to kill the mrc
- # start the work loop
- mdc_logger.debug("Starting loop thread")
-
def loop():
- mdc_logger.debug("Work loop starting")
+ mdc_logger.debug("Work loop starts")
self._loop_is_running = True
while self._keep_going:
# read our mailbox
# TODO: take a flag as to whether RAW is needed or not
# RAW allows for RTS however the caller must free, and the caller may not need RTS.
- # Currently after consuming, callers should do rmr.rmr_free_msg(sbuf)
+ # Currently after consuming, callers should do rmr.rmr_free_msg(sbuf)
- for (msg, sbuf) in helpers.rmr_rcvall_msgs_raw(self.mrc):
+ for (msg, sbuf) in helpers.rmr_rcvall_msgs_raw(self.mrc, timeout=1000):
self.rcv_queue.put((msg, sbuf))
self._last_ran = time.time()
self._loop_is_running = False
+ mdc_logger.debug("Work loop ends")
+ # start the work loop
+ mdc_logger.debug("Starting loop thread")
self._thread = Thread(target=loop)
self._thread.start()
"""
sets a flag that will cleanly stop the thread
"""
- mdc_logger.debug("Stopping rmr thread. Waiting for last iteration to finish..")
+ mdc_logger.debug("Stopping RMR thread. Waiting for last iteration to finish..")
self._keep_going = False
# wait until the current batch of messages is done, then kill the rmr connection
- # note; I debated putting this in "loop" however if the while loop was still going setting mrc to close here would blow up any processing still currently happening
- # probably more polite to at least finish the current batch and then close. So here we wait until the current batch is done, then we kill the mrc
+ # note; I debated putting this in "loop" however if the while loop was still going
+ # setting mrc to close here would blow up any processing still currently happening
+ # probably more polite to at least finish the current batch and then close. So here
+ # we wait until the current batch is done, then we kill the mrc
while self._loop_is_running:
- pass
+ time.sleep(0.5)
mdc_logger.debug("Closing rmr connection")
rmr.rmr_close(self.mrc)
setup(
name="ricxappframe",
- version="1.0.3",
+ version="1.0.4",
packages=find_packages(exclude=["tests.*", "tests"]),
author="Tommy Carpenter, E. Scott Daniels",
description="Xapp and RMR framework for python",
SIZE = 256
MRC_SEND = None
MRC_RCV = None
+MRC_BUF_RCV = None
def setup_module():
time.sleep(1)
global MRC_BUF_RCV
- MRC_BUF_RCV = rmr.rmr_init(b"3564", rmr.RMR_MAX_RCV_BYTES, 0x02)
+ MRC_BUF_RCV = rmr.rmr_init(b"3564", rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
while rmr.rmr_ready(MRC_BUF_RCV) == 0:
time.sleep(1)
"""
rmr.rmr_close(MRC_SEND)
rmr.rmr_close(MRC_RCV)
+ rmr.rmr_close(MRC_BUF_RCV)
def _assert_new_sbuf(sbuf):
There is no unit test for rmr_rcv_msg; too dangerous, that is a blocking call that may never return.
"""
sbuf_rcv = rmr.rmr_alloc_msg(MRC_RCV, SIZE)
- sbuf_rcv = rmr.rmr_torcv_msg(MRC_RCV, sbuf_rcv, 50) # should time out after 50ms
+ start_rcv_sec = time.time()
+ sbuf_rcv = rmr.rmr_torcv_msg(MRC_RCV, sbuf_rcv, 500) # should wait a bit before returning
summary = rmr.message_summary(sbuf_rcv)
assert summary["message state"] == 12
assert summary["message status"] == "RMR_ERR_TIMEOUT"
+ assert(time.time() - start_rcv_sec > 0.5) # test duration should be longer than the timeout
def test_send_rcv():
time.sleep(1) # ensure underlying transport gets cycles to send/receive
bundle = helpers.rmr_rcvall_msgs_raw(MRC_BUF_RCV, [2]) # receive only message type 2 messages
- assert len(bundle) == 12 # we should only get the second batch of 12 messages
+ assert len(bundle) == 12 # we should only get the type 2 batch of 12 messages
for i, (ms, sbuf) in enumerate(bundle): # test the raw version
test_summary = rmr.message_summary(sbuf)
assert ms["payload"] == expected_pay
rmr.rmr_free_msg(sbuf)
+ # check the timeout scenarios
+ start_rcv_sec = time.time()
+ bundle = helpers.rmr_rcvall_msgs(MRC_RCV, timeout=1001) # non-zero timeout means wait
+ assert len(bundle) == 0 # we should get none
+ assert(time.time() - start_rcv_sec > 1) # test duration should be longer than 1 second
+
+ start_rcv_sec = time.time()
+ bundle = helpers.rmr_rcvall_msgs_raw(MRC_RCV, timeout=1002) # non-zero timeout means wait
+ assert len(bundle) == 0 # we should get none
+ assert(time.time() - start_rcv_sec > 1) # test duration should be longer than 1 second
+
def test_bad_buffer():
"""test that we get a proper exception when the buffer has a null pointer"""