- """
- new_messages = _dequeue_all_waiting_messages() # dequeue all waiting messages
- global RECEIVED_MESSAGES # this is ugly, but fine.. we just need an in memory list across the async calls
- RECEIVED_MESSAGES += new_messages
- for index, summary in enumerate(RECEIVED_MESSAGES): # Search the queue for the target message
- if (
- summary["message state"] == 0
- and summary["message status"] == "RMR_OK"
- and summary["message type"] == target_type
- and summary["transaction id"] == target_transid
- ): # Found; delete it from queue
- del RECEIVED_MESSAGES[index]
- return summary
- return None
-
-
-def init_rmr():
- """
- called from run; not called for unit tests
- """
- global MRC
- MRC = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, 0x00)
+ def _assert_good_send(self, sbuf, pre_send_summary):
+ """
+ common helper function for _send_msg and _rts_msg
+ """
+ post_send_summary = rmr.message_summary(sbuf)
+ if post_send_summary["message state"] == 0 and post_send_summary["message status"] == "RMR_OK":
+ return True
+ mdc_logger.debug("Message NOT sent!")
+ mdc_logger.debug("Pre-send summary: {0}, Post-send summary: {1}".format(pre_send_summary, post_send_summary))
+ return False
+
+ def _send_msg(self, pay, mtype, subid):
+ """
+ sends a msg
+ """
+ for _ in range(0, RETRY_TIMES):
+ sbuf = rmr.rmr_alloc_msg(self.mrc, len(pay), payload=pay, gen_transaction_id=True, mtype=mtype, sub_id=subid)
+ sbuf.contents.sub_id = subid
+ pre_send_summary = rmr.message_summary(sbuf)
+ mdc_logger.debug("Trying to send message: {}".format(pre_send_summary))
+ sbuf = rmr.rmr_send_msg(self.mrc, sbuf) # send
+ if self._assert_good_send(sbuf, pre_send_summary):
+ rmr.rmr_free_msg(sbuf) # free
+ return
+
+ mdc_logger.debug("A1 did NOT send the message successfully after {} retries!".format(RETRY_TIMES))
+
+ def _rts_msg(self, pay, sbuf_rts, mtype):
+ """
+ sends a message using rts
+ we do not call free here because we may rts many times; it is called after the rts loop
+ """
+ for _ in range(0, RETRY_TIMES):
+ pre_send_summary = rmr.message_summary(sbuf_rts)
+ sbuf_rts = rmr.rmr_rts_msg(self.mrc, sbuf_rts, payload=pay, mtype=mtype)
+ if self._assert_good_send(sbuf_rts, pre_send_summary):
+ break
+ return sbuf_rts # in some cases rts may return a new sbuf
+
+ def _handle_sends(self):
+ # send out all messages waiting for us
+ while not self.instance_send_queue.empty():
+ work_item = self.instance_send_queue.get(block=False, timeout=None)
+ payload = json.dumps(messages.a1_to_handler(*work_item)).encode("utf-8")
+ self._send_msg(payload, A1_POLICY_REQUEST, work_item[1])
+
+ def loop(self):
+ """
+ This loop runs forever, and has 3 jobs:
+ - send out any messages that have to go out (create instance, delete instance)
+ - read a1s mailbox and update the status of all instances based on acks from downstream policy handlers
+ - clean up the database (eg delete the instance) under certain conditions based on those statuses (NOT DONE YET)
+ """
+ # loop forever
+ mdc_logger.debug("Work loop starting")
+ while self.keep_going:
+
+ # Update 3/20/2020
+ # We now handle our sends in a thread (that will just exit when it's done) because there is a difference between how send works in SI95 vs NNG.
+ # Send_msg via NNG formerly never blocked.
+ # However under SI95 this send may block for some arbitrary period of time on the first send to an endpoint for which a connection is not established
+ # If this send takes too long, this loop blocks, and the healthcheck will fail, which will cause A1s healthcheck to fail, which will cause Kubernetes to whack A1 and all kinds of horrible things happen.
+ # Therefore, now under SI95, we thread this.
+ Thread(target=self._handle_sends).start()
+
+ # read our mailbox
+ for (msg, sbuf) in self.rcv_func():
+ # TODO: in the future we may also have to catch SDL errors
+ try:
+ mtype = msg["message type"]
+ except (KeyError, TypeError, json.decoder.JSONDecodeError):
+ mdc_logger.debug("Dropping malformed policy ack/query message: {0}".format(msg))