Align unit and int tests w.r.t. AC xapp
[ric-plt/a1.git] / a1 / a1rmr.py
index d6114bf..ae2bf00 100644 (file)
@@ -1,3 +1,6 @@
+"""
+a1s rmr functionality
+"""
 # ==================================================================================
 #       Copyright (c) 2019 Nokia
 #       Copyright (c) 2018-2019 AT&T Intellectual Property.
 # ==================================================================================
 #       Copyright (c) 2019 Nokia
 #       Copyright (c) 2018-2019 AT&T Intellectual Property.
@@ -18,89 +21,127 @@ import os
 import queue
 import time
 import json
 import queue
 import time
 import json
+from threading import Thread
 from rmr import rmr, helpers
 from rmr import rmr, helpers
-from a1 import get_module_logger
+from mdclogpy import Logger
 from a1 import data
 from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound
 
 from a1 import data
 from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound
 
-logger = get_module_logger(__name__)
+mdc_logger = Logger(name=__name__)
 
 
 
 
-RETRY_TIMES = int(os.environ.get("RMR_RETRY_TIMES", 4))
+RETRY_TIMES = int(os.environ.get("A1_RMR_RETRY_TIMES", 4))
 
 
-_SEND_QUEUE = queue.Queue()  # thread safe queue https://docs.python.org/3/library/queue.html
 
 
+A1_POLICY_REQUEST = 20010
+A1_POLICY_RESPONSE = 20011
+A1_POLICY_QUERY = 20012
 
 
-def _init_rmr():
+
+# Note; yes, globals are bad, but this is a private (to this module) global
+# No other module can import/access this (well, python doesn't enforce this, but all linters will complain)
+__RMR_LOOP__ = None
+
+
+class _RmrLoop:
     """
     """
-    init an rmr context
-    This gets monkeypatched out for unit testing
+    class represents an rmr loop that constantly reads from rmr and performs operations based on waiting messages
+    this launches a thread, it should probably only be called once; the public facing method to access these ensures this
     """
     """
