+ mdc_logger.debug("Waiting for rmr to initialize..")
+ # rmr.RMRFL_MTCALL puts RMR into a multithreaded mode, where a receiving thread
+ # populates an internal ring of messages, and receive calls read from that.
+ # currently the size is 2048 messages, so this is fine for the foreseeable future
+ self.mrc = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
+ while rmr.rmr_ready(self.mrc) == 0:
+ time.sleep(0.5)
+
+ # set the receive function
+ self.rcv_func = (
+ rcv_func_override
+ if rcv_func_override
+ else lambda: helpers.rmr_rcvall_msgs_raw(self.mrc, [A1_POLICY_RESPONSE, A1_POLICY_QUERY, A1_EI_QUERY_ALL, A1_EI_CREATE_JOB])
+ )
+
+ # start the work loop
+ self.thread = Thread(target=self.loop)
+ self.thread.start()
+
def _assert_good_send(self, sbuf, pre_send_summary):
    """
    Inspect the outcome of a send attempt and report failures.

    Summarizes ``sbuf`` after the send and reads the message state out of
    that summary. When the state is anything other than ``rmr.RMR_OK`` a
    warning is logged that carries both ``pre_send_summary`` (captured by the
    caller before sending) and the post-send summary.

    Returns the message state, an integer that indicates the send result.
    """
    summary_after = rmr.message_summary(sbuf)
    state = summary_after[rmr.RMR_MS_MSG_STATE]

    # Happy path: nothing to report.
    if state == rmr.RMR_OK:
        return state

    mdc_logger.warning(
        "RMR send failed; pre-send summary: {0}, post-send summary: {1}".format(pre_send_summary, summary_after)
    )
    return state
+
def _send_msg(self, pay, mtype, subid):
    """
    Creates and sends a message via RMR's send-message feature with the specified payload
    using the specified message type and subscription ID.

    A message buffer sized to ``len(pay)`` is allocated, the send is attempted
    up to ``RETRY_TIMES`` times (repeating only while the resulting state is
    ``rmr.RMR_ERR_RETRY``), and the buffer is always freed afterwards, even
    if an attempt raises. A warning is logged when the final state is not
    ``rmr.RMR_OK``. Returns nothing.
    """
    sbuf = rmr.rmr_alloc_msg(self.mrc, len(pay), payload=pay, gen_transaction_id=True, mtype=mtype, sub_id=subid)

    # Bound up front so that a zero-iteration retry loop (RETRY_TIMES <= 0)
    # falls through to the failure warning instead of raising UnboundLocalError.
    msg_state = None
    try:
        # sub_id is already passed to rmr_alloc_msg above; the explicit
        # assignment is kept from the original code.
        sbuf.contents.sub_id = subid
        pre_send_summary = rmr.message_summary(sbuf)
        for _ in range(RETRY_TIMES):
            mdc_logger.debug("_send_msg: sending: {}".format(pre_send_summary))
            # rebind: the send call hands back the buffer to use from now on
            sbuf = rmr.rmr_send_msg(self.mrc, sbuf)
            msg_state = self._assert_good_send(sbuf, pre_send_summary)
            mdc_logger.debug("_send_msg: result message state: {}".format(msg_state))
            if msg_state != rmr.RMR_ERR_RETRY:
                # success or a non-retryable failure; either way stop trying
                break
    finally:
        # Release the buffer on every path so an exception cannot leak it.
        rmr.rmr_free_msg(sbuf)

    if msg_state != rmr.RMR_OK:
        mdc_logger.warning("_send_msg: failed after {} retries".format(RETRY_TIMES))
+
def _rts_msg(self, pay, sbuf_rts, mtype):
    """
    Sends a message via RMR's return-to-sender feature.
    This neither allocates nor frees a message buffer because we may rts many times.

    The send is attempted up to ``RETRY_TIMES`` times and is repeated only
    while the resulting state is ``rmr.RMR_ERR_RETRY``. A warning is logged
    when the final state is not ``rmr.RMR_OK``.

    Returns the message buffer from the RTS function, which may reallocate it;
    callers must keep using the returned buffer rather than the one passed in.
    """
    pre_send_summary = rmr.message_summary(sbuf_rts)

    # Bound up front so that a zero-iteration retry loop (RETRY_TIMES <= 0)
    # falls through to the failure warning instead of raising UnboundLocalError.
    msg_state = None
    for _ in range(RETRY_TIMES):
        mdc_logger.debug("_rts_msg: sending: {}".format(pre_send_summary))
        sbuf_rts = rmr.rmr_rts_msg(self.mrc, sbuf_rts, payload=pay, mtype=mtype)
        msg_state = self._assert_good_send(sbuf_rts, pre_send_summary)
        mdc_logger.debug("_rts_msg: result message state: {}".format(msg_state))
        if msg_state != rmr.RMR_ERR_RETRY:
            # success or a non-retryable failure; either way stop trying
            break

    if msg_state != rmr.RMR_OK:
        mdc_logger.warning("_rts_msg: failed after {} retries".format(RETRY_TIMES))
    return sbuf_rts  # in some cases rts may return a new sbuf
