Cleanups only (no code changes)
[ric-plt/a1.git] / a1 / a1rmr.py
index ae2bf00..58ec1c0 100644 (file)
@@ -2,8 +2,8 @@
 a1s rmr functionality
 """
 # ==================================================================================
-#       Copyright (c) 2019 Nokia
-#       Copyright (c) 2018-2019 AT&T Intellectual Property.
+#       Copyright (c) 2019-2020 Nokia
+#       Copyright (c) 2018-2020 AT&T Intellectual Property.
 #
 #   Licensed under the Apache License, Version 2.0 (the "License");
 #   you may not use this file except in compliance with the License.
@@ -24,15 +24,13 @@ import json
 from threading import Thread
 from rmr import rmr, helpers
 from mdclogpy import Logger
-from a1 import data
+from a1 import data, messages
 from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound
 
 mdc_logger = Logger(name=__name__)
 
 
 RETRY_TIMES = int(os.environ.get("A1_RMR_RETRY_TIMES", 4))
-
-
 A1_POLICY_REQUEST = 20010
 A1_POLICY_RESPONSE = 20011
 A1_POLICY_QUERY = 20012
@@ -53,7 +51,9 @@ class _RmrLoop:
         self.keep_going = True
         self.rcv_func = None
         self.last_ran = time.time()
-        self.work_queue = queue.Queue()  # thread safe queue https://docs.python.org/3/library/queue.html
+
+        # see docs/overview#resiliency for a discussion of this
+        self.instance_send_queue = queue.Queue()  # thread safe queue https://docs.python.org/3/library/queue.html
 
         # intialize rmr context
         if init_func_override:
@@ -68,15 +68,52 @@ class _RmrLoop:
                 time.sleep(0.5)
 
         # set the receive function
-        # TODO: when policy query is implemented, add A1_POLICY_QUERY
         self.rcv_func = (
-            rcv_func_override if rcv_func_override else lambda: helpers.rmr_rcvall_msgs(self.mrc, [A1_POLICY_RESPONSE])
+            rcv_func_override
+            if rcv_func_override
+            else lambda: helpers.rmr_rcvall_msgs_raw(self.mrc, [A1_POLICY_RESPONSE, A1_POLICY_QUERY])
         )
 
         # start the work loop
         self.thread = Thread(target=self.loop)
         self.thread.start()
 
+    def _assert_good_send(self, sbuf, pre_send_summary):
+        """
+        common helper function for _send_msg and _rts_msg
+        """
+        post_send_summary = rmr.message_summary(sbuf)
+        if post_send_summary["message state"] == 0 and post_send_summary["message status"] == "RMR_OK":
+            return True
+        mdc_logger.debug("Message NOT sent!")
+        mdc_logger.debug("Pre-send summary: {0}, Post-send summary: {1}".format(pre_send_summary, post_send_summary))
+        return False
+
+    def _send_msg(self, pay, mtype, subid):
+        """
+        sends a msg
+        """
+        for _ in range(0, RETRY_TIMES):
+            sbuf = rmr.rmr_alloc_msg(self.mrc, len(pay), payload=pay, gen_transaction_id=True, mtype=mtype, sub_id=subid)
+            sbuf.contents.sub_id = subid
+            pre_send_summary = rmr.message_summary(sbuf)
+            sbuf = rmr.rmr_send_msg(self.mrc, sbuf)  # send
+            if self._assert_good_send(sbuf, pre_send_summary):
+                rmr.rmr_free_msg(sbuf)  # free
+                break
+
+    def _rts_msg(self, pay, sbuf_rts, mtype):
+        """
+        sends a message using rts
+        we do not call free here because we may rts many times; it is called after the rts loop
+        """
+        for _ in range(0, RETRY_TIMES):
+            pre_send_summary = rmr.message_summary(sbuf_rts)
+            sbuf_rts = rmr.rmr_rts_msg(self.mrc, sbuf_rts, payload=pay, mtype=mtype)
+            if self._assert_good_send(sbuf_rts, pre_send_summary):
+                break
+        return sbuf_rts  # in some cases rts may return a new sbuf
+
     def loop(self):
         """
         This loop runs forever, and has 3 jobs:
@@ -89,38 +126,52 @@ class _RmrLoop:
         while self.keep_going:
 
             # send out all messages waiting for us
