Extend user guide with southbound API details
[ric-plt/a1.git] / a1 / a1rmr.py
index 771842a..050674c 100644 (file)
@@ -1,6 +1,3 @@
-"""
-a1s rmr functionality
-"""
 # ==================================================================================
 #       Copyright (c) 2019-2020 Nokia
 #       Copyright (c) 2018-2020 AT&T Intellectual Property.
 # ==================================================================================
 #       Copyright (c) 2019-2020 Nokia
 #       Copyright (c) 2018-2020 AT&T Intellectual Property.
@@ -17,12 +14,15 @@ a1s rmr functionality
 #   See the License for the specific language governing permissions and
 #   limitations under the License.
 # ==================================================================================
 #   See the License for the specific language governing permissions and
 #   limitations under the License.
 # ==================================================================================
+"""
+A1 RMR functionality
+"""
 import os
 import queue
 import time
 import json
 from threading import Thread
 import os
 import queue
 import time
 import json
 from threading import Thread
-from rmr import rmr, helpers
+from ricxappframe.rmr import rmr, helpers
 from mdclogpy import Logger
 from a1 import data, messages
 from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound
 from mdclogpy import Logger
 from a1 import data, messages
 from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound
@@ -30,6 +30,11 @@ from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound
 mdc_logger = Logger(name=__name__)
 
 
 mdc_logger = Logger(name=__name__)
 
 
+# With Nanomsg and NNG it was possible for a send attempt to have a "soft"
+# failure which did warrant some retries if the status of the send is RMR_ERR_RETRY.
+# Because of the way NNG worked, it sometimes required many tens of retries,
+# and a retry state happened often for even moderately "verbose" applications.
+# With SI95 there is still a possibility that a retry is necessary, but it is very rare.
 RETRY_TIMES = int(os.environ.get("A1_RMR_RETRY_TIMES", 4))
 A1_POLICY_REQUEST = 20010
 A1_POLICY_RESPONSE = 20011
 RETRY_TIMES = int(os.environ.get("A1_RMR_RETRY_TIMES", 4))
 A1_POLICY_REQUEST = 20010
 A1_POLICY_RESPONSE = 20011
@@ -43,11 +48,27 @@ __RMR_LOOP__ = None
 
 class _RmrLoop:
     """
 
 class _RmrLoop:
     """
-    class represents an rmr loop that constantly reads from rmr and performs operations based on waiting messages
-    this launches a thread, it should probably only be called once; the public facing method to access these ensures this
+    Class represents an rmr loop that constantly reads from rmr and performs operations
+    based on waiting messages.  This launches a thread, it should probably only be called
+    once; the public facing method to access these ensures this.
+
+    TODO: the xapp frame has a version of this looping structure. See if A1 can switch to that.
     """
 
     def __init__(self, init_func_override=None, rcv_func_override=None):
     """
 
     def __init__(self, init_func_override=None, rcv_func_override=None):
+        """
+        Init
+
+        Parameters
+        ----------
+        init_func_override: function (optional)
+            Function that initializes RMR and answers an RMR context.
+            Supply an empty function to skip initializing RMR.
+
+        rcv_func_override: function (optional)
+            Function that receives messages from RMR and answers a list.
+            Supply a trivial function to skip reading from RMR.
+        """
         self.keep_going = True
         self.rcv_func = None
         self.last_ran = time.time()
         self.keep_going = True
         self.rcv_func = None
         self.last_ran = time.time()
@@ -60,8 +81,8 @@ class _RmrLoop:
             self.mrc = init_func_override()
         else:
             mdc_logger.debug("Waiting for rmr to initialize..")
             self.mrc = init_func_override()
         else:
             mdc_logger.debug("Waiting for rmr to initialize..")
-            # rmr.RMRFL_MTCALL puts RMR into a multithreaded mode, where a receiving thread populates an
-            # internal ring of messages, and receive calls read from that
+            # rmr.RMRFL_MTCALL puts RMR into a multithreaded mode, where a receiving thread
+            # populates an internal ring of messages, and receive calls read from that.
             # currently the size is 2048 messages, so this is fine for the foreseeable future
             self.mrc = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
             while rmr.rmr_ready(self.mrc) == 0:
             # currently the size is 2048 messages, so this is fine for the foreseeable future
             self.mrc = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
             while rmr.rmr_ready(self.mrc) == 0:
@@ -80,41 +101,51 @@ class _RmrLoop:
 
     def _assert_good_send(self, sbuf, pre_send_summary):
         """
 
     def _assert_good_send(self, sbuf, pre_send_summary):
         """
-        common helper function for _send_msg and _rts_msg
+        Extracts the send result and logs a detailed warning if the send failed.
+        Returns the message state, an integer that indicates the result.
         """
         post_send_summary = rmr.message_summary(sbuf)
         """
         post_send_summary = rmr.message_summary(sbuf)
