X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=ricxappframe%2Fxapp_rmr.py;h=2a35b577db223e845dc6b5eef3ac6dfd105bc462;hb=HEAD;hp=d18b4c031bfcd4ab0a3197d88927558b9bd5cdc8;hpb=3a6ac016f65db3fc255f950f96f4768470d584d8;p=ric-plt%2Fxapp-frame-py.git diff --git a/ricxappframe/xapp_rmr.py b/ricxappframe/xapp_rmr.py index d18b4c0..2a35b57 100644 --- a/ricxappframe/xapp_rmr.py +++ b/ricxappframe/xapp_rmr.py @@ -1,7 +1,3 @@ -""" -Contains rmr functionality specific to the xapp -The general rmr API is via "rmr" -""" # ================================================================================== # Copyright (c) 2020 Nokia # Copyright (c) 2020 AT&T Intellectual Property. @@ -19,6 +15,10 @@ The general rmr API is via "rmr" # limitations under the License. # ================================================================================== +""" +Contains RMR functionality specific to the xapp. +The general rmr API is via "rmr" +""" import time import queue @@ -32,14 +32,17 @@ mdc_logger = Logger(name=__name__) class RmrLoop: """ - Class represents an rmr loop that constantly reads from rmr + Class represents an RMR loop that constantly reads from RMR. - Note, we use a queue here, and a thread, rather than the xapp frame just looping and calling consume, so that a possibly slow running consume function does not block the reading of new messages + Note, we use a queue here, and a thread, rather than the xapp + frame just looping and calling consume, so that a possibly slow + running consume function does not block the reading of new messages. """ def __init__(self, port, wait_for_ready=True): """ - sets up rmr, then launches a thread that reads and injects messages into a queue + sets up RMR, then launches a thread that reads and injects + messages into a queue. Parameters ---------- @@ -47,17 +50,20 @@ class RmrLoop: port to listen on wait_for_ready: bool (optional) - if this is True, then this function hangs until rmr is ready to send, which includes having a valid routing file. - this can be set to False if the client only wants to *receive only* + If True, then this function hangs until RMR is ready to + send, which includes having a valid routing file. This can + be set to False if the client only wants to *receive only*. """ # Public # thread safe queue https://docs.python.org/3/library/queue.html - # We use a thread and a queue so that a long running consume callback function can never block reads. - # IE a consume implementation could take a long time and the ring size for rmr blows up here and messages are lost + # We use a thread and a queue so that a long running consume callback function can + # never block reads. IE a consume implementation could take a long time and the ring + # size for rmr blows up here and messages are lost. self.rcv_queue = queue.Queue() - # rmr context; RMRFL_MTCALL puts RMR into a multithreaded mode, where a thread populates a ring of messages that receive calls read from + # RMR context; RMRFL_MTCALL puts RMR into a multithreaded mode, where a thread + # populates a ring of messages that receive calls read from self.mrc = rmr.rmr_init(str(port).encode(), rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL) if wait_for_ready: @@ -66,30 +72,33 @@ class RmrLoop: time.sleep(0.1) # Private - self._keep_going = True # used to tell this thread to stop it's work + self._keep_going = True # used to tell this thread to stop self._last_ran = time.time() # used for healthcheck self._loop_is_running = False # used in stop to know when it's safe to kill the mrc - # start the work loop - mdc_logger.debug("Starting loop thread") - def loop(): - mdc_logger.debug("Work loop starting") + mdc_logger.debug("Work loop starts") self._loop_is_running = True while self._keep_going: # read our mailbox # TODO: take a flag as to whether RAW is needed or not - # RAW allows for RTS however the caller must free, and the caller may not need RTS. - # Currently after consuming, callers should do rmr.rmr_free_msg(sbuf) - - for (msg, sbuf) in helpers.rmr_rcvall_msgs_raw(self.mrc): + # RAW allows for RTS however the caller must free, and + # the caller may not need RTS. Currently after + # consuming, callers must call rmr.rmr_free_msg(sbuf) + # Use a non-trivial timeout to avoid spinning the CPU. + # The function returns if no messages arrive for that + # interval, which allows a stop request to be processed. + for (msg, sbuf) in helpers.rmr_rcvall_msgs_raw(self.mrc, timeout=5000): self.rcv_queue.put((msg, sbuf)) self._last_ran = time.time() self._loop_is_running = False + mdc_logger.debug("Work loop ends") + # start the work loop + mdc_logger.debug("Starting loop thread") self._thread = Thread(target=loop) self._thread.start() @@ -97,14 +106,19 @@ class RmrLoop: """ sets a flag that will cleanly stop the thread """ - mdc_logger.debug("Stopping rmr thread. Waiting for last iteration to finish..") + # wait until the current batch of messages is done, then kill + # the rmr connection. I debated putting this in "loop" however + # if the while loop was still going, setting mrc to close here + # would blow up any processing still currently happening. + # Probably more polite to at least finish the current batch + # and then close. So here we wait until the current batch is + # done, then we kill the mrc. + mdc_logger.debug("Setting flag to end RMR work loop.") self._keep_going = False - # wait until the current batch of messages is done, then kill the rmr connection - # note; I debated putting this in "loop" however if the while loop was still going setting mrc to close here would blow up any processing still currently happening - # probably more polite to at least finish the current batch and then close. So here we wait until the current batch is done, then we kill the mrc while self._loop_is_running: - pass - mdc_logger.debug("Closing rmr connection") + time.sleep(1) + mdc_logger.debug("Waiting for RMR work loop to end") + mdc_logger.debug("Closing RMR connection") rmr.rmr_close(self.mrc) def healthcheck(self, seconds=30):