- gevent.sleep(0.01) # wait 10ms before we try the first recieve
- for _ in range(0, RETRY_TIMES):
- logger.debug("Seeing if return message is fufilled")
- summary = _check_if_ack_received(expected_transaction_id, expected_ack_message_type)
- if summary:
- logger.debug("Target ack Message received!: %s", summary)
- logger.debug("current queue size is %d", len(RECEIVED_MESSAGES))
- del WAITING_TRANSIDS[expected_transaction_id]
- return summary["payload"]
- else:
- logger.debug("Deffering execution for %s seconds", str(RMR_RCV_RETRY_INTERVAL / 1000))
- gevent.sleep(RMR_RCV_RETRY_INTERVAL / 1000)
-
- # we still didn't get the ACK we want
- raise ExpectedAckNotReceived()
class RmrLoop:
    """
    Represents an rmr work loop meant to be run as a long-standing separate thread.

    Typical use is to construct an instance, run ``loop`` as a thread target,
    poll ``rmr_is_ready`` until it returns True, and call ``stop`` to ask the
    loop to end.

    Args:
        _init_func_override: optional zero-argument callable used in place of
            ``_init_rmr()`` to obtain the rmr context (useful for unit testing).
        rcv_func_override: optional zero-argument callable used in place of the
            real receive call; it must return an iterable of messages (useful
            for unit testing to mock certain receive scenarios).
    """

    def __init__(self, _init_func_override=None, rcv_func_override=None):
        self._rmr_is_ready = False  # flipped to True by loop() once a context has been obtained
        self._keep_going = True  # cleared by stop(); checked at the top of every loop iteration
        self._init_func_override = _init_func_override  # useful for unit testing
        self._rcv_func_override = rcv_func_override  # useful for unit testing to mock certain receive scenarios
        self._rcv_func = None  # bound inside loop(), because the default needs the rmr context

    def rmr_is_ready(self):
        """Return whether rmr has been initialized."""
        return self._rmr_is_ready

    def stop(self):
        """
        Set a flag for the loop to end.

        The flag is only checked between iterations, so the loop finishes its
        current pass (including its one second sleep) before exiting.
        """
        self._keep_going = False

    def loop(self):
        """
        This loop runs in an a1 thread until ``stop`` is called, and has 3 jobs:
        - send out any messages that have to go out (create instance, delete instance)
        - read a1s mailbox and update the status of all instances based on acks from downstream policy handlers
        - clean up the database (eg delete the instance) under certain conditions based on those statuses (NOT DONE YET)
        """

        # get a context
        mrc = self._init_func_override() if self._init_func_override else _init_rmr()
        self._rmr_is_ready = True
        logger.debug("Rmr is ready")

        # set the receive function called below
        # NOTE(review): 21024 is presumably the message type of policy handler acks and the
        # list presumably acts as a receive filter -- confirm against the rmr helpers API
        if self._rcv_func_override:
            self._rcv_func = self._rcv_func_override
        else:
            self._rcv_func = lambda: helpers.rmr_rcvall_msgs(mrc, [21024])

        # loop until stop() is called
        logger.debug("Work loop starting")
        while self._keep_going:
            # send out all messages waiting for us
            while not _SEND_QUEUE.empty():
                work_item = _SEND_QUEUE.get(block=False, timeout=None)
                _send(mrc, payload=work_item["payload"], message_type=work_item["msg type"])

            # read our mailbox and update statuses
            updated_instances = set()
            for msg in self._rcv_func():
                try:
                    pay = json.loads(msg["payload"])
                    pti = pay["policy_type_id"]
                    pii = pay["policy_instance_id"]
                    data.set_status(pti, pii, pay["handler_id"], pay["status"])
                    updated_instances.add((pti, pii))
                except (
                    PolicyTypeNotFound,
                    PolicyInstanceNotFound,
                    KeyError,
                    TypeError,
                    json.decoder.JSONDecodeError,
                ):
                    # TypeError covers payloads that are valid JSON but not an object
                    # (e.g. a list, number or null) and payloads that are not strings at all;
                    # without it a single bad message would kill this long-running thread.
                    # TODO: in the future we may also have to catch SDL errors
                    logger.debug("Dropping malformed or non applicable message: %s", msg)

            # for all updated instances, see if we can trigger a delete
            # should be no catch needed here, since the status update would have failed if it was a bad pair
            for pti, pii in updated_instances:
                data.clean_up_instance(pti, pii)

            # TODO: what's a reasonable sleep time? we don't want to hammer redis too much, and a1 isn't a real time component
            time.sleep(1)
+
def start_rmr_thread(init_func_override=None, rcv_func_override=None):
    """
    Start a1s rmr thread and block until rmr reports that it is ready.
    Also called during unit testing.

    Args:
        init_func_override: optional replacement for the rmr initialization
            call, forwarded to ``RmrLoop``.
        rcv_func_override: optional replacement for the rmr receive call,
            forwarded to ``RmrLoop``.

    Returns:
        The ``RmrLoop`` handle driving the thread; useful during unit testing
        (for example to call ``stop``).

    Raises:
        RuntimeError: if the thread exits before rmr was initialized, which
            would otherwise leave this function polling forever.
    """
    rmr_loop = RmrLoop(init_func_override, rcv_func_override)
    thread = Thread(target=rmr_loop.loop)
    thread.start()
    while not rmr_loop.rmr_is_ready():
        # Readiness is re-checked after the liveness test so that a thread which became
        # ready and then ended between the two checks is not reported as a failure.
        if not thread.is_alive() and not rmr_loop.rmr_is_ready():
            raise RuntimeError("rmr thread exited before rmr was initialized")
        time.sleep(0.5)
    return rmr_loop  # return the handle; useful during unit testing