+ def _assert_good_send(self, sbuf, pre_send_summary):
+ """
+ Extracts the send result and logs a detailed warning if the send failed.
+ Returns the message state, an integer that indicates the result.
+ """
+ post_send_summary = rmr.message_summary(sbuf)
+ if post_send_summary[rmr.RMR_MS_MSG_STATE] != rmr.RMR_OK:
+ mdc_logger.warning("RMR send failed; pre-send summary: {0}, post-send summary: {1}".format(pre_send_summary, post_send_summary))
+ return post_send_summary[rmr.RMR_MS_MSG_STATE]
+
+ def _send_msg(self, pay, mtype, subid):
+ """
+ Creates and sends a message via RMR's send-message feature with the specified payload
+ using the specified message type and subscription ID.
+ """
+ sbuf = rmr.rmr_alloc_msg(self.mrc, len(pay), payload=pay, gen_transaction_id=True, mtype=mtype, sub_id=subid)
+ sbuf.contents.sub_id = subid
+ pre_send_summary = rmr.message_summary(sbuf)
+ for _ in range(0, RETRY_TIMES):
+ mdc_logger.debug("_send_msg: sending: {}".format(pre_send_summary))
+ sbuf = rmr.rmr_send_msg(self.mrc, sbuf)
+ msg_state = self._assert_good_send(sbuf, pre_send_summary)
+ mdc_logger.debug("_send_msg: result message state: {}".format(msg_state))
+ if msg_state != rmr.RMR_ERR_RETRY:
+ break
+
+ rmr.rmr_free_msg(sbuf)
+ if msg_state != rmr.RMR_OK:
+ mdc_logger.warning("_send_msg: failed after {} retries".format(RETRY_TIMES))
+
+ def _rts_msg(self, pay, sbuf_rts, mtype):
+ """
+ Sends a message via RMR's return-to-sender feature.
+ This neither allocates nor frees a message buffer because we may rts many times.
+ Returns the message buffer from the RTS function, which may reallocate it.
+ """
+ pre_send_summary = rmr.message_summary(sbuf_rts)
+ for _ in range(0, RETRY_TIMES):
+ mdc_logger.debug("_rts_msg: sending: {}".format(pre_send_summary))
+ sbuf_rts = rmr.rmr_rts_msg(self.mrc, sbuf_rts, payload=pay, mtype=mtype)
+ msg_state = self._assert_good_send(sbuf_rts, pre_send_summary)
+ mdc_logger.debug("_rts_msg: result message state: {}".format(msg_state))
+ if msg_state != rmr.RMR_ERR_RETRY:
+ break
+
+ if msg_state != rmr.RMR_OK:
+ mdc_logger.warning("_rts_msg: failed after {} retries".format(RETRY_TIMES))
+ return sbuf_rts # in some cases rts may return a new sbuf
+
+ def _handle_sends(self):
+ # send out all messages waiting for us
+ while not self.instance_send_queue.empty():
+ work_item = self.instance_send_queue.get(block=False, timeout=None)
+ payload = json.dumps(messages.a1_to_handler(*work_item)).encode("utf-8")
+ self._send_msg(payload, A1_POLICY_REQUEST, work_item[1])
+