Updates:
[ric-plt/a1.git] / a1 / a1rmr.py
index 359d2e0..771842a 100644 (file)
@@ -1,6 +1,9 @@
+"""
+a1s rmr functionality
+"""
 # ==================================================================================
-#       Copyright (c) 2019 Nokia
-#       Copyright (c) 2018-2019 AT&T Intellectual Property.
+#       Copyright (c) 2019-2020 Nokia
+#       Copyright (c) 2018-2020 AT&T Intellectual Property.
 #
 #   Licensed under the Apache License, Version 2.0 (the "License");
 #   you may not use this file except in compliance with the License.
@@ -20,160 +23,210 @@ import time
 import json
 from threading import Thread
 from rmr import rmr, helpers
-from a1 import get_module_logger
-from a1 import data
+from mdclogpy import Logger
+from a1 import data, messages
 from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound
 
-logger = get_module_logger(__name__)
+mdc_logger = Logger(name=__name__)
 
 
-RETRY_TIMES = int(os.environ.get("RMR_RETRY_TIMES", 4))
+RETRY_TIMES = int(os.environ.get("A1_RMR_RETRY_TIMES", 4))
+A1_POLICY_REQUEST = 20010
+A1_POLICY_RESPONSE = 20011
+A1_POLICY_QUERY = 20012
 
-_SEND_QUEUE = queue.Queue()  # thread safe queue https://docs.python.org/3/library/queue.html
 
+# Note; yes, globals are bad, but this is a private (to this module) global
+# No other module can import/access this (well, python doesn't enforce this, but all linters will complain)
+__RMR_LOOP__ = None
 
-def _init_rmr():
+
+class _RmrLoop:
     """
-    init an rmr context
-    This gets monkeypatched out for unit testing
+    class represents an rmr loop that constantly reads from rmr and performs operations based on waiting messages
+    this launches a thread, it should probably only be called once; the public facing method to access these ensures this
     """
-    # rmr.RMRFL_MTCALL puts RMR into a multithreaded mode, where a receiving thread populates an
-    # internal ring of messages, and receive calls read from that
-    # currently the size is 2048 messages, so this is fine for the foreseeable future
-    logger.debug("Waiting for rmr to initialize..")
-    mrc = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
-    while rmr.rmr_ready(mrc) == 0:
-        time.sleep(0.5)
 
-    return mrc
+    def __init__(self, init_func_override=None, rcv_func_override=None):
+        self.keep_going = True
+        self.rcv_func = None
+        self.last_ran = time.time()
+
+        # see docs/overview#resiliency for a discussion of this
+        self.instance_send_queue = queue.Queue()  # thread safe queue https://docs.python.org/3/library/queue.html
+
+        # intialize rmr context
+        if init_func_override:
+            self.mrc = init_func_override()
+        else:
+            mdc_logger.debug("Waiting for rmr to initialize..")
+            # rmr.RMRFL_MTCALL puts RMR into a multithreaded mode, where a receiving thread populates an
+            # internal ring of messages, and receive calls read from that
+            # currently the size is 2048 messages, so this is fine for the foreseeable future
+            self.mrc = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
+            while rmr.rmr_ready(self.mrc) == 0:
+                time.sleep(0.5)
+
+        # set the receive function
+        self.rcv_func = (
+            rcv_func_override
+            if rcv_func_override
+            else lambda: helpers.rmr_rcvall_msgs_raw(self.mrc, [A1_POLICY_RESPONSE, A1_POLICY_QUERY])
+        )
 
+        # start the work loop
+        self.thread = Thread(target=self.loop)
+        self.thread.start()
 
-def _send(mrc, payload, message_type=0):
-    """
-    Sends a message up to RETRY_TIMES
-    If the message is sent successfully, it returns the transactionid
-    Does nothing otherwise
-    """
-    # TODO: investigate moving this below and allocating the space based on the payload size
-    sbuf = rmr.rmr_alloc_msg(mrc, 4096)
-    payload = payload if isinstance(payload, bytes) else payload.encode("utf-8")
-
-    # retry RETRY_TIMES to send the message
-    for _ in range(0, RETRY_TIMES):
-        # setup the send message
-        rmr.set_payload_and_length(payload, sbuf)
-        rmr.generate_and_set_transaction_id(sbuf)
-        sbuf.contents.state = 0
-        sbuf.contents.mtype = message_type
-        pre_send_summary = rmr.message_summary(sbuf)
-        logger.debug("Pre message send summary: %s", pre_send_summary)
-        transaction_id = pre_send_summary["transaction id"]  # save the transactionid because we need it later
-
-        # send
-        sbuf = rmr.rmr_send_msg(mrc, sbuf)
+    def _assert_good_send(self, sbuf, pre_send_summary):
+        """
+        common helper function for _send_msg and _rts_msg
+        """
         post_send_summary = rmr.message_summary(sbuf)
-        logger.debug("Post message send summary: %s", rmr.message_summary(sbuf))
-
-        # check success or failure
         if post_send_summary["message state"] == 0 and post_send_summary["message status"] == "RMR_OK":
