X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=ricxappframe%2Fxapp_rmr.py;fp=ricxappframe%2Fxapp_rmr.py;h=6092e141c6ea671ce8a330d5c2c861651c815f0f;hb=666e8319bd0e618576be79a14208d7eaf0de99f2;hp=d18b4c031bfcd4ab0a3197d88927558b9bd5cdc8;hpb=aa29841d6c673640fb395663be0e07f470682e2f;p=ric-plt%2Fxapp-frame-py.git diff --git a/ricxappframe/xapp_rmr.py b/ricxappframe/xapp_rmr.py index d18b4c0..6092e14 100644 --- a/ricxappframe/xapp_rmr.py +++ b/ricxappframe/xapp_rmr.py @@ -32,14 +32,16 @@ mdc_logger = Logger(name=__name__) class RmrLoop: """ - Class represents an rmr loop that constantly reads from rmr + Class represents an RMR loop that constantly reads from RMR. - Note, we use a queue here, and a thread, rather than the xapp frame just looping and calling consume, so that a possibly slow running consume function does not block the reading of new messages + Note, we use a queue here, and a thread, rather than the xapp frame just looping + and calling consume, so that a possibly slow running consume function does not + block the reading of new messages """ def __init__(self, port, wait_for_ready=True): """ - sets up rmr, then launches a thread that reads and injects messages into a queue + sets up RMR, then launches a thread that reads and injects messages into a queue. Parameters ---------- @@ -47,17 +49,20 @@ class RmrLoop: port to listen on wait_for_ready: bool (optional) - if this is True, then this function hangs until rmr is ready to send, which includes having a valid routing file. - this can be set to False if the client only wants to *receive only* + If True, then this function hangs until RMR is ready to send, which includes + having a valid routing file. This can be set to False if the client only wants + to *receive only*. """ # Public # thread safe queue https://docs.python.org/3/library/queue.html - # We use a thread and a queue so that a long running consume callback function can never block reads. - # IE a consume implementation could take a long time and the ring size for rmr blows up here and messages are lost + # We use a thread and a queue so that a long running consume callback function can + # never block reads. IE a consume implementation could take a long time and the ring + # size for rmr blows up here and messages are lost. self.rcv_queue = queue.Queue() - # rmr context; RMRFL_MTCALL puts RMR into a multithreaded mode, where a thread populates a ring of messages that receive calls read from + # RMR context; RMRFL_MTCALL puts RMR into a multithreaded mode, where a thread + # populates a ring of messages that receive calls read from self.mrc = rmr.rmr_init(str(port).encode(), rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL) if wait_for_ready: @@ -66,30 +71,30 @@ class RmrLoop: time.sleep(0.1) # Private - self._keep_going = True # used to tell this thread to stop it's work + self._keep_going = True # used to tell this thread to stop self._last_ran = time.time() # used for healthcheck self._loop_is_running = False # used in stop to know when it's safe to kill the mrc - # start the work loop - mdc_logger.debug("Starting loop thread") - def loop(): - mdc_logger.debug("Work loop starting") + mdc_logger.debug("Work loop starts") self._loop_is_running = True while self._keep_going: # read our mailbox # TODO: take a flag as to whether RAW is needed or not # RAW allows for RTS however the caller must free, and the caller may not need RTS. - # Currently after consuming, callers should do rmr.rmr_free_msg(sbuf) + # Currently after consuming, callers should do rmr.rmr_free_msg(sbuf) - for (msg, sbuf) in helpers.rmr_rcvall_msgs_raw(self.mrc): + for (msg, sbuf) in helpers.rmr_rcvall_msgs_raw(self.mrc, timeout=1000): self.rcv_queue.put((msg, sbuf)) self._last_ran = time.time() self._loop_is_running = False + mdc_logger.debug("Work loop ends") + # start the work loop + mdc_logger.debug("Starting loop thread") self._thread = Thread(target=loop) self._thread.start() @@ -97,13 +102,15 @@ class RmrLoop: """ sets a flag that will cleanly stop the thread """ - mdc_logger.debug("Stopping rmr thread. Waiting for last iteration to finish..") + mdc_logger.debug("Stopping RMR thread. Waiting for last iteration to finish..") self._keep_going = False # wait until the current batch of messages is done, then kill the rmr connection - # note; I debated putting this in "loop" however if the while loop was still going setting mrc to close here would blow up any processing still currently happening - # probably more polite to at least finish the current batch and then close. So here we wait until the current batch is done, then we kill the mrc + # note; I debated putting this in "loop" however if the while loop was still going + # setting mrc to close here would blow up any processing still currently happening + # probably more polite to at least finish the current batch and then close. So here + # we wait until the current batch is done, then we kill the mrc while self._loop_is_running: - pass + time.sleep(0.5) mdc_logger.debug("Closing rmr connection") rmr.rmr_close(self.mrc)