-"""
-Contains rmr functionality specific to the xapp
-The general rmr API is via "rmr"
-"""
# ==================================================================================
# Copyright (c) 2020 Nokia
# Copyright (c) 2020 AT&T Intellectual Property.
# limitations under the License.
# ==================================================================================
+"""
+Contains RMR functionality specific to the xapp.
+The general rmr API is via "rmr"
+"""
import time
import queue
"""
Class represents an RMR loop that constantly reads from RMR.
- Note, we use a queue here, and a thread, rather than the xapp frame just looping
- and calling consume, so that a possibly slow running consume function does not
- block the reading of new messages
+ Note, we use a queue here, and a thread, rather than the xapp
+ frame just looping and calling consume, so that a possibly slow
+ running consume function does not block the reading of new messages.
"""
def __init__(self, port, wait_for_ready=True):
"""
- sets up RMR, then launches a thread that reads and injects messages into a queue.
+ sets up RMR, then launches a thread that reads and injects
+ messages into a queue.
Parameters
----------
port to listen on
wait_for_ready: bool (optional)
- If True, then this function hangs until RMR is ready to send, which includes
- having a valid routing file. This can be set to False if the client only wants
- to *receive only*.
+ If True, then this function hangs until RMR is ready to
+ send, which includes having a valid routing file. This can
+ be set to False if the client only wants to *receive only*.
"""
# Public
# read our mailbox
# TODO: take a flag as to whether RAW is needed or not
- # RAW allows for RTS however the caller must free, and the caller may not need RTS.
- # Currently after consuming, callers should do rmr.rmr_free_msg(sbuf)
-
- for (msg, sbuf) in helpers.rmr_rcvall_msgs_raw(self.mrc, timeout=1000):
+ # RAW allows for RTS however the caller must free, and
+ # the caller may not need RTS. Currently after
+ # consuming, callers must call rmr.rmr_free_msg(sbuf)
+ # Use a non-trivial timeout to avoid spinning the CPU.
+ # The function returns if no messages arrive for that
+ # interval, which allows a stop request to be processed.
+ for (msg, sbuf) in helpers.rmr_rcvall_msgs_raw(self.mrc, timeout=5000):
self.rcv_queue.put((msg, sbuf))
self._last_ran = time.time()
"""
sets a flag that will cleanly stop the thread
"""
- mdc_logger.debug("Stopping RMR thread. Waiting for last iteration to finish..")
+ # wait until the current batch of messages is done, then kill
+ # the rmr connection. I debated putting this in "loop" however
+ # if the while loop was still going, setting mrc to close here
+ # would blow up any processing still currently happening.
+ # Probably more polite to at least finish the current batch
+ # and then close. So here we wait until the current batch is
+ # done, then we kill the mrc.
+ mdc_logger.debug("Setting flag to end RMR work loop.")
self._keep_going = False
- # wait until the current batch of messages is done, then kill the rmr connection
- # note; I debated putting this in "loop" however if the while loop was still going
- # setting mrc to close here would blow up any processing still currently happening
- # probably more polite to at least finish the current batch and then close. So here
- # we wait until the current batch is done, then we kill the mrc
while self._loop_is_running:
- time.sleep(0.5)
- mdc_logger.debug("Closing rmr connection")
+ time.sleep(1)
+ mdc_logger.debug("Waiting for RMR work loop to end")
+ mdc_logger.debug("Closing RMR connection")
rmr.rmr_close(self.mrc)
def healthcheck(self, seconds=30):