Threading pt 1
[ric-plt/a1.git] / a1 / a1rmr.py
index 51b5694..d6114bf 100644 (file)
 #   limitations under the License.
 # ==================================================================================
 import os
-import gevent
+import queue
+import time
+import json
 from rmr import rmr, helpers
 from a1 import get_module_logger
+from a1 import data
+from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound
 
 logger = get_module_logger(__name__)
 
 
 RETRY_TIMES = int(os.environ.get("RMR_RETRY_TIMES", 4))
-MRC = None
 
+_SEND_QUEUE = queue.Queue()  # thread safe queue https://docs.python.org/3/library/queue.html
 
-def init_rmr():
+
+def _init_rmr():
     """
-    called from run; not called for unit tests
+    init an rmr context
+    This gets monkeypatched out for unit testing
     """
-    global MRC
     # rmr.RMRFL_MTCALL puts RMR into a multithreaded mode, where a receiving thread populates an
     # internal ring of messages, and receive calls read from that
     # currently the size is 2048 messages, so this is fine for the foreseeable future
-    MRC = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
+    mrc = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
+
+    while rmr.rmr_ready(mrc) == 0:
+        time.sleep(0.5)
 
-    while rmr.rmr_ready(MRC) == 0:
-        gevent.sleep(1)
-        logger.debug("not yet ready")
+    return mrc
 
 
-def send(payload, message_type=0):
+def _send(mrc, payload, message_type=0):
     """
     Sends a message up to RETRY_TIMES
     If the message is sent successfully, it returns the transactionid
     Does nothing otherwise
     """
-    # we may be called many times in asynchronous loops, so for now, it is safer not to share buffers. We can investigate later whether this is really a problem.
-    sbuf = rmr.rmr_alloc_msg(MRC, 4096)
+    # TODO: investigate moving this below and allocating the space based on the payload size
+    sbuf = rmr.rmr_alloc_msg(mrc, 4096)
     payload = payload if isinstance(payload, bytes) else payload.encode("utf-8")
 
     # retry RETRY_TIMES to send the message
@@ -63,7 +69,7 @@ def send(payload, message_type=0):
         transaction_id = pre_send_summary["transaction id"]  # save the transactionid because we need it later
 
         # send
-        sbuf = rmr.rmr_send_msg(MRC, sbuf)
+        sbuf = rmr.rmr_send_msg(mrc, sbuf)
         post_send_summary = rmr.message_summary(sbuf)
         logger.debug("Post message send summary: %s", rmr.message_summary(sbuf))
 
@@ -80,8 +86,85 @@ def send(payload, message_type=0):
     return None
 
 
-def dequeue_all_waiting_messages(filter_type=[]):
+def _update_all_statuses(mrc):
+    """
+    get all waiting messages, and try to parse them as status updates
+    (currently, those are the only messages a1 should get, this may have to be revisited later)
+    """
+    for msg in helpers.rmr_rcvall_msgs(mrc, [21024]):
+        try:
+            pay = json.loads(msg["payload"])
+            data.set_status(pay["policy_type_id"], pay["policy_instance_id"], pay["handler_id"], pay["status"])
+        except (PolicyTypeNotFound, PolicyInstanceNotFound, KeyError):
+            logger.debug("Dropping malformed or non applicable message")
+            logger.debug(msg)
+
+
+# Public
+
+
+def queue_work(item):
+    """
+    push an item into the work queue
+    currently the only type of work is to send out messages
+    """
+    _SEND_QUEUE.put(item)
+
+
+class RmrLoop:
     """
-    dequeue all waiting rmr messages from rmr
+    class represents an rmr loop meant to be called as a longstanding separate thread
     """
-    return helpers.rmr_rcvall_msgs(MRC, filter_type)
+
+    def __init__(self, real_init=True):
+        self._rmr_is_ready = False
+        self._keep_going = True
+        self._real_init = real_init  # useful for unit testing to turn off initialization
+
+    def rmr_is_ready(self):
+        """returns whether rmr has been initialized"""
+        return self._rmr_is_ready
+
+    def stop(self):
+        """sets a flag for the loop to end"""
+        self._keep_going = False
+
+    def loop(self):
+        """
+        This loop runs in an a1 thread forever, and has 3 jobs:
+        - send out any messages that have to go out (create instance, delete instance)
+        - read a1s mailbox and update the status of all instances based on acks from downstream policy handlers
+        - clean up the database (eg delete the instance) under certain conditions based on those statuses (NOT DONE YET)
+        """
+
+        # get a context
+        mrc = None
+        logger.debug("Waiting for rmr to initialize...")
+        if self._real_init:
+            mrc = _init_rmr()
+        self._rmr_is_ready = True
+        logger.debug("Rmr is ready")
+
+        # loop forever
+        logger.debug("Work loop starting")
+        while self._keep_going:
+            """
+            We never raise an exception here. Log and keep moving
+            Bugs will eventually be caught be examining logs.
+            """
+            try:
+                # First, send out all messages waiting for us
+                while not _SEND_QUEUE.empty():
+                    work_item = _SEND_QUEUE.get(block=False, timeout=None)
+                    _send(mrc, payload=work_item["payload"], message_type=work_item["msg type"])
+
+                # Next, update all statuses waiting in a1s mailbox
+                _update_all_statuses(mrc)
+
+                # TODO: next body of work is to try to clean up the database for any updated statuses
+
+            except Exception as e:
+                logger.debug("Polling thread encountered an unexpected exception, but it will continue:")
+                logger.exception(e)
+
+            time.sleep(1)