+
+ def __init__(self, real_init=True):
+ self._rmr_is_ready = False
+ self._keep_going = True
+ self._real_init = real_init # useful for unit testing to turn off initialization
+
+ def rmr_is_ready(self):
+ """returns whether rmr has been initialized"""
+ return self._rmr_is_ready
+
+ def stop(self):
+ """sets a flag for the loop to end"""
+ self._keep_going = False
+
+ def loop(self):
+ """
+ This loop runs in an a1 thread forever, and has 3 jobs:
+ - send out any messages that have to go out (create instance, delete instance)
+ - read a1s mailbox and update the status of all instances based on acks from downstream policy handlers
+ - clean up the database (eg delete the instance) under certain conditions based on those statuses (NOT DONE YET)
+ """
+
+ # get a context
+ mrc = None
+ logger.debug("Waiting for rmr to initialize...")
+ if self._real_init:
+ mrc = _init_rmr()
+ self._rmr_is_ready = True
+ logger.debug("Rmr is ready")
+
+ # loop forever
+ logger.debug("Work loop starting")
+ while self._keep_going:
+ """
+ We never raise an exception here. Log and keep moving
+ Bugs will eventually be caught be examining logs.
+ """
+ try:
+ # First, send out all messages waiting for us
+ while not _SEND_QUEUE.empty():
+ work_item = _SEND_QUEUE.get(block=False, timeout=None)
+ _send(mrc, payload=work_item["payload"], message_type=work_item["msg type"])
+
+ # Next, update all statuses waiting in a1s mailbox
+ _update_all_statuses(mrc)
+
+ # TODO: next body of work is to try to clean up the database for any updated statuses
+
+ except Exception as e:
+ logger.debug("Polling thread encountered an unexpected exception, but it will continue:")
+ logger.exception(e)
+
+ time.sleep(1)