1 # ==================================================================================
2 # Copyright (c) 2019 Nokia
3 # Copyright (c) 2018-2019 AT&T Intellectual Property.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 # ==================================================================================
20 from a1 import get_module_logger
22 logger = get_module_logger(__name__)
25 RETRY_TIMES = int(os.environ.get("RMR_RETRY_TIMES", 4))
31 called from run; not called for unit tests
34 MRC = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, 0x00)
36 while rmr.rmr_ready(MRC) == 0:
38 logger.debug("not yet ready")
41 def send(payload, message_type=0):
43 Sends a message up to RETRY_TIMES
44 If the message is sent successfully, it returns the transactionid
45 Does nothing otherwise
47 # we may be called many times in asyncronous loops, so for now, it is safer not to share buffers. We can investifgate later whether this is really a problem.
48 sbuf = rmr.rmr_alloc_msg(MRC, 4096)
49 payload = payload if isinstance(payload, bytes) else payload.encode("utf-8")
51 # retry RETRY_TIMES to send the message
52 for _ in range(0, RETRY_TIMES):
53 # setup the send message
54 rmr.set_payload_and_length(payload, sbuf)
55 rmr.generate_and_set_transaction_id(sbuf)
56 sbuf.contents.state = 0
57 sbuf.contents.mtype = message_type
58 pre_send_summary = rmr.message_summary(sbuf)
59 logger.debug("Pre message send summary: %s", pre_send_summary)
60 transaction_id = pre_send_summary["transaction id"] # save the transactionid because we need it later
63 sbuf = rmr.rmr_send_msg(MRC, sbuf)
64 post_send_summary = rmr.message_summary(sbuf)
65 logger.debug("Post message send summary: %s", rmr.message_summary(sbuf))
67 # check success or failure
68 if post_send_summary["message state"] == 0 and post_send_summary["message status"] == "RMR_OK":
70 logger.debug("Message sent successfully!")
73 # we failed all RETRY_TIMES
74 logger.debug("Send failed all %s times, stopping", RETRY_TIMES)
77 def dequeue_all_waiting_messages(filter_type=None):
79 dequeue all waiting rmr messages from rmr
80 We only add messages of type 21024; we drop other "spam";
81 see https://rancodev.atlassian.net/wiki/spaces/RICPLT/pages/60784719/RIC+message+types
84 sbuf = rmr.rmr_alloc_msg(MRC, 4096)
86 sbuf = rmr.rmr_torcv_msg(MRC, sbuf, 0) # set the timeout to 0 so this doesn't block!!
87 summary = rmr.message_summary(sbuf)
88 if summary["message state"] == 12 and summary["message status"] == "RMR_ERR_TIMEOUT":
92 if (not filter_type) or (summary["message type"] == filter_type):
94 new_messages.append(summary)
96 # "spam", do nothing with message, effectively dropped
97 logger.debug("A message was received by a1, but it was not desired, droping: %s", summary)