sbuf = rmr.rmr_alloc_msg(self.mrc, len(pay), payload=pay, gen_transaction_id=True, mtype=mtype, sub_id=subid)
sbuf.contents.sub_id = subid
pre_send_summary = rmr.message_summary(sbuf)
+ mdc_logger.debug("Trying to send message: {}".format(pre_send_summary))
sbuf = rmr.rmr_send_msg(self.mrc, sbuf) # send
if self._assert_good_send(sbuf, pre_send_summary):
rmr.rmr_free_msg(sbuf) # free
- break
+ return
+
+ mdc_logger.debug("A1 did NOT send the message successfully after {} retries!".format(RETRY_TIMES))
def _rts_msg(self, pay, sbuf_rts, mtype):
"""
break
return sbuf_rts # in some cases rts may return a new sbuf
+ def _handle_sends(self):
+     """
+     Drain the instance send queue, sending one A1_POLICY_REQUEST per work item.
+
+     loop() starts this method on a new Thread, so more than one drain may be
+     running at the same time. Checking empty() and then calling get() is a
+     check-then-act race: another drain can take the last item in between,
+     and the non-blocking get() would then raise Empty and kill the thread
+     with a traceback. We therefore just try the get and treat Empty as
+     "nothing left to do".
+     """
+     # Local import so this method does not depend on how the module imports queue.
+     # NOTE(review): assumes instance_send_queue is a queue.Queue (or raises queue.Empty) - confirm.
+     from queue import Empty
+
+     # send out all messages waiting for us
+     while True:
+         try:
+             work_item = self.instance_send_queue.get(block=False)
+         except Empty:
+             # queue drained (possibly by a concurrent drain thread); we are done
+             return
+         payload = json.dumps(messages.a1_to_handler(*work_item)).encode("utf-8")
+         self._send_msg(payload, A1_POLICY_REQUEST, work_item[1])
+
def loop(self):
"""
This loop runs forever, and has 3 jobs:
mdc_logger.debug("Work loop starting")
while self.keep_going:
- # send out all messages waiting for us
- while not self.instance_send_queue.empty():
- work_item = self.instance_send_queue.get(block=False, timeout=None)
- payload = json.dumps(messages.a1_to_handler(*work_item)).encode("utf-8")
- self._send_msg(payload, A1_POLICY_REQUEST, work_item[1])
+ # Update 3/20/2020
+ # We now handle our sends in a thread (that will just exit when it's done) because there is a difference between how send works in SI95 vs NNG.
+ # Send_msg via NNG formerly never blocked.
+ # However, under SI95 this send may block for some arbitrary period of time on the first send to an endpoint for which a connection is not yet established.
+ # If this send takes too long, this loop blocks and the healthcheck fails, which causes A1's healthcheck to fail, which causes Kubernetes to whack A1, and all kinds of horrible things happen.
+ # Therefore, now under SI95, we thread this.
+ Thread(target=self._handle_sends).start()
# read our mailbox
for (msg, sbuf) in self.rcv_func():
try:
# got a query, do a lookup and send out all instances
pti = json.loads(msg["payload"])["policy_type_id"]
- mdc_logger.debug("Received query for: {0}".format(pti))
- for pii in data.get_instance_list(pti):
+ instance_list = data.get_instance_list(pti) # will raise if a bad type
+ mdc_logger.debug("Received a query for a good type: {0}".format(msg))
+ for pii in instance_list:
instance = data.get_policy_instance(pti, pii)
payload = json.dumps(messages.a1_to_handler("CREATE", pti, pii, instance)).encode("utf-8")
sbuf = self._rts_msg(payload, sbuf, A1_POLICY_REQUEST)
- except (PolicyTypeNotFound, PolicyInstanceNotFound):
+ except (PolicyTypeNotFound):
mdc_logger.debug("Received a query for a non-existent type: {0}".format(msg))
except (KeyError, TypeError, json.decoder.JSONDecodeError):
mdc_logger.debug("Dropping malformed policy query message: {0}".format(msg))
self.last_ran = time.time()
time.sleep(1)
+ mdc_logger.debug("RMR Thread Ending!")
+
# Public