Repair send-message methods for free and retry
[ric-plt/a1.git] / a1 / a1rmr.py
index d6bd685..f623024 100644 (file)
@@ -30,6 +30,11 @@ from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound
 mdc_logger = Logger(name=__name__)
 
 
+# With Nanomsg and NNG it was possible for a send attempt to have a "soft"
+# failure which did warrant some retries if the status of the send is RMR_ERR_RETRY.
+# Because of the way NNG worked, it sometimes required many tens of retries,
+# and a retry state happened often for even moderately "verbose" applications.
+# With SI95 there is still a possibility that a retry is necessary, but it is very rare.
 RETRY_TIMES = int(os.environ.get("A1_RMR_RETRY_TIMES", 4))
 A1_POLICY_REQUEST = 20010
 A1_POLICY_RESPONSE = 20011
@@ -76,8 +81,8 @@ class _RmrLoop:
             self.mrc = init_func_override()
         else:
             mdc_logger.debug("Waiting for rmr to initialize..")
-            # rmr.RMRFL_MTCALL puts RMR into a multithreaded mode, where a receiving thread populates an
-            # internal ring of messages, and receive calls read from that
+            # rmr.RMRFL_MTCALL puts RMR into a multithreaded mode, where a receiving thread
+            # populates an internal ring of messages, and receive calls read from that.
             # currently the size is 2048 messages, so this is fine for the foreseeable future
             self.mrc = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
             while rmr.rmr_ready(self.mrc) == 0:
@@ -96,41 +101,49 @@ class _RmrLoop:
 
     def _assert_good_send(self, sbuf, pre_send_summary):
         """
-        common helper function for _send_msg and _rts_msg
+        Extracts the send result and logs a detailed warning if the send failed.
+        Returns the message state, an integer that indicates the result.
         """
         post_send_summary = rmr.message_summary(sbuf)
-        if post_send_summary["message state"] == 0 and post_send_summary["message status"] == "RMR_OK":
-            return True
-        mdc_logger.debug("Message NOT sent!")
-        mdc_logger.debug("Pre-send summary: {0}, Post-send summary: {1}".format(pre_send_summary, post_send_summary))
-        return False
+        if post_send_summary[rmr.RMR_MS_MSG_STATE] != rmr.RMR_OK:
+            mdc_logger.warning("RMR send failed; pre-send summary: {0}, post-send summary: {1}".format(pre_send_summary, post_send_summary))
+        return post_send_summary[rmr.RMR_MS_MSG_STATE]
 
     def _send_msg(self, pay, mtype, subid):
         """
-        sends a msg
+        Creates and sends a message via RMR's send-message feature with the specified payload
+        using the specified message type and subscription ID.
         """
+        sbuf = rmr.rmr_alloc_msg(self.mrc, len(pay), payload=pay, gen_transaction_id=True, mtype=mtype, sub_id=subid)
+        sbuf.contents.sub_id = subid
+        pre_send_summary = rmr.message_summary(sbuf)
         for _ in range(0, RETRY_TIMES):
-            sbuf = rmr.rmr_alloc_msg(self.mrc, len(pay), payload=pay, gen_transaction_id=True, mtype=mtype, sub_id=subid)
-            sbuf.contents.sub_id = subid
-            pre_send_summary = rmr.message_summary(sbuf)
-            mdc_logger.debug("Trying to send message: {}".format(pre_send_summary))
-            sbuf = rmr.rmr_send_msg(self.mrc, sbuf)  # send
-            if self._assert_good_send(sbuf, pre_send_summary):
-                rmr.rmr_free_msg(sbuf)  # free
-                return
+            mdc_logger.debug("_send_msg: sending: {}".format(pre_send_summary))
+            sbuf = rmr.rmr_send_msg(self.mrc, sbuf)
+            msg_state = self._assert_good_send(sbuf, pre_send_summary)
+            if msg_state != rmr.RMR_ERR_RETRY:
+                break
 
-        mdc_logger.debug("A1 did NOT send the message successfully after {} retries!".format(RETRY_TIMES))
+        rmr.rmr_free_msg(sbuf)
+        if msg_state != rmr.RMR_OK:
+            mdc_logger.warning("_send_msg: failed after {} retries".format(RETRY_TIMES))
 
     def _rts_msg(self, pay, sbuf_rts, mtype):
         """
-        sends a message using rts
-        we do not call free here because we may rts many times; it is called after the rts loop
+        Sends a message via RMR's return-to-sender feature.
+        This neither allocates nor frees a message buffer because we may rts many times.
+        Returns the message buffer from the RTS function, which may reallocate it.
         """
+        pre_send_summary = rmr.message_summary(sbuf_rts)
         for _ in range(0, RETRY_TIMES):
-            pre_send_summary = rmr.message_summary(sbuf_rts)
+            mdc_logger.debug("_rts_msg: sending: {}".format(pre_send_summary))
             sbuf_rts = rmr.rmr_rts_msg(self.mrc, sbuf_rts, payload=pay, mtype=mtype)
-            if self._assert_good_send(sbuf_rts, pre_send_summary):
+            msg_state = self._assert_good_send(sbuf_rts, pre_send_summary)
+            if msg_state != rmr.RMR_ERR_RETRY:
                 break
+
+        if msg_state != rmr.RMR_OK:
+            mdc_logger.warning("_rts_msg: failed after {} retries".format(RETRY_TIMES))
         return sbuf_rts  # in some cases rts may return a new sbuf
 
     def _handle_sends(self):
@@ -163,40 +176,40 @@ class _RmrLoop:
             for (msg, sbuf) in self.rcv_func():
                 # TODO: in the future we may also have to catch SDL errors
                 try:
-                    mtype = msg["message type"]
+                    mtype = msg[rmr.RMR_MS_MSG_TYPE]
                 except (KeyError, TypeError, json.decoder.JSONDecodeError):
-                    mdc_logger.debug("Dropping malformed policy ack/query message: {0}".format(msg))
+                    mdc_logger.warning("Dropping malformed message: {0}".format(msg))
 
                 if mtype == A1_POLICY_RESPONSE:
                     try:
                         # got a policy response, update status
-                        pay = json.loads(msg["payload"])
+                        pay = json.loads(msg[rmr.RMR_MS_PAYLOAD])
                         data.set_policy_instance_status(
                             pay["policy_type_id"], pay["policy_instance_id"], pay["handler_id"], pay["status"]
                         )
                         mdc_logger.debug("Successfully received status update: {0}".format(pay))
                     except (PolicyTypeNotFound, PolicyInstanceNotFound):
-                        mdc_logger.debug("Received a response  for a non-existent instance")
+                        mdc_logger.warning("Received a response for a non-existent type/instance: {0}".format(msg))
                     except (KeyError, TypeError, json.decoder.JSONDecodeError):
-                        mdc_logger.debug("Dropping malformed policy ack message: {0}".format(msg))
+                        mdc_logger.warning("Dropping malformed policy response: {0}".format(msg))
 
                 elif mtype == A1_POLICY_QUERY:
                     try:
                         # got a query, do a lookup and send out all instances
-                        pti = json.loads(msg["payload"])["policy_type_id"]
+                        pti = json.loads(msg[rmr.RMR_MS_PAYLOAD])["policy_type_id"]
                         instance_list = data.get_instance_list(pti)  # will raise if a bad type
-                        mdc_logger.debug("Received a query for a good type: {0}".format(msg))
+                        mdc_logger.debug("Received a query for a known policy type: {0}".format(msg))
                         for pii in instance_list:
                             instance = data.get_policy_instance(pti, pii)
                             payload = json.dumps(messages.a1_to_handler("CREATE", pti, pii, instance)).encode("utf-8")
                             sbuf = self._rts_msg(payload, sbuf, A1_POLICY_REQUEST)
                     except (PolicyTypeNotFound):
-                        mdc_logger.debug("Received a query for a non-existent type: {0}".format(msg))
+                        mdc_logger.warning("Received a policy query for a non-existent type: {0}".format(msg))
                     except (KeyError, TypeError, json.decoder.JSONDecodeError):
-                        mdc_logger.debug("Dropping malformed policy query message: {0}".format(msg))
+                        mdc_logger.warning("Dropping malformed policy query: {0}".format(msg))
 
                 else:
-                    mdc_logger.debug("Received message type {0} but A1 does not handle this".format(mtype))
+                    mdc_logger.warning("Received message type {0} but A1 does not handle this".format(mtype))
 
                 # we must free each sbuf
                 rmr.rmr_free_msg(sbuf)