Cleanups only (no code changes)
[ric-plt/a1.git] / a1 / a1rmr.py
index ae2bf00..58ec1c0 100644 (file)
@@ -2,8 +2,8 @@
 a1s rmr functionality
 """
 # ==================================================================================
 a1s rmr functionality
 """
 # ==================================================================================
-#       Copyright (c) 2019 Nokia
-#       Copyright (c) 2018-2019 AT&T Intellectual Property.
+#       Copyright (c) 2019-2020 Nokia
+#       Copyright (c) 2018-2020 AT&T Intellectual Property.
 #
 #   Licensed under the Apache License, Version 2.0 (the "License");
 #   you may not use this file except in compliance with the License.
 #
 #   Licensed under the Apache License, Version 2.0 (the "License");
 #   you may not use this file except in compliance with the License.
@@ -24,15 +24,13 @@ import json
 from threading import Thread
 from rmr import rmr, helpers
 from mdclogpy import Logger
 from threading import Thread
 from rmr import rmr, helpers
 from mdclogpy import Logger
-from a1 import data
+from a1 import data, messages
 from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound
 
 mdc_logger = Logger(name=__name__)
 
 
 RETRY_TIMES = int(os.environ.get("A1_RMR_RETRY_TIMES", 4))
 from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound
 
 mdc_logger = Logger(name=__name__)
 
 
 RETRY_TIMES = int(os.environ.get("A1_RMR_RETRY_TIMES", 4))
-
-
 A1_POLICY_REQUEST = 20010
 A1_POLICY_RESPONSE = 20011
 A1_POLICY_QUERY = 20012
 A1_POLICY_REQUEST = 20010
 A1_POLICY_RESPONSE = 20011
 A1_POLICY_QUERY = 20012
@@ -53,7 +51,9 @@ class _RmrLoop:
         self.keep_going = True
         self.rcv_func = None
         self.last_ran = time.time()
         self.keep_going = True
         self.rcv_func = None
         self.last_ran = time.time()
-        self.work_queue = queue.Queue()  # thread safe queue https://docs.python.org/3/library/queue.html
+
+        # see docs/overview#resiliency for a discussion of this
+        self.instance_send_queue = queue.Queue()  # thread safe queue https://docs.python.org/3/library/queue.html
 
         # intialize rmr context
         if init_func_override:
 
         # intialize rmr context
         if init_func_override:
@@ -68,15 +68,52 @@ class _RmrLoop:
                 time.sleep(0.5)
 
         # set the receive function
                 time.sleep(0.5)
 
         # set the receive function
-        # TODO: when policy query is implemented, add A1_POLICY_QUERY
         self.rcv_func = (
         self.rcv_func = (
-            rcv_func_override if rcv_func_override else lambda: helpers.rmr_rcvall_msgs(self.mrc, [A1_POLICY_RESPONSE])
+            rcv_func_override
+            if rcv_func_override
+            else lambda: helpers.rmr_rcvall_msgs_raw(self.mrc, [A1_POLICY_RESPONSE, A1_POLICY_QUERY])
         )
 
         # start the work loop
         self.thread = Thread(target=self.loop)
         self.thread.start()
 
         )
 
         # start the work loop
         self.thread = Thread(target=self.loop)
         self.thread.start()
 
+    def _assert_good_send(self, sbuf, pre_send_summary):
+        """
+        common helper function for _send_msg and _rts_msg
+        """
+        post_send_summary = rmr.message_summary(sbuf)
+        if post_send_summary["message state"] == 0 and post_send_summary["message status"] == "RMR_OK":
+            return True
+        mdc_logger.debug("Message NOT sent!")
+        mdc_logger.debug("Pre-send summary: {0}, Post-send summary: {1}".format(pre_send_summary, post_send_summary))
+        return False
+
+    def _send_msg(self, pay, mtype, subid):
+        """
+        sends a msg
+        """
+        for _ in range(0, RETRY_TIMES):
+            sbuf = rmr.rmr_alloc_msg(self.mrc, len(pay), payload=pay, gen_transaction_id=True, mtype=mtype, sub_id=subid)
+            sbuf.contents.sub_id = subid
+            pre_send_summary = rmr.message_summary(sbuf)
+            sbuf = rmr.rmr_send_msg(self.mrc, sbuf)  # send
+            if self._assert_good_send(sbuf, pre_send_summary):
+                rmr.rmr_free_msg(sbuf)  # free
+                break
+
+    def _rts_msg(self, pay, sbuf_rts, mtype):
+        """
+        sends a message using rts
+        we do not call free here because we may rts many times; it is called after the rts loop
+        """
+        for _ in range(0, RETRY_TIMES):
+            pre_send_summary = rmr.message_summary(sbuf_rts)
+            sbuf_rts = rmr.rmr_rts_msg(self.mrc, sbuf_rts, payload=pay, mtype=mtype)
+            if self._assert_good_send(sbuf_rts, pre_send_summary):
+                break
+        return sbuf_rts  # in some cases rts may return a new sbuf
+
     def loop(self):
         """
         This loop runs forever, and has 3 jobs:
     def loop(self):
         """
         This loop runs forever, and has 3 jobs:
@@ -89,38 +126,52 @@ class _RmrLoop:
         while self.keep_going:
 
             # send out all messages waiting for us
         while self.keep_going:
 
             # send out all messages waiting for us
