Align unit and int tests w.r.t. AC xapp
[ric-plt/a1.git] / a1 / a1rmr.py
index fd0d87f..ae2bf00 100644 (file)
@@ -23,14 +23,20 @@ import time
 import json
 from threading import Thread
 from rmr import rmr, helpers
-from a1 import get_module_logger
+from mdclogpy import Logger
 from a1 import data
 from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound
 
-logger = get_module_logger(__name__)
+mdc_logger = Logger(name=__name__)
 
 
-RETRY_TIMES = int(os.environ.get("RMR_RETRY_TIMES", 4))
+RETRY_TIMES = int(os.environ.get("A1_RMR_RETRY_TIMES", 4))
+
+
+A1_POLICY_REQUEST = 20010
+A1_POLICY_RESPONSE = 20011
+A1_POLICY_QUERY = 20012
+
 
 # Note; yes, globals are bad, but this is a private (to this module) global
 # No other module can import/access this (well, python doesn't enforce this, but all linters will complain)
@@ -53,7 +59,7 @@ class _RmrLoop:
         if init_func_override:
             self.mrc = init_func_override()
         else:
-            logger.debug("Waiting for rmr to initialize..")
+            mdc_logger.debug("Waiting for rmr to initialize..")
             # rmr.RMRFL_MTCALL puts RMR into a multithreaded mode, where a receiving thread populates an
             # internal ring of messages, and receive calls read from that
             # currently the size is 2048 messages, so this is fine for the foreseeable future
@@ -62,7 +68,10 @@ class _RmrLoop:
                 time.sleep(0.5)
 
         # set the receive function
-        self.rcv_func = rcv_func_override if rcv_func_override else lambda: helpers.rmr_rcvall_msgs(self.mrc, [21024])
+        # TODO: when policy query is implemented, add A1_POLICY_QUERY
+        self.rcv_func = (
+            rcv_func_override if rcv_func_override else lambda: helpers.rmr_rcvall_msgs(self.mrc, [A1_POLICY_RESPONSE])
+        )
 
         # start the work loop
         self.thread = Thread(target=self.loop)
@@ -76,7 +85,7 @@ class _RmrLoop:
         - clean up the database (eg delete the instance) under certain conditions based on those statuses (NOT DONE YET)
         """
         # loop forever
-        logger.debug("Work loop starting")
+        mdc_logger.debug("Work loop starting")
         while self.keep_going:
 
             # send out all messages waiting for us
@@ -86,33 +95,30 @@ class _RmrLoop:
                 pay = work_item["payload"].encode("utf-8")
                 for _ in range(0, RETRY_TIMES):
                     # Waiting on an rmr bugfix regarding the over-allocation: https://rancodev.atlassian.net/browse/RICPLT-2490
-                    sbuf = rmr.rmr_alloc_msg(self.mrc, 4096, pay, True, work_item["msg type"])
+                    sbuf = rmr.rmr_alloc_msg(self.mrc, 4096, pay, True, A1_POLICY_REQUEST)
+                    # TODO: after next rmr is released, this can be done in the alloc call. but that's not avail in pypi yet
+                    sbuf.contents.sub_id = work_item["ptid"]
                     pre_send_summary = rmr.message_summary(sbuf)
                     sbuf = rmr.rmr_send_msg(self.mrc, sbuf)  # send
                     post_send_summary = rmr.message_summary(sbuf)
-                    logger.debug("Pre-send summary: %s, Post-send summary: %s", pre_send_summary, post_send_summary)
+                    mdc_logger.debug(
+                        "Pre-send summary: {0}, Post-send summary: {1}".format(pre_send_summary, post_send_summary)
+                    )
                     rmr.rmr_free_msg(sbuf)  # free
                     if post_send_summary["message state"] == 0 and post_send_summary["message status"] == "RMR_OK":
-                        logger.debug("Message sent successfully!")
+                        mdc_logger.debug("Message sent successfully!")
                         break
 
             # read our mailbox and update statuses
-            updated_instances = set()
             for msg in self.rcv_func():
                 try:
                     pay = json.loads(msg["payload"])
                     pti = pay["policy_type_id"]
                     pii = pay["policy_instance_id"]
-                    data.set_status(pti, pii, pay["handler_id"], pay["status"])
-                    updated_instances.add((pti, pii))
-                except (PolicyTypeNotFound, PolicyInstanceNotFound, KeyError, json.decoder.JSONDecodeError):
+                    data.set_policy_instance_status(pti, pii, pay["handler_id"], pay["status"])
+                except (PolicyTypeNotFound, PolicyInstanceNotFound, KeyError, TypeError, json.decoder.JSONDecodeError):
                     # TODO: in the future we may also have to catch SDL errors
-                    logger.debug(("Dropping malformed or non applicable message", msg))
-
-            # for all updated instances, see if we can trigger a delete
-            # should be no catch needed here, since the status update would have failed if it was a bad pair
-            for ut in updated_instances:
-                data.clean_up_instance(ut[0], ut[1])
+                    mdc_logger.debug("Dropping malformed or non applicable message: {0}".format(msg))
 
             # TODO: what's a reasonable sleep time? we don't want to hammer redis too much, and a1 isn't a real time component
             self.last_ran = time.time()