1 # ==================================================================================
2 # Copyright (c) 2019 Nokia
3 # Copyright (c) 2018-2019 AT&T Intellectual Property.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 # ==================================================================================
19 from rmr import rmr, helpers
20 from a1 import get_module_logger
22 logger = get_module_logger(__name__)
25 RETRY_TIMES = int(os.environ.get("RMR_RETRY_TIMES", 4))
31 called from run; not called for unit tests
34 # rmr.RMRFL_MTCALL puts RMR into a multithreaded mode, where a receiving thread populates an
35 # internal ring of messages, and receive calls read from that
36 # currently the size is 2048 messages, so this is fine for the foreseeable future
37 MRC = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
39 while rmr.rmr_ready(MRC) == 0:
41 logger.debug("not yet ready")
44 def send(payload, message_type=0):
46 Sends a message up to RETRY_TIMES
47 If the message is sent successfully, it returns the transactionid
48 Does nothing otherwise
50 # we may be called many times in asynchronous loops, so for now, it is safer not to share buffers. We can investigate later whether this is really a problem.
51 sbuf = rmr.rmr_alloc_msg(MRC, 4096)
52 payload = payload if isinstance(payload, bytes) else payload.encode("utf-8")
54 # retry RETRY_TIMES to send the message
55 for _ in range(0, RETRY_TIMES):
56 # setup the send message
57 rmr.set_payload_and_length(payload, sbuf)
58 rmr.generate_and_set_transaction_id(sbuf)
59 sbuf.contents.state = 0
60 sbuf.contents.mtype = message_type
61 pre_send_summary = rmr.message_summary(sbuf)
62 logger.debug("Pre message send summary: %s", pre_send_summary)
63 transaction_id = pre_send_summary["transaction id"] # save the transactionid because we need it later
66 sbuf = rmr.rmr_send_msg(MRC, sbuf)
67 post_send_summary = rmr.message_summary(sbuf)
68 logger.debug("Post message send summary: %s", rmr.message_summary(sbuf))
70 # check success or failure
71 if post_send_summary["message state"] == 0 and post_send_summary["message status"] == "RMR_OK":
73 logger.debug("Message sent successfully!")
74 rmr.rmr_free_msg(sbuf)
77 # we failed all RETRY_TIMES
78 logger.debug("Send failed all %s times, stopping", RETRY_TIMES)
79 rmr.rmr_free_msg(sbuf)
83 def dequeue_all_waiting_messages(filter_type=[]):
85 dequeue all waiting rmr messages from rmr
87 return helpers.rmr_rcvall_msgs(MRC, filter_type)