+ def _assert_good_send(self, sbuf, pre_send_summary):
+ """
+ common helper function for _send_msg and _rts_msg
+ """
+ post_send_summary = rmr.message_summary(sbuf)
+ if post_send_summary["message state"] == 0 and post_send_summary["message status"] == "RMR_OK":
+ return True
+ mdc_logger.debug("Message NOT sent!")
+ mdc_logger.debug("Pre-send summary: {0}, Post-send summary: {1}".format(pre_send_summary, post_send_summary))
+ return False
+
+ def _send_msg(self, pay, mtype, subid):
+ """
+ sends a msg
+ """
+ for _ in range(0, RETRY_TIMES):
+ sbuf = rmr.rmr_alloc_msg(self.mrc, len(pay), payload=pay, gen_transaction_id=True, mtype=mtype, sub_id=subid)
+ sbuf.contents.sub_id = subid
+ pre_send_summary = rmr.message_summary(sbuf)
+ mdc_logger.debug("Trying to send message: {}".format(pre_send_summary))
+ sbuf = rmr.rmr_send_msg(self.mrc, sbuf) # send
+ if self._assert_good_send(sbuf, pre_send_summary):
+ rmr.rmr_free_msg(sbuf) # free
+ return
+
+ mdc_logger.debug("A1 did NOT send the message successfully after {} retries!".format(RETRY_TIMES))
+
+ def _rts_msg(self, pay, sbuf_rts, mtype):
+ """
+ sends a message using rts
+ we do not call free here because we may rts many times; it is called after the rts loop
+ """
+ for _ in range(0, RETRY_TIMES):
+ pre_send_summary = rmr.message_summary(sbuf_rts)
+ sbuf_rts = rmr.rmr_rts_msg(self.mrc, sbuf_rts, payload=pay, mtype=mtype)
+ if self._assert_good_send(sbuf_rts, pre_send_summary):
+ break
+ return sbuf_rts # in some cases rts may return a new sbuf
+
+ def _handle_sends(self):
+ # send out all messages waiting for us
+ while not self.instance_send_queue.empty():
+ work_item = self.instance_send_queue.get(block=False, timeout=None)
+ payload = json.dumps(messages.a1_to_handler(*work_item)).encode("utf-8")
+ self._send_msg(payload, A1_POLICY_REQUEST, work_item[1])
+