-    # rmr.RMRFL_MTCALL puts RMR into a multithreaded mode, where a receiving thread populates an
-    # internal ring of messages, and receive calls read from that
-    # currently the size is 2048 messages, so this is fine for the foreseeable future
-    mrc = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
 
 
-    while rmr.rmr_ready(mrc) == 0:
-        time.sleep(0.5)
+    def __init__(self, init_func_override=None, rcv_func_override=None):
+        self.keep_going = True
+        self.rcv_func = None
+        self.last_ran = time.time()
+        self.work_queue = queue.Queue()  # thread safe queue https://docs.python.org/3/library/queue.html
+
+        # intialize rmr context
+        if init_func_override:
+            self.mrc = init_func_override()
+        else:
+            mdc_logger.debug("Waiting for rmr to initialize..")
+            # rmr.RMRFL_MTCALL puts RMR into a multithreaded mode, where a receiving thread populates an
+            # internal ring of messages, and receive calls read from that
+            # currently the size is 2048 messages, so this is fine for the foreseeable future
+            self.mrc = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
+            while rmr.rmr_ready(self.mrc) == 0:
+                time.sleep(0.5)
+
+        # set the receive function
+        # TODO: when policy query is implemented, add A1_POLICY_QUERY
+        self.rcv_func = (
+            rcv_func_override if rcv_func_override else lambda: helpers.rmr_rcvall_msgs(self.mrc, [A1_POLICY_RESPONSE])
+        )
+
+        # start the work loop
+        self.thread = Thread(target=self.loop)
+        self.thread.start()
+
+    def loop(self):
+        """
+        This loop runs forever, and has 3 jobs:
+        - send out any messages that have to go out (create instance, delete instance)
+        - read a1s mailbox and update the status of all instances based on acks from downstream policy handlers
+        - clean up the database (eg delete the instance) under certain conditions based on those statuses (NOT DONE YET)
+        """
+        # loop forever
+        mdc_logger.debug("Work loop starting")
+        while self.keep_going:
+
+            # send out all messages waiting for us
+            while not self.work_queue.empty():
+                work_item = self.work_queue.get(block=False, timeout=None)
+
+                pay = work_item["payload"].encode("utf-8")
+                for _ in range(0, RETRY_TIMES):
+                    # Waiting on an rmr bugfix regarding the over-allocation: https://rancodev.atlassian.net/browse/RICPLT-2490
+                    sbuf = rmr.rmr_alloc_msg(self.mrc, 4096, pay, True, A1_POLICY_REQUEST)
+                    # TODO: after next rmr is released, this can be done in the alloc call. but that's not avail in pypi yet
+                    sbuf.contents.sub_id = work_item["ptid"]
+                    pre_send_summary = rmr.message_summary(sbuf)
+                    sbuf = rmr.rmr_send_msg(self.mrc, sbuf)  # send
+                    post_send_summary = rmr.message_summary(sbuf)
+                    mdc_logger.debug(
+                        "Pre-send summary: {0}, Post-send summary: {1}".format(pre_send_summary, post_send_summary)
+                    )
+                    rmr.rmr_free_msg(sbuf)  # free
+                    if post_send_summary["message state"] == 0 and post_send_summary["message status"] == "RMR_OK":
+                        mdc_logger.debug("Message sent successfully!")
+                        break
+
+            # read our mailbox and update statuses
+            for msg in self.rcv_func():
+                try:
+                    pay = json.loads(msg["payload"])
+                    pti = pay["policy_type_id"]
+                    pii = pay["policy_instance_id"]
+                    data.set_policy_instance_status(pti, pii, pay["handler_id"], pay["status"])
+                except (PolicyTypeNotFound, PolicyInstanceNotFound, KeyError, TypeError, json.decoder.JSONDecodeError):
+                    # TODO: in the future we may also have to catch SDL errors
+                    mdc_logger.debug("Dropping malformed or non applicable message: {0}".format(msg))
+
+            # TODO: what's a reasonable sleep time? we don't want to hammer redis too much, and a1 isn't a real time component
+            self.last_ran = time.time()
+            time.sleep(1)
+
 
 
-    return mrc
+# Public
 
 
 
 
-def _send(mrc, payload, message_type=0):
-    """
-    Sends a message up to RETRY_TIMES
-    If the message is sent successfully, it returns the transactionid
-    Does nothing otherwise
-    """
-    # TODO: investigate moving this below and allocating the space based on the payload size
-    sbuf = rmr.rmr_alloc_msg(mrc, 4096)
-    payload = payload if isinstance(payload, bytes) else payload.encode("utf-8")
-
-    # retry RETRY_TIMES to send the message
-    for _ in range(0, RETRY_TIMES):
-        # setup the send message
-        rmr.set_payload_and_length(payload, sbuf)
-        rmr.generate_and_set_transaction_id(sbuf)
-        sbuf.contents.state = 0
-        sbuf.contents.mtype = message_type
-        pre_send_summary = rmr.message_summary(sbuf)
-        logger.debug("Pre message send summary: %s", pre_send_summary)
-        transaction_id = pre_send_summary["transaction id"]  # save the transactionid because we need it later
-
-        # send
-        sbuf = rmr.rmr_send_msg(mrc, sbuf)
-        post_send_summary = rmr.message_summary(sbuf)
-        logger.debug("Post message send summary: %s", rmr.message_summary(sbuf))
-
-        # check success or failure
-        if post_send_summary["message state"] == 0 and post_send_summary["message status"] == "RMR_OK":
-            # we are good
-            logger.debug("Message sent successfully!")
-            rmr.rmr_free_msg(sbuf)
-            return transaction_id
-
-    # we failed all RETRY_TIMES
-    logger.debug("Send failed all %s times, stopping", RETRY_TIMES)
-    rmr.rmr_free_msg(sbuf)
-    return None
-
-
-def _update_all_statuses(mrc):
+def start_rmr_thread(init_func_override=None, rcv_func_override=None):
     """
     """