-            # we are good
-            logger.debug("Message sent successfully!")
-            rmr.rmr_free_msg(sbuf)
-            return transaction_id
-
-    # we failed all RETRY_TIMES
-    logger.debug("Send failed all %s times, stopping", RETRY_TIMES)
-    rmr.rmr_free_msg(sbuf)
-    return None
-
-
-# Public
-
+            return True
+        mdc_logger.debug("Message NOT sent!")
+        mdc_logger.debug("Pre-send summary: {0}, Post-send summary: {1}".format(pre_send_summary, post_send_summary))
+        return False
 
-def queue_work(item):
-    """
-    push an item into the work queue
-    currently the only type of work is to send out messages
-    """
-    _SEND_QUEUE.put(item)
-
-
-class RmrLoop:
-    """
-    class represents an rmr loop meant to be called as a longstanding separate thread
-    """
-
-    def __init__(self, _init_func_override=None, rcv_func_override=None):
-        self._rmr_is_ready = False
-        self._keep_going = True
-        self._init_func_override = _init_func_override  # useful for unit testing
-        self._rcv_func_override = rcv_func_override  # useful for unit testing to mock certain recieve scenarios
-        self._rcv_func = None
-
-    def rmr_is_ready(self):
-        """returns whether rmr has been initialized"""
-        return self._rmr_is_ready
-
-    def stop(self):
-        """sets a flag for the loop to end"""
-        self._keep_going = False
+    def _send_msg(self, pay, mtype, subid):
+        """
+        sends a msg
+        """
+        for _ in range(0, RETRY_TIMES):
+            sbuf = rmr.rmr_alloc_msg(self.mrc, len(pay), payload=pay, gen_transaction_id=True, mtype=mtype, sub_id=subid)
+            sbuf.contents.sub_id = subid
+            pre_send_summary = rmr.message_summary(sbuf)
+            mdc_logger.debug("Trying to send message: {}".format(pre_send_summary))
+            sbuf = rmr.rmr_send_msg(self.mrc, sbuf)  # send
+            if self._assert_good_send(sbuf, pre_send_summary):
+                rmr.rmr_free_msg(sbuf)  # free
+                return
+
+        mdc_logger.debug("A1 did NOT send the message successfully after {} retries!".format(RETRY_TIMES))
+
+    def _rts_msg(self, pay, sbuf_rts, mtype):
+        """
+        sends a message using rts
+        we do not call free here because we may rts many times; it is called after the rts loop
+        """
+        for _ in range(0, RETRY_TIMES):
+            pre_send_summary = rmr.message_summary(sbuf_rts)
+            sbuf_rts = rmr.rmr_rts_msg(self.mrc, sbuf_rts, payload=pay, mtype=mtype)
+            if self._assert_good_send(sbuf_rts, pre_send_summary):
+                break
+        return sbuf_rts  # in some cases rts may return a new sbuf
+
+    def _handle_sends(self):
+        # send out all messages waiting for us
+        while not self.instance_send_queue.empty():
+            work_item = self.instance_send_queue.get(block=False, timeout=None)
+            payload = json.dumps(messages.a1_to_handler(*work_item)).encode("utf-8")
+            self._send_msg(payload, A1_POLICY_REQUEST, work_item[1])
 
     def loop(self):
         """
-        This loop runs in an a1 thread forever, and has 3 jobs:
+        This loop runs forever, and has 3 jobs:
         - send out any messages that have to go out (create instance, delete instance)
         - read a1s mailbox and update the status of all instances based on acks from downstream policy handlers
         - clean up the database (eg delete the instance) under certain conditions based on those statuses (NOT DONE YET)
         """
-
-        # get a context
-        mrc = self._init_func_override() if self._init_func_override else _init_rmr()
-        self._rmr_is_ready = True
-        logger.debug("Rmr is ready")
-
-        # set the receive function called below
-        self._rcv_func = (
-            self._rcv_func_override if self._rcv_func_override else lambda: helpers.rmr_rcvall_msgs(mrc, [21024])
-        )
-
         # loop forever
-        logger.debug("Work loop starting")
-        while self._keep_going:
-            # send out all messages waiting for us
-            while not _SEND_QUEUE.empty():
-                work_item = _SEND_QUEUE.get(block=False, timeout=None)
-                _send(mrc, payload=work_item["payload"], message_type=work_item["msg type"])
-
-            # read our mailbox and update statuses
-            updated_instances = set()
-            for msg in self._rcv_func():
+        mdc_logger.debug("Work loop starting")
+        while self.keep_going:
+
+            # Update 3/20/2020
+            # We now handle our sends in a thread (that will just exit when it's done) because there is a difference between how send works in SI95 vs NNG.
+            # Send_msg via NNG formerly never blocked.
+            # However under SI95 this send may block for some arbitrary period of time on the first send to an endpoint for which a connection is not established
+            # If this send takes too long, this loop blocks, and the healthcheck will fail, which will cause A1s healthcheck to fail, which will cause Kubernetes to whack A1 and all kinds of horrible things happen.
+            # Therefore, now under SI95, we thread this.
+            Thread(target=self._handle_sends).start()
+
+            # read our mailbox
+            for (msg, sbuf) in self.rcv_func():
+                # TODO: in the future we may also have to catch SDL errors
                 try:
