Towards a1 1.0.0: rmr improvements
[ric-plt/a1.git] / a1 / a1rmr.py
index abbb84f..51b5694 100644 (file)
@@ -16,7 +16,7 @@
 # ==================================================================================
 import os
 import gevent
-from rmr import rmr
+from rmr import rmr, helpers
 from a1 import get_module_logger
 
 logger = get_module_logger(__name__)
@@ -31,7 +31,10 @@ def init_rmr():
     called from run; not called for unit tests
     """
     global MRC
-    MRC = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, 0x00)
+    # rmr.RMRFL_MTCALL puts RMR into a multithreaded mode, where a receiving thread populates an
+    # internal ring of messages, and receive calls read from that
+    # currently the size is 2048 messages, so this is fine for the foreseeable future
+    MRC = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
 
     while rmr.rmr_ready(MRC) == 0:
         gevent.sleep(1)
@@ -44,7 +47,7 @@ def send(payload, message_type=0):
     If the message is sent successfully, it returns the transactionid
     Does nothing otherwise
     """
-    # we may be called many times in asyncronous loops, so for now, it is safer not to share buffers. We can investifgate later whether this is really a problem.
+    # we may be called many times in asynchronous loops, so for now, it is safer not to share buffers. We can investigate later whether this is really a problem.
     sbuf = rmr.rmr_alloc_msg(MRC, 4096)
     payload = payload if isinstance(payload, bytes) else payload.encode("utf-8")
 
@@ -68,33 +71,17 @@ def send(payload, message_type=0):
         if post_send_summary["message state"] == 0 and post_send_summary["message status"] == "RMR_OK":
             # we are good
             logger.debug("Message sent successfully!")
+            rmr.rmr_free_msg(sbuf)
             return transaction_id
 
     # we failed all RETRY_TIMES
     logger.debug("Send failed all %s times, stopping", RETRY_TIMES)
+    rmr.rmr_free_msg(sbuf)
     return None
 
 
-def dequeue_all_waiting_messages(filter_type=None):
+def dequeue_all_waiting_messages(filter_type=[]):
     """
     dequeue all waiting rmr messages from rmr
-    We only add messages of type 21024; we drop other "spam";
-    see https://rancodev.atlassian.net/wiki/spaces/RICPLT/pages/60784719/RIC+message+types
     """
-    new_messages = []
-    sbuf = rmr.rmr_alloc_msg(MRC, 4096)
-    while True:
-        sbuf = rmr.rmr_torcv_msg(MRC, sbuf, 0)  # set the timeout to 0 so this doesn't block!!
-        summary = rmr.message_summary(sbuf)
-        if summary["message state"] == 12 and summary["message status"] == "RMR_ERR_TIMEOUT":
-            # no new messages
-            break
-        else:
-            if (not filter_type) or (summary["message type"] == filter_type):
-                # message is relevent
-                new_messages.append(summary)
-            else:
-                # "spam", do nothing with message, effectively dropped
-                logger.debug("A message was received by a1, but it was not desired, droping: %s", summary)
-
-    return new_messages
+    return helpers.rmr_rcvall_msgs(MRC, filter_type)