-    get all waiting messages, and try to parse them as status updates
-    (currently, those are the only messages a1 should get, this may have to be revisited later)
+    Start a1s rmr thread
     """
     """
-    for msg in helpers.rmr_rcvall_msgs(mrc, [21024]):
-        try:
-            pay = json.loads(msg["payload"])
-            data.set_status(pay["policy_type_id"], pay["policy_instance_id"], pay["handler_id"], pay["status"])
-        except (PolicyTypeNotFound, PolicyInstanceNotFound, KeyError):
-            logger.debug("Dropping malformed or non applicable message")
-            logger.debug(msg)
+    global __RMR_LOOP__
+    if __RMR_LOOP__ is None:
+        __RMR_LOOP__ = _RmrLoop(init_func_override, rcv_func_override)
 
 
 
 
-# Public
+def stop_rmr_thread():
+    """
+    stops the rmr thread
+    """
+    __RMR_LOOP__.keep_going = False
 
 
 def queue_work(item):
 
 
 def queue_work(item):
@@ -108,63 +149,18 @@ def queue_work(item):
     push an item into the work queue
     currently the only type of work is to send out messages
     """
     push an item into the work queue
     currently the only type of work is to send out messages
     """
-    _SEND_QUEUE.put(item)
+    __RMR_LOOP__.work_queue.put(item)
 
 
 
 
-class RmrLoop:
+def healthcheck_rmr_thread(seconds=30):
     """
     """
-    class represents an rmr loop meant to be called as a longstanding separate thread
+    returns a boolean representing whether the rmr loop is healthy, by checking two attributes:
+    1. is it running?,
+    2. is it stuck in a long (> seconds) loop?
     """
     """
+    return __RMR_LOOP__.thread.is_alive() and ((time.time() - __RMR_LOOP__.last_ran) < seconds)
 
 
-    def __init__(self, real_init=True):
-        self._rmr_is_ready = False
-        self._keep_going = True
-        self._real_init = real_init  # useful for unit testing to turn off initialization
-
-    def rmr_is_ready(self):
-        """returns whether rmr has been initialized"""
-        return self._rmr_is_ready
-
-    def stop(self):
-        """sets a flag for the loop to end"""
-        self._keep_going = False
-
-    def loop(self):
-        """
-        This loop runs in an a1 thread forever, and has 3 jobs:
-        - send out any messages that have to go out (create instance, delete instance)
-        - read a1s mailbox and update the status of all instances based on acks from downstream policy handlers
-        - clean up the database (eg delete the instance) under certain conditions based on those statuses (NOT DONE YET)
-        """
-
-        # get a context
-        mrc = None
-        logger.debug("Waiting for rmr to initialize...")
-        if self._real_init:
-            mrc = _init_rmr()
-        self._rmr_is_ready = True
-        logger.debug("Rmr is ready")
-
-        # loop forever
-        logger.debug("Work loop starting")
-        while self._keep_going:
-            """
-            We never raise an exception here. Log and keep moving
-            Bugs will eventually be caught be examining logs.
-            """
-            try:
-                # First, send out all messages waiting for us
-                while not _SEND_QUEUE.empty():
-                    work_item = _SEND_QUEUE.get(block=False, timeout=None)
-                    _send(mrc, payload=work_item["payload"], message_type=work_item["msg type"])
-
-                # Next, update all statuses waiting in a1s mailbox
-                _update_all_statuses(mrc)
-
-                # TODO: next body of work is to try to clean up the database for any updated statuses
-
-            except Exception as e:
-                logger.debug("Polling thread encountered an unexpected exception, but it will continue:")
-                logger.exception(e)
 
 
-            time.sleep(1)
+def replace_rcv_func(rcv_func):
+    """purely for the ease of unit testing to test different rcv scenarios"""
+    __RMR_LOOP__.rcv_func = rcv_func