-            while not self.work_queue.empty():
-                work_item = self.work_queue.get(block=False, timeout=None)
-
-                pay = work_item["payload"].encode("utf-8")
-                for _ in range(0, RETRY_TIMES):
-                    # Waiting on an rmr bugfix regarding the over-allocation: https://rancodev.atlassian.net/browse/RICPLT-2490
-                    sbuf = rmr.rmr_alloc_msg(self.mrc, 4096, pay, True, A1_POLICY_REQUEST)
-                    # TODO: after next rmr is released, this can be done in the alloc call. but that's not avail in pypi yet
-                    sbuf.contents.sub_id = work_item["ptid"]
-                    pre_send_summary = rmr.message_summary(sbuf)
-                    sbuf = rmr.rmr_send_msg(self.mrc, sbuf)  # send
-                    post_send_summary = rmr.message_summary(sbuf)
-                    mdc_logger.debug(
-                        "Pre-send summary: {0}, Post-send summary: {1}".format(pre_send_summary, post_send_summary)
-                    )
-                    rmr.rmr_free_msg(sbuf)  # free
-                    if post_send_summary["message state"] == 0 and post_send_summary["message status"] == "RMR_OK":
-                        mdc_logger.debug("Message sent successfully!")
-                        break
-
-            # read our mailbox and update statuses
-            for msg in self.rcv_func():
+            while not self.instance_send_queue.empty():
+                work_item = self.instance_send_queue.get(block=False, timeout=None)
+                payload = json.dumps(messages.a1_to_handler(*work_item)).encode("utf-8")
+                self._send_msg(payload, A1_POLICY_REQUEST, work_item[1])
+
+            # read our mailbox
+            for (msg, sbuf) in self.rcv_func():
+                # TODO: in the future we may also have to catch SDL errors
                 try:
-                    pay = json.loads(msg["payload"])
-                    pti = pay["policy_type_id"]
-                    pii = pay["policy_instance_id"]
-                    data.set_policy_instance_status(pti, pii, pay["handler_id"], pay["status"])
-                except (PolicyTypeNotFound, PolicyInstanceNotFound, KeyError, TypeError, json.decoder.JSONDecodeError):
-                    # TODO: in the future we may also have to catch SDL errors
-                    mdc_logger.debug("Dropping malformed or non applicable message: {0}".format(msg))
-
-            # TODO: what's a reasonable sleep time? we don't want to hammer redis too much, and a1 isn't a real time component
+                    mtype = msg["message type"]
+                except (KeyError, TypeError, json.decoder.JSONDecodeError):
+                    mdc_logger.debug("Dropping malformed policy ack/query message: {0}".format(msg))
+
+                if mtype == A1_POLICY_RESPONSE:
+                    try:
+                        # got a policy response, update status
+                        pay = json.loads(msg["payload"])
+                        data.set_policy_instance_status(
+                            pay["policy_type_id"], pay["policy_instance_id"], pay["handler_id"], pay["status"]
+                        )
+                        mdc_logger.debug("Successfully received status update: {0}".format(pay))
+                    except (PolicyTypeNotFound, PolicyInstanceNotFound):
+                        mdc_logger.debug("Received a response  for a non-existent instance")
+                    except (KeyError, TypeError, json.decoder.JSONDecodeError):
+                        mdc_logger.debug("Dropping malformed policy ack message: {0}".format(msg))
+
+                elif mtype == A1_POLICY_QUERY:
+                    try:
+                        # got a query, do a lookup and send out all instances
+                        pti = json.loads(msg["payload"])["policy_type_id"]
+                        mdc_logger.debug("Received query for: {0}".format(pti))
+                        for pii in data.get_instance_list(pti):
+                            instance = data.get_policy_instance(pti, pii)
+                            payload = json.dumps(messages.a1_to_handler("CREATE", pti, pii, instance)).encode("utf-8")
+                            sbuf = self._rts_msg(payload, sbuf, A1_POLICY_REQUEST)
+                    except (PolicyTypeNotFound, PolicyInstanceNotFound):
+                        mdc_logger.debug("Received a query for a non-existent type: {0}".format(msg))
+                    except (KeyError, TypeError, json.decoder.JSONDecodeError):
+                        mdc_logger.debug("Dropping malformed policy query message: {0}".format(msg))
+
+                else:
+                    mdc_logger.debug("Received message type {0} but A1 does not handle this".format(mtype))
+
+                # we must free each sbuf
+                rmr.rmr_free_msg(sbuf)
+
             self.last_ran = time.time()
             time.sleep(1)
 
@@ -144,12 +195,12 @@ def stop_rmr_thread():
     __RMR_LOOP__.keep_going = False
 
 
-def queue_work(item):
+def queue_instance_send(item):
     """
     push an item into the work queue
     currently the only type of work is to send out messages
     """
-    __RMR_LOOP__.work_queue.put(item)
+    __RMR_LOOP__.instance_send_queue.put(item)
 
 
 def healthcheck_rmr_thread(seconds=30):