-                    pay = json.loads(msg["payload"])
-                    pti = pay["policy_type_id"]
-                    pii = pay["policy_instance_id"]
-                    data.set_status(pti, pii, pay["handler_id"], pay["status"])
-                    updated_instances.add((pti, pii))
-                except (PolicyTypeNotFound, PolicyInstanceNotFound, KeyError, json.decoder.JSONDecodeError):
-                    # TODO: in the future we may also have to catch SDL errors
-                    logger.debug(("Dropping malformed or non applicable message", msg))
+                    mtype = msg["message type"]
+                except (KeyError, TypeError, json.decoder.JSONDecodeError):
+                    mdc_logger.debug("Dropping malformed policy ack/query message: {0}".format(msg))
+
+                if mtype == A1_POLICY_RESPONSE:
+                    try:
+                        # got a policy response, update status
+                        pay = json.loads(msg["payload"])
+                        data.set_policy_instance_status(
+                            pay["policy_type_id"], pay["policy_instance_id"], pay["handler_id"], pay["status"]
+                        )
+                        mdc_logger.debug("Successfully received status update: {0}".format(pay))
+                    except (PolicyTypeNotFound, PolicyInstanceNotFound):
+                        mdc_logger.debug("Received a response  for a non-existent instance")
+                    except (KeyError, TypeError, json.decoder.JSONDecodeError):
+                        mdc_logger.debug("Dropping malformed policy ack message: {0}".format(msg))
+
+                elif mtype == A1_POLICY_QUERY:
+                    try:
+                        # got a query, do a lookup and send out all instances
+                        pti = json.loads(msg["payload"])["policy_type_id"]
+                        instance_list = data.get_instance_list(pti)  # will raise if a bad type
+                        mdc_logger.debug("Received a query for a good type: {0}".format(msg))
+                        for pii in instance_list:
+                            instance = data.get_policy_instance(pti, pii)
+                            payload = json.dumps(messages.a1_to_handler("CREATE", pti, pii, instance)).encode("utf-8")
+                            sbuf = self._rts_msg(payload, sbuf, A1_POLICY_REQUEST)
+                    except (PolicyTypeNotFound):
+                        mdc_logger.debug("Received a query for a non-existent type: {0}".format(msg))
+                    except (KeyError, TypeError, json.decoder.JSONDecodeError):
+                        mdc_logger.debug("Dropping malformed policy query message: {0}".format(msg))
+
+                else:
+                    mdc_logger.debug("Received message type {0} but A1 does not handle this".format(mtype))
+
+                # we must free each sbuf
+                rmr.rmr_free_msg(sbuf)
+
+            self.last_ran = time.time()
+            time.sleep(1)
+
+        mdc_logger.debug("RMR Thread Ending!")
 
-            # for all updated instances, see if we can trigger a delete
-            # should be no catch needed here, since the status update would have failed if it was a bad pair
-            for ut in updated_instances:
-                data.clean_up_instance(ut[0], ut[1])
 
-        # TODO: what's a reasonable sleep time? we don't want to hammer redis too much, and a1 isn't a real time component
-        time.sleep(1)
+# Public
 
 
 def start_rmr_thread(init_func_override=None, rcv_func_override=None):
     """
     Start a1s rmr thread
-    Also called during unit testing
     """
-    rmr_loop = RmrLoop(init_func_override, rcv_func_override)
-    thread = Thread(target=rmr_loop.loop)
-    thread.start()
-    while not rmr_loop.rmr_is_ready():
-        time.sleep(0.5)
-    return rmr_loop  # return the handle; useful during unit testing
+    global __RMR_LOOP__
+    if __RMR_LOOP__ is None:
+        __RMR_LOOP__ = _RmrLoop(init_func_override, rcv_func_override)
+
+
+def stop_rmr_thread():
+    """
+    stops the rmr thread
+    """
+    __RMR_LOOP__.keep_going = False
+
+
+def queue_instance_send(item):
+    """
+    push an item into the work queue
+    currently the only type of work is to send out messages
+    """
+    __RMR_LOOP__.instance_send_queue.put(item)
+
+
+def healthcheck_rmr_thread(seconds=30):
+    """
+    returns a boolean representing whether the rmr loop is healthy, by checking two attributes:
+    1. is it running?,
+    2. is it stuck in a long (> seconds) loop?
+    """
+    return __RMR_LOOP__.thread.is_alive() and ((time.time() - __RMR_LOOP__.last_ran) < seconds)
+
+
+def replace_rcv_func(rcv_func):
+    """purely for the ease of unit testing to test different rcv scenarios"""
+    __RMR_LOOP__.rcv_func = rcv_func