-            while not self.work_queue.empty():
-                work_item = self.work_queue.get(block=False, timeout=None)
-
-                pay = work_item["payload"].encode("utf-8")
-                for _ in range(0, RETRY_TIMES):
-                    # Waiting on an rmr bugfix regarding the over-allocation: https://rancodev.atlassian.net/browse/RICPLT-2490
-                    sbuf = rmr.rmr_alloc_msg(self.mrc, 4096, pay, True, A1_POLICY_REQUEST)
-                    # TODO: after next rmr is released, this can be done in the alloc call. but that's not avail in pypi yet
-                    sbuf.contents.sub_id = work_item["ptid"]
-                    pre_send_summary = rmr.message_summary(sbuf)
-                    sbuf = rmr.rmr_send_msg(self.mrc, sbuf)  # send
-                    post_send_summary = rmr.message_summary(sbuf)
-                    mdc_logger.debug(
-                        "Pre-send summary: {0}, Post-send summary: {1}".format(pre_send_summary, post_send_summary)
-                    )
-                    rmr.rmr_free_msg(sbuf)  # free
-                    if post_send_summary["message state"] == 0 and post_send_summary["message status"] == "RMR_OK":
-                        mdc_logger.debug("Message sent successfully!")
-                        break
-
-            # read our mailbox and update statuses
-            for msg in self.rcv_func():
+            while not self.instance_send_queue.empty():
+                work_item = self.instance_send_queue.get(block=False, timeout=None)
+                payload = json.dumps(messages.a1_to_handler(*work_item)).encode("utf-8")
+                self._send_msg(payload, A1_POLICY_REQUEST, work_item[1])
+
+            # read our mailbox
+            for (msg, sbuf) in self.rcv_func():
+                # TODO: in the future we may also have to catch SDL errors
                 try:
                 try:
-                    pay = json.loads(msg["payload"])
-                    pti = pay["policy_type_id"]
-                    pii = pay["policy_instance_id"]
-                    data.set_policy_instance_status(pti, pii, pay["handler_id"], pay["status"])
-                except (PolicyTypeNotFound, PolicyInstanceNotFound, KeyError, TypeError, json.decoder.JSONDecodeError):
-                    # TODO: in the future we may also have to catch SDL errors
-                    mdc_logger.debug("Dropping malformed or non applicable message: {0}".format(msg))
-
-            # TODO: what's a reasonable sleep time? we don't want to hammer redis too much, and a1 isn't a real time component
+                    mtype = msg["message type"]
+                except (KeyError, TypeError, json.decoder.JSONDecodeError):
+                    mdc_logger.debug("Dropping malformed policy ack/query message: {0}".format(msg))
+
+                if mtype == A1_POLICY_RESPONSE:
+                    try:
+                        # got a policy response, update status
+                        pay = json.loads(msg["payload"])
+                        data.set_policy_instance_status(
+                            pay["policy_type_id"], pay["policy_instance_id"], pay["handler_id"], pay["status"]
+                        )
+                        mdc_logger.debug("Successfully received status update: {0}".format(pay))
+                    except (PolicyTypeNotFound, PolicyInstanceNotFound):
+                        mdc_logger.debug("Received a response  for a non-existent instance")
+                    except (KeyError, TypeError, json.decoder.JSONDecodeError):
+                        mdc_logger.debug("Dropping malformed policy ack message: {0}".format(msg))
+
+                elif mtype == A1_POLICY_QUERY:
+                    try:
+                        # got a query, do a lookup and send out all instances
+                        pti = json.loads(msg["payload"])["policy_type_id"]
+                        mdc_logger.debug("Received query for: {0}".format(pti))
+                        for pii in data.get_instance_list(pti):
+                            instance = data.get_policy_instance(pti, pii)
+                            payload = json.dumps(messages.a1_to_handler("CREATE", pti, pii, instance)).encode("utf-8")
+                            sbuf = self._rts_msg(payload, sbuf, A1_POLICY_REQUEST)
+                    except (PolicyTypeNotFound, PolicyInstanceNotFound):
+                        mdc_logger.debug("Received a query for a non-existent type: {0}".format(msg))
+                    except (KeyError, TypeError, json.decoder.JSONDecodeError):
+                        mdc_logger.debug("Dropping malformed policy query message: {0}".format(msg))
+
+                else:
+                    mdc_logger.debug("Received message type {0} but A1 does not handle this".format(mtype))
+
+                # we must free each sbuf
+                rmr.rmr_free_msg(sbuf)
+
             self.last_ran = time.time()
             time.sleep(1)
 
             self.last_ran = time.time()
             time.sleep(1)
 
@@ -144,12 +195,12 @@ def stop_rmr_thread():
     __RMR_LOOP__.keep_going = False
 
 
     __RMR_LOOP__.keep_going = False
 
 
-def queue_work(item):
+def queue_instance_send(item):
     """
     push an item into the work queue
     currently the only type of work is to send out messages
     """
     """
     push an item into the work queue
     currently the only type of work is to send out messages
     """
-    __RMR_LOOP__.work_queue.put(item)
+    __RMR_LOOP__.instance_send_queue.put(item)
 
 
 def healthcheck_rmr_thread(seconds=30):
 
 
 def healthcheck_rmr_thread(seconds=30):