-        if post_send_summary["message state"] == 0 and post_send_summary["message status"] == "RMR_OK":
-            return True
-        mdc_logger.debug("Message NOT sent!")
-        mdc_logger.debug("Pre-send summary: {0}, Post-send summary: {1}".format(pre_send_summary, post_send_summary))
-        return False
+        if post_send_summary[rmr.RMR_MS_MSG_STATE] != rmr.RMR_OK:
+            mdc_logger.warning("RMR send failed; pre-send summary: {0}, post-send summary: {1}".format(pre_send_summary, post_send_summary))
+        return post_send_summary[rmr.RMR_MS_MSG_STATE]
 
     def _send_msg(self, pay, mtype, subid):
         """
 
     def _send_msg(self, pay, mtype, subid):
         """
-        sends a msg
+        Creates and sends a message via RMR's send-message feature with the specified payload
+        using the specified message type and subscription ID.
         """
         """
+        sbuf = rmr.rmr_alloc_msg(self.mrc, len(pay), payload=pay, gen_transaction_id=True, mtype=mtype, sub_id=subid)
+        sbuf.contents.sub_id = subid
+        pre_send_summary = rmr.message_summary(sbuf)
         for _ in range(0, RETRY_TIMES):
         for _ in range(0, RETRY_TIMES):
-            sbuf = rmr.rmr_alloc_msg(self.mrc, len(pay), payload=pay, gen_transaction_id=True, mtype=mtype, sub_id=subid)
-            sbuf.contents.sub_id = subid
-            pre_send_summary = rmr.message_summary(sbuf)
-            mdc_logger.debug("Trying to send message: {}".format(pre_send_summary))
-            sbuf = rmr.rmr_send_msg(self.mrc, sbuf)  # send
-            if self._assert_good_send(sbuf, pre_send_summary):
-                rmr.rmr_free_msg(sbuf)  # free
-                return
+            mdc_logger.debug("_send_msg: sending: {}".format(pre_send_summary))
+            sbuf = rmr.rmr_send_msg(self.mrc, sbuf)
+            msg_state = self._assert_good_send(sbuf, pre_send_summary)
+            mdc_logger.debug("_send_msg: result message state: {}".format(msg_state))
+            if msg_state != rmr.RMR_ERR_RETRY:
+                break
 
 
-        mdc_logger.debug("A1 did NOT send the message successfully after {} retries!".format(RETRY_TIMES))
+        rmr.rmr_free_msg(sbuf)
+        if msg_state != rmr.RMR_OK:
+            mdc_logger.warning("_send_msg: failed after {} retries".format(RETRY_TIMES))
 
     def _rts_msg(self, pay, sbuf_rts, mtype):
         """
 
     def _rts_msg(self, pay, sbuf_rts, mtype):
         """
-        sends a message using rts
-        we do not call free here because we may rts many times; it is called after the rts loop
+        Sends a message via RMR's return-to-sender feature.
+        This neither allocates nor frees a message buffer because we may rts many times.
+        Returns the message buffer from the RTS function, which may reallocate it.
         """
         """
+        pre_send_summary = rmr.message_summary(sbuf_rts)
         for _ in range(0, RETRY_TIMES):
         for _ in range(0, RETRY_TIMES):
-            pre_send_summary = rmr.message_summary(sbuf_rts)
+            mdc_logger.debug("_rts_msg: sending: {}".format(pre_send_summary))
             sbuf_rts = rmr.rmr_rts_msg(self.mrc, sbuf_rts, payload=pay, mtype=mtype)
             sbuf_rts = rmr.rmr_rts_msg(self.mrc, sbuf_rts, payload=pay, mtype=mtype)
-            if self._assert_good_send(sbuf_rts, pre_send_summary):
+            msg_state = self._assert_good_send(sbuf_rts, pre_send_summary)
+            mdc_logger.debug("_rts_msg: result message state: {}".format(msg_state))
+            if msg_state != rmr.RMR_ERR_RETRY:
                 break
                 break
+
+        if msg_state != rmr.RMR_OK:
+            mdc_logger.warning("_rts_msg: failed after {} retries".format(RETRY_TIMES))
         return sbuf_rts  # in some cases rts may return a new sbuf
 
     def _handle_sends(self):
         return sbuf_rts  # in some cases rts may return a new sbuf
 
     def _handle_sends(self):
@@ -147,40 +178,40 @@ class _RmrLoop:
             for (msg, sbuf) in self.rcv_func():
                 # TODO: in the future we may also have to catch SDL errors
                 try:
             for (msg, sbuf) in self.rcv_func():
                 # TODO: in the future we may also have to catch SDL errors
                 try:
-                    mtype = msg["message type"]
+                    mtype = msg[rmr.RMR_MS_MSG_TYPE]
                 except (KeyError, TypeError, json.decoder.JSONDecodeError):
                 except (KeyError, TypeError, json.decoder.JSONDecodeError):
