class RmrLoop:
"""
- Class represents an rmr loop that constantly reads from rmr
+ Class represents an RMR loop that constantly reads from RMR.
- Note, we use a queue here, and a thread, rather than the xapp frame just looping and calling consume, so that a possibly slow running consume function does not block the reading of new messages
+ Note, we use a queue here, and a thread, rather than the xapp frame just looping
+ and calling consume, so that a possibly slow running consume function does not
+ block the reading of new messages
"""
def __init__(self, port, wait_for_ready=True):
"""
- sets up rmr, then launches a thread that reads and injects messages into a queue
+ sets up RMR, then launches a thread that reads and injects messages into a queue.
Parameters
----------
port to listen on
wait_for_ready: bool (optional)
- if this is True, then this function hangs until rmr is ready to send, which includes having a valid routing file.
- this can be set to False if the client only wants to *receive only*
+ If True, then this function hangs until RMR is ready to send, which includes
+ having a valid routing file. This can be set to False if the client only wants
+ to *receive only*.
"""
# Public
# thread safe queue https://docs.python.org/3/library/queue.html
- # We use a thread and a queue so that a long running consume callback function can never block reads.
- # IE a consume implementation could take a long time and the ring size for rmr blows up here and messages are lost
+ # We use a thread and a queue so that a long running consume callback function can
+ # never block reads. IE a consume implementation could take a long time and the ring
+ # size for rmr blows up here and messages are lost.
self.rcv_queue = queue.Queue()
- # rmr context; RMRFL_MTCALL puts RMR into a multithreaded mode, where a thread populates a ring of messages that receive calls read from
+ # RMR context; RMRFL_MTCALL puts RMR into a multithreaded mode, where a thread
+ # populates a ring of messages that receive calls read from
self.mrc = rmr.rmr_init(str(port).encode(), rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
if wait_for_ready:
time.sleep(0.1)
# Private
- self._keep_going = True # used to tell this thread to stop it's work
+ self._keep_going = True # used to tell this thread to stop
self._last_ran = time.time() # used for healthcheck
self._loop_is_running = False # used in stop to know when it's safe to kill the mrc
- # start the work loop
- mdc_logger.debug("Starting loop thread")
-
def loop():
- mdc_logger.debug("Work loop starting")
+ mdc_logger.debug("Work loop starts")
self._loop_is_running = True
while self._keep_going:
# read our mailbox
# TODO: take a flag as to whether RAW is needed or not
# RAW allows for RTS however the caller must free, and the caller may not need RTS.
- # Currently after consuming, callers should do rmr.rmr_free_msg(sbuf)
+ # Currently after consuming, callers should do rmr.rmr_free_msg(sbuf)
- for (msg, sbuf) in helpers.rmr_rcvall_msgs_raw(self.mrc):
+ for (msg, sbuf) in helpers.rmr_rcvall_msgs_raw(self.mrc, timeout=1000):
self.rcv_queue.put((msg, sbuf))
self._last_ran = time.time()
self._loop_is_running = False
+ mdc_logger.debug("Work loop ends")
+ # start the work loop
+ mdc_logger.debug("Starting loop thread")
self._thread = Thread(target=loop)
self._thread.start()
"""
sets a flag that will cleanly stop the thread
"""
- mdc_logger.debug("Stopping rmr thread. Waiting for last iteration to finish..")
+ mdc_logger.debug("Stopping RMR thread. Waiting for last iteration to finish..")
self._keep_going = False
# wait until the current batch of messages is done, then kill the rmr connection
- # note; I debated putting this in "loop" however if the while loop was still going setting mrc to close here would blow up any processing still currently happening
- # probably more polite to at least finish the current batch and then close. So here we wait until the current batch is done, then we kill the mrc
+ # note; I debated putting this in "loop" however if the while loop was still going
+ # setting mrc to close here would blow up any processing still currently happening
+ # probably more polite to at least finish the current batch and then close. So here
+ # we wait until the current batch is done, then we kill the mrc
while self._loop_is_running:
- pass
+ time.sleep(0.5)
mdc_logger.debug("Closing rmr connection")
rmr.rmr_close(self.mrc)