Threading pt 2 (of 3, likely)
[ric-plt/a1.git] / a1 / a1rmr.py
index d6114bf..359d2e0 100644 (file)
@@ -18,6 +18,7 @@ import os
 import queue
 import time
 import json
+from threading import Thread
 from rmr import rmr, helpers
 from a1 import get_module_logger
 from a1 import data
@@ -39,8 +40,8 @@ def _init_rmr():
     # rmr.RMRFL_MTCALL puts RMR into a multithreaded mode, where a receiving thread populates an
     # internal ring of messages, and receive calls read from that
     # currently the size is 2048 messages, so this is fine for the foreseeable future
+    logger.debug("Waiting for rmr to initialize..")
     mrc = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
-
     while rmr.rmr_ready(mrc) == 0:
         time.sleep(0.5)
 
@@ -86,20 +87,6 @@ def _send(mrc, payload, message_type=0):
     return None
 
 
-def _update_all_statuses(mrc):
-    """
-    get all waiting messages, and try to parse them as status updates
-    (currently, those are the only messages a1 should get, this may have to be revisited later)
-    """
-    for msg in helpers.rmr_rcvall_msgs(mrc, [21024]):
-        try:
-            pay = json.loads(msg["payload"])
-            data.set_status(pay["policy_type_id"], pay["policy_instance_id"], pay["handler_id"], pay["status"])
-        except (PolicyTypeNotFound, PolicyInstanceNotFound, KeyError):
-            logger.debug("Dropping malformed or non applicable message")
-            logger.debug(msg)
-
-
 # Public
 
 
@@ -116,10 +103,12 @@ class RmrLoop:
     class represents an rmr loop meant to be called as a longstanding separate thread
     """
 
-    def __init__(self, real_init=True):
+    def __init__(self, _init_func_override=None, rcv_func_override=None):
         self._rmr_is_ready = False
         self._keep_going = True
-        self._real_init = real_init  # useful for unit testing to turn off initialization
+        self._init_func_override = _init_func_override  # useful for unit testing
+        self._rcv_func_override = rcv_func_override  # useful for unit testing to mock certain recieve scenarios
+        self._rcv_func = None
 
     def rmr_is_ready(self):
         """returns whether rmr has been initialized"""
@@ -138,33 +127,53 @@ class RmrLoop:
         """
 
         # get a context
-        mrc = None
-        logger.debug("Waiting for rmr to initialize...")
-        if self._real_init:
-            mrc = _init_rmr()
+        mrc = self._init_func_override() if self._init_func_override else _init_rmr()
         self._rmr_is_ready = True
         logger.debug("Rmr is ready")
 
+        # set the receive function called below
+        self._rcv_func = (
+            self._rcv_func_override if self._rcv_func_override else lambda: helpers.rmr_rcvall_msgs(mrc, [21024])
+        )
+
         # loop forever
         logger.debug("Work loop starting")
         while self._keep_going:
-            """
-            We never raise an exception here. Log and keep moving
-            Bugs will eventually be caught be examining logs.
-            """
-            try:
-                # First, send out all messages waiting for us
-                while not _SEND_QUEUE.empty():
-                    work_item = _SEND_QUEUE.get(block=False, timeout=None)
-                    _send(mrc, payload=work_item["payload"], message_type=work_item["msg type"])
-
-                # Next, update all statuses waiting in a1s mailbox
-                _update_all_statuses(mrc)
-
-                # TODO: next body of work is to try to clean up the database for any updated statuses
-
-            except Exception as e:
-                logger.debug("Polling thread encountered an unexpected exception, but it will continue:")
-                logger.exception(e)
-
-            time.sleep(1)
+            # send out all messages waiting for us
+            while not _SEND_QUEUE.empty():
+                work_item = _SEND_QUEUE.get(block=False, timeout=None)
+                _send(mrc, payload=work_item["payload"], message_type=work_item["msg type"])
+
+            # read our mailbox and update statuses
+            updated_instances = set()
+            for msg in self._rcv_func():
+                try:
+                    pay = json.loads(msg["payload"])
+                    pti = pay["policy_type_id"]
+                    pii = pay["policy_instance_id"]
+                    data.set_status(pti, pii, pay["handler_id"], pay["status"])
+                    updated_instances.add((pti, pii))
+                except (PolicyTypeNotFound, PolicyInstanceNotFound, KeyError, json.decoder.JSONDecodeError):
+                    # TODO: in the future we may also have to catch SDL errors
+                    logger.debug(("Dropping malformed or non applicable message", msg))
+
+            # for all updated instances, see if we can trigger a delete
+            # should be no catch needed here, since the status update would have failed if it was a bad pair
+            for ut in updated_instances:
+                data.clean_up_instance(ut[0], ut[1])
+
+        # TODO: what's a reasonable sleep time? we don't want to hammer redis too much, and a1 isn't a real time component
+        time.sleep(1)
+
+
+def start_rmr_thread(init_func_override=None, rcv_func_override=None):
+    """
+    Start a1s rmr thread
+    Also called during unit testing
+    """
+    rmr_loop = RmrLoop(init_func_override, rcv_func_override)
+    thread = Thread(target=rmr_loop.loop)
+    thread.start()
+    while not rmr_loop.rmr_is_ready():
+        time.sleep(0.5)
+    return rmr_loop  # return the handle; useful during unit testing