+
def _handle_sends(self):
    """
    Drain both outbound queues and send every item over RMR.

    First empties ``self.instance_send_queue``, sending each item as an
    ``A1_POLICY_REQUEST`` with ``work_item[1]`` as the subscription ID. Then
    empties ``self.ei_job_result_queue``, sending each item as an
    ``A1_EI_DATA_DELIVERY`` with ``int(work_item[0])`` as the subscription ID.

    Items are taken with a non-blocking get and a queue is considered drained
    when that get raises ``queue.Empty``. The work loop starts a fresh thread
    running this method on every iteration, so several drains may overlap;
    checking ``empty()`` and then calling ``get(block=False)`` would let one
    of them die with an uncaught ``queue.Empty`` when another takes the last
    item in between.

    NOTE(review): assumes both queues follow the ``queue.Queue`` contract
    (``get_nowait`` raising ``queue.Empty``) -- confirm where they are created.
    """
    # Function-scope import so this method does not depend on how the rest
    # of the module imports the queue module.
    from queue import Empty

    # send out all messages waiting for us
    while True:
        try:
            work_item = self.instance_send_queue.get_nowait()
        except Empty:
            break
        payload = json.dumps(messages.a1_to_handler(*work_item)).encode("utf-8")
        self._send_msg(payload, A1_POLICY_REQUEST, work_item[1])

    # now send all the ei-job related data
    while True:
        try:
            work_item = self.ei_job_result_queue.get_nowait()
        except Empty:
            break
        mdc_logger.debug("perform data delivery to consumer")

        payload = json.dumps(messages.ei_to_handler(*work_item)).encode("utf-8")
        ei_job_id = int(work_item[0])
        mdc_logger.debug("data-delivery: {}".format(payload))

        # send the payload to consumer subscribed for ei_job_id
        self._send_msg(payload, A1_EI_DATA_DELIVERY, ei_job_id)
+
+ def loop(self):
+ """
+ This loop runs forever, and has 3 jobs:
+ - send out any messages that have to go out (create instance, delete instance)
+ - read a1s mailbox and update the status of all instances based on acks from downstream policy handlers
+ - clean up the database (eg delete the instance) under certain conditions based on those statuses (NOT DONE YET)
+ """
+ # loop forever
+ mdc_logger.debug("Work loop starting")
+ while self.keep_going:
+
+ # Update 3/20/2020
+ # We now handle our sends in a thread (that will just exit when it's done) because there is a difference between how send works in SI95 vs NNG.
+ # Send_msg via NNG formerly never blocked.
+ # However under SI95 this send may block for some arbitrary period of time on the first send to an endpoint for which a connection is not established
+ # If this send takes too long, this loop blocks, and the healthcheck will fail, which will cause A1s healthcheck to fail, which will cause Kubernetes to whack A1 and all kinds of horrible things happen.
+ # Therefore, now under SI95, we thread this.
+ Thread(target=self._handle_sends).start()
+
+ # read our mailbox
+ for (msg, sbuf) in self.rcv_func():
+ # TODO: in the future we may also have to catch SDL errors
+ try:
+ mtype = msg[rmr.RMR_MS_MSG_TYPE]
+ except (KeyError, TypeError, json.decoder.JSONDecodeError):
+ mdc_logger.warning("Dropping malformed message: {0}".format(msg))
+
+ if mtype == A1_POLICY_RESPONSE:
+ try:
+ # got a policy response, update status
+ pay = json.loads(msg[rmr.RMR_MS_PAYLOAD])
+ data.set_policy_instance_status(
+ pay["policy_type_id"], pay["policy_instance_id"], pay["handler_id"], pay["status"]
+ )
+ mdc_logger.debug("Successfully received status update: {0}".format(pay))
+ except (PolicyTypeNotFound, PolicyInstanceNotFound):
+ mdc_logger.warning("Received a response for a non-existent type/instance: {0}".format(msg))
+ except (KeyError, TypeError, json.decoder.JSONDecodeError):
+ mdc_logger.warning("Dropping malformed policy response: {0}".format(msg))
+
+ elif mtype == A1_POLICY_QUERY:
+ try:
+ # got a query, do a lookup and send out all instances
+ pti = json.loads(msg[rmr.RMR_MS_PAYLOAD])["policy_type_id"]
+ instance_list = data.get_instance_list(pti) # will raise if a bad type
+ mdc_logger.debug("Received a query for a known policy type: {0}".format(msg))
+ for pii in instance_list:
+ instance = data.get_policy_instance(pti, pii)
+ payload = json.dumps(messages.a1_to_handler("CREATE", pti, pii, instance)).encode("utf-8")
+ sbuf = self._rts_msg(payload, sbuf, A1_POLICY_REQUEST)
+ except (PolicyTypeNotFound):
+ mdc_logger.warning("Received a policy query for a non-existent type: {0}".format(msg))
+ except (KeyError, TypeError, json.decoder.JSONDecodeError):
+ mdc_logger.warning("Dropping malformed policy query: {0}".format(msg))