X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=ricxappframe%2Fxapp_rmr.py;h=2a35b577db223e845dc6b5eef3ac6dfd105bc462;hb=bbc9028aa34ae48e7806596cd05fbe7a5bfd7fb8;hp=93eaeeb7523628aaa0caf69e6e8ef4d4dea9f189;hpb=df048193df5fc166961a01d8fc8a2248a649ba53;p=ric-plt%2Fxapp-frame-py.git diff --git a/ricxappframe/xapp_rmr.py b/ricxappframe/xapp_rmr.py index 93eaeeb..2a35b57 100644 --- a/ricxappframe/xapp_rmr.py +++ b/ricxappframe/xapp_rmr.py @@ -34,14 +34,15 @@ class RmrLoop: """ Class represents an RMR loop that constantly reads from RMR. - Note, we use a queue here, and a thread, rather than the xapp frame just looping - and calling consume, so that a possibly slow running consume function does not - block the reading of new messages + Note, we use a queue here, and a thread, rather than the xapp + frame just looping and calling consume, so that a possibly slow + running consume function does not block the reading of new messages. """ def __init__(self, port, wait_for_ready=True): """ - sets up RMR, then launches a thread that reads and injects messages into a queue. + sets up RMR, then launches a thread that reads and injects + messages into a queue. Parameters ---------- @@ -49,9 +50,9 @@ class RmrLoop: port to listen on wait_for_ready: bool (optional) - If True, then this function hangs until RMR is ready to send, which includes - having a valid routing file. This can be set to False if the client only wants - to *receive only*. + If True, then this function hangs until RMR is ready to + send, which includes having a valid routing file. This can + be set to False if the client only wants to *receive only*. """ # Public @@ -82,10 +83,13 @@ class RmrLoop: # read our mailbox # TODO: take a flag as to whether RAW is needed or not - # RAW allows for RTS however the caller must free, and the caller may not need RTS. - # Currently after consuming, callers should do rmr.rmr_free_msg(sbuf) - - for (msg, sbuf) in helpers.rmr_rcvall_msgs_raw(self.mrc, timeout=1000): + # RAW allows for RTS however the caller must free, and + # the caller may not need RTS. Currently after + # consuming, callers must call rmr.rmr_free_msg(sbuf) + # Use a non-trivial timeout to avoid spinning the CPU. + # The function returns if no messages arrive for that + # interval, which allows a stop request to be processed. + for (msg, sbuf) in helpers.rmr_rcvall_msgs_raw(self.mrc, timeout=5000): self.rcv_queue.put((msg, sbuf)) self._last_ran = time.time() @@ -102,16 +106,19 @@ class RmrLoop: """ sets a flag that will cleanly stop the thread """ - mdc_logger.debug("Stopping RMR thread. Waiting for last iteration to finish..") + # wait until the current batch of messages is done, then kill + # the rmr connection. I debated putting this in "loop" however + # if the while loop was still going, setting mrc to close here + # would blow up any processing still currently happening. + # Probably more polite to at least finish the current batch + # and then close. So here we wait until the current batch is + # done, then we kill the mrc. + mdc_logger.debug("Setting flag to end RMR work loop.") self._keep_going = False - # wait until the current batch of messages is done, then kill the rmr connection - # note; I debated putting this in "loop" however if the while loop was still going - # setting mrc to close here would blow up any processing still currently happening - # probably more polite to at least finish the current batch and then close. So here - # we wait until the current batch is done, then we kill the mrc while self._loop_is_running: - time.sleep(0.5) - mdc_logger.debug("Closing rmr connection") + time.sleep(1) + mdc_logger.debug("Waiting for RMR work loop to end") + mdc_logger.debug("Closing RMR connection") rmr.rmr_close(self.mrc) def healthcheck(self, seconds=30):