# ==================================================================================
import os
import gevent
-from rmr import rmr
+from rmr import rmr, helpers
from a1 import get_module_logger
logger = get_module_logger(__name__)
called from run; not called for unit tests
"""
global MRC
- MRC = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, 0x00)
+ # rmr.RMRFL_MTCALL puts RMR into a multithreaded mode, where a receiving thread populates an
+ # internal ring of messages, and receive calls read from that
+ # currently the size is 2048 messages, so this is fine for the foreseeable future
+ MRC = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
while rmr.rmr_ready(MRC) == 0:
gevent.sleep(1)
If the message is sent successfully, it returns the transactionid
Does nothing otherwise
"""
- # we may be called many times in asyncronous loops, so for now, it is safer not to share buffers. We can investifgate later whether this is really a problem.
+ # we may be called many times in asynchronous loops, so for now, it is safer not to share buffers. We can investigate later whether this is really a problem.
sbuf = rmr.rmr_alloc_msg(MRC, 4096)
payload = payload if isinstance(payload, bytes) else payload.encode("utf-8")
if post_send_summary["message state"] == 0 and post_send_summary["message status"] == "RMR_OK":
# we are good
logger.debug("Message sent successfully!")
+ rmr.rmr_free_msg(sbuf)
return transaction_id
# we failed all RETRY_TIMES
logger.debug("Send failed all %s times, stopping", RETRY_TIMES)
+ rmr.rmr_free_msg(sbuf)
return None
-def dequeue_all_waiting_messages(filter_type=None):
+def dequeue_all_waiting_messages(filter_type=[]):
"""
dequeue all waiting rmr messages from rmr
- We only add messages of type 21024; we drop other "spam";
- see https://rancodev.atlassian.net/wiki/spaces/RICPLT/pages/60784719/RIC+message+types
"""
- new_messages = []
- sbuf = rmr.rmr_alloc_msg(MRC, 4096)
- while True:
- sbuf = rmr.rmr_torcv_msg(MRC, sbuf, 0) # set the timeout to 0 so this doesn't block!!
- summary = rmr.message_summary(sbuf)
- if summary["message state"] == 12 and summary["message status"] == "RMR_ERR_TIMEOUT":
- # no new messages
- break
- else:
- if (not filter_type) or (summary["message type"] == filter_type):
- # message is relevent
- new_messages.append(summary)
- else:
- # "spam", do nothing with message, effectively dropped
- logger.debug("A message was received by a1, but it was not desired, droping: %s", summary)
-
- return new_messages
+ return helpers.rmr_rcvall_msgs(MRC, filter_type)