Upgrade A1 to use RMR library version 4.0.2
[ric-plt/a1.git] / a1 / a1rmr.py
index 2e5dace..3d0ff95 100644 (file)
@@ -2,8 +2,8 @@
 a1s rmr functionality
 """
 # ==================================================================================
-#       Copyright (c) 2019 Nokia
-#       Copyright (c) 2018-2019 AT&T Intellectual Property.
+#       Copyright (c) 2019-2020 Nokia
+#       Copyright (c) 2018-2020 AT&T Intellectual Property.
 #
 #   Licensed under the Apache License, Version 2.0 (the "License");
 #   you may not use this file except in compliance with the License.
@@ -22,7 +22,7 @@ import queue
 import time
 import json
 from threading import Thread
-from rmr import rmr, helpers
+from ricxappframe.rmr import rmr, helpers
 from mdclogpy import Logger
 from a1 import data, messages
 from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound
@@ -31,8 +31,6 @@ mdc_logger = Logger(name=__name__)
 
 
 RETRY_TIMES = int(os.environ.get("A1_RMR_RETRY_TIMES", 4))
-
-
 A1_POLICY_REQUEST = 20010
 A1_POLICY_RESPONSE = 20011
 A1_POLICY_QUERY = 20012
@@ -47,12 +45,16 @@ class _RmrLoop:
     """
     class represents an rmr loop that constantly reads from rmr and performs operations based on waiting messages
     this launches a thread, it should probably only be called once; the public facing method to access these ensures this
+
+    TODO: the xapp frame has a version of this looping structure. See if A1 can switch to that.
     """
 
     def __init__(self, init_func_override=None, rcv_func_override=None):
         self.keep_going = True
         self.rcv_func = None
         self.last_ran = time.time()
+
+        # see docs/overview#resiliency for a discussion of this
         self.instance_send_queue = queue.Queue()  # thread safe queue https://docs.python.org/3/library/queue.html
 
         # intialize rmr context
@@ -97,10 +99,13 @@ class _RmrLoop:
             sbuf = rmr.rmr_alloc_msg(self.mrc, len(pay), payload=pay, gen_transaction_id=True, mtype=mtype, sub_id=subid)
             sbuf.contents.sub_id = subid
             pre_send_summary = rmr.message_summary(sbuf)
+            mdc_logger.debug("Trying to send message: {}".format(pre_send_summary))
             sbuf = rmr.rmr_send_msg(self.mrc, sbuf)  # send
             if self._assert_good_send(sbuf, pre_send_summary):
                 rmr.rmr_free_msg(sbuf)  # free
-                break
+                return
+
+        mdc_logger.debug("A1 did NOT send the message successfully after {} retries!".format(RETRY_TIMES))
 
     def _rts_msg(self, pay, sbuf_rts, mtype):
         """
@@ -114,6 +119,13 @@ class _RmrLoop:
                 break
         return sbuf_rts  # in some cases rts may return a new sbuf
 
+    def _handle_sends(self):
+        # send out all messages waiting for us
+        while not self.instance_send_queue.empty():
+            work_item = self.instance_send_queue.get(block=False, timeout=None)
+            payload = json.dumps(messages.a1_to_handler(*work_item)).encode("utf-8")
+            self._send_msg(payload, A1_POLICY_REQUEST, work_item[1])
+
     def loop(self):
         """
         This loop runs forever, and has 3 jobs:
@@ -125,11 +137,13 @@ class _RmrLoop:
         mdc_logger.debug("Work loop starting")
         while self.keep_going:
 
-            # send out all messages waiting for us
-            while not self.instance_send_queue.empty():
-                work_item = self.instance_send_queue.get(block=False, timeout=None)
-                payload = json.dumps(messages.a1_to_handler(*work_item)).encode("utf-8")
-                self._send_msg(payload, A1_POLICY_REQUEST, work_item[1])
+            # Update 3/20/2020
+            # We now handle our sends in a thread (that will just exit when it's done) because there is a difference between how send works in SI95 vs NNG.
+            # Send_msg via NNG formerly never blocked.
+            # However under SI95 this send may block for some arbitrary period of time on the first send to an endpoint for which a connection is not established
+            # If this send takes too long, this loop blocks, and the healthcheck will fail, which will cause A1s healthcheck to fail, which will cause Kubernetes to whack A1 and all kinds of horrible things happen.
+            # Therefore, now under SI95, we thread this.
+            Thread(target=self._handle_sends).start()
 
             # read our mailbox
             for (msg, sbuf) in self.rcv_func():
@@ -156,12 +170,13 @@ class _RmrLoop:
                     try:
                         # got a query, do a lookup and send out all instances
                         pti = json.loads(msg["payload"])["policy_type_id"]
-                        mdc_logger.debug("Received query for: {0}".format(pti))
-                        for pii in data.get_instance_list(pti):
+                        instance_list = data.get_instance_list(pti)  # will raise if a bad type
+                        mdc_logger.debug("Received a query for a good type: {0}".format(msg))
+                        for pii in instance_list:
                             instance = data.get_policy_instance(pti, pii)
                             payload = json.dumps(messages.a1_to_handler("CREATE", pti, pii, instance)).encode("utf-8")
                             sbuf = self._rts_msg(payload, sbuf, A1_POLICY_REQUEST)
-                    except (PolicyTypeNotFound, PolicyInstanceNotFound):
+                    except (PolicyTypeNotFound):
                         mdc_logger.debug("Received a query for a non-existent type: {0}".format(msg))
                     except (KeyError, TypeError, json.decoder.JSONDecodeError):
                         mdc_logger.debug("Dropping malformed policy query message: {0}".format(msg))
@@ -175,6 +190,8 @@ class _RmrLoop:
             self.last_ran = time.time()
             time.sleep(1)
 
+        mdc_logger.debug("RMR Thread Ending!")
+
 
 # Public