-                    mdc_logger.debug("Dropping malformed policy ack/query message: {0}".format(msg))
+                    mdc_logger.warning("Dropping malformed message: {0}".format(msg))
 
                 if mtype == A1_POLICY_RESPONSE:
                     try:
                         # got a policy response, update status
 
                 if mtype == A1_POLICY_RESPONSE:
                     try:
                         # got a policy response, update status
-                        pay = json.loads(msg["payload"])
+                        pay = json.loads(msg[rmr.RMR_MS_PAYLOAD])
                         data.set_policy_instance_status(
                             pay["policy_type_id"], pay["policy_instance_id"], pay["handler_id"], pay["status"]
                         )
                         mdc_logger.debug("Successfully received status update: {0}".format(pay))
                     except (PolicyTypeNotFound, PolicyInstanceNotFound):
                         data.set_policy_instance_status(
                             pay["policy_type_id"], pay["policy_instance_id"], pay["handler_id"], pay["status"]
                         )
                         mdc_logger.debug("Successfully received status update: {0}".format(pay))
                     except (PolicyTypeNotFound, PolicyInstanceNotFound):
-                        mdc_logger.debug("Received a response  for a non-existent instance")
+                        mdc_logger.warning("Received a response for a non-existent type/instance: {0}".format(msg))
                     except (KeyError, TypeError, json.decoder.JSONDecodeError):
                     except (KeyError, TypeError, json.decoder.JSONDecodeError):
-                        mdc_logger.debug("Dropping malformed policy ack message: {0}".format(msg))
+                        mdc_logger.warning("Dropping malformed policy response: {0}".format(msg))
 
                 elif mtype == A1_POLICY_QUERY:
                     try:
                         # got a query, do a lookup and send out all instances
 
                 elif mtype == A1_POLICY_QUERY:
                     try:
                         # got a query, do a lookup and send out all instances
-                        pti = json.loads(msg["payload"])["policy_type_id"]
+                        pti = json.loads(msg[rmr.RMR_MS_PAYLOAD])["policy_type_id"]
                         instance_list = data.get_instance_list(pti)  # will raise if a bad type
                         instance_list = data.get_instance_list(pti)  # will raise if a bad type
-                        mdc_logger.debug("Received a query for a good type: {0}".format(msg))
+                        mdc_logger.debug("Received a query for a known policy type: {0}".format(msg))
                         for pii in instance_list:
                             instance = data.get_policy_instance(pti, pii)
                             payload = json.dumps(messages.a1_to_handler("CREATE", pti, pii, instance)).encode("utf-8")
                             sbuf = self._rts_msg(payload, sbuf, A1_POLICY_REQUEST)
                     except (PolicyTypeNotFound):
                         for pii in instance_list:
                             instance = data.get_policy_instance(pti, pii)
                             payload = json.dumps(messages.a1_to_handler("CREATE", pti, pii, instance)).encode("utf-8")
                             sbuf = self._rts_msg(payload, sbuf, A1_POLICY_REQUEST)
                     except (PolicyTypeNotFound):
-                        mdc_logger.debug("Received a query for a non-existent type: {0}".format(msg))
+                        mdc_logger.warning("Received a policy query for a non-existent type: {0}".format(msg))
                     except (KeyError, TypeError, json.decoder.JSONDecodeError):
                     except (KeyError, TypeError, json.decoder.JSONDecodeError):
-                        mdc_logger.debug("Dropping malformed policy query message: {0}".format(msg))
+                        mdc_logger.warning("Dropping malformed policy query: {0}".format(msg))
 
                 else:
 
                 else:
-                    mdc_logger.debug("Received message type {0} but A1 does not handle this".format(mtype))
+                    mdc_logger.warning("Received message type {0} but A1 does not handle this".format(mtype))
 
                 # we must free each sbuf
                 rmr.rmr_free_msg(sbuf)
 
                 # we must free each sbuf
                 rmr.rmr_free_msg(sbuf)
@@ -197,6 +228,16 @@ class _RmrLoop:
 def start_rmr_thread(init_func_override=None, rcv_func_override=None):
     """
     Start a1s rmr thread
 def start_rmr_thread(init_func_override=None, rcv_func_override=None):
     """
     Start a1s rmr thread
+
+    Parameters
+    ----------
+    init_func_override: function (optional)
+        Function that initializes RMR and answers an RMR context.
+        Supply an empty function to skip initializing RMR.
+
+    rcv_func_override: function (optional)
+        Function that receives messages from RMR and answers a list.
+        Supply a trivial function to skip reading from RMR.
     """
     global __RMR_LOOP__
     if __RMR_LOOP__ is None:
     """
     global __RMR_LOOP__
     if __RMR_LOOP__ is None: