1 # ==================================================================================
2 # Copyright (c) 2019 Nokia
3 # Copyright (c) 2018-2019 AT&T Intellectual Property.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 # ==================================================================================
20 from a1 import get_module_logger
21 from a1.exceptions import MessageSendFailure, ExpectedAckNotReceived
logger = get_module_logger(__name__)


# Milliseconds to wait between successive checks for an expected ACK
# (send_ack_retry divides this by 1000 before sleeping).
RMR_RCV_RETRY_INTERVAL = int(os.environ.get("RMR_RCV_RETRY_INTERVAL", 1000))
# Retry budget used by both send() and send_ack_retry().
RETRY_TIMES = int(os.environ.get("RMR_RETRY_TIMES", 4))
# rmr context handle; stays None until the rmr_init call in this module assigns it.
# Bound here so the name always exists at module scope.
MRC = None


RECEIVED_MESSAGES = []  # messages we need but that haven't been processed yet
WAITING_TRANSIDS = {}  # transaction ids we are waiting for, so we can filter other stuff out
def _dequeue_all_waiting_messages():
    """
    Drain every message currently waiting in rmr, without blocking.

    Returns a list of message summaries whose transaction id is a key of
    WAITING_TRANSIDS. This function does not touch RECEIVED_MESSAGES itself;
    the caller is expected to add the returned summaries to it.
    Messages with any other transaction id are logged and dropped.
    """
    new_messages = []
    sbuf = rmr.rmr_alloc_msg(MRC, 4096)
    try:
        while True:
            sbuf = rmr.rmr_torcv_msg(MRC, sbuf, 0)  # set the timeout to 0 so this doesn't block!!
            summary = rmr.message_summary(sbuf)
            if summary["message state"] == 12 and summary["message status"] == "RMR_ERR_TIMEOUT":
                # the receive timed out, i.e. nothing more is waiting
                break
            if summary["transaction id"] in WAITING_TRANSIDS:  # message is relevant
                new_messages.append(summary)
            else:
                logger.debug("A message was received by a1, but a1 was not expecting it! It's being dropped: %s", summary)
                # do nothing with message, effectively dropped
    finally:
        # the receive buffer is only needed while draining; release it so
        # repeated polling does not leak one buffer per call
        rmr.rmr_free_msg(sbuf)
    return new_messages
def _check_if_ack_received(target_transid, target_type):
    """
    Try to receive the latest messages, then search the current queue for the target ACK.

    A queued summary matches when its state is 0, its status is "RMR_OK",
    its message type equals target_type and its transaction id equals
    target_transid. The first match is removed from RECEIVED_MESSAGES and
    returned; None is returned when nothing matches.

    TODO: probably a slightly more efficient data structure than list. Maybe a dict by message type
        However, in the near term, where there are not many xapps under A1, this is fine. Revisit later.
    TODO: do we need to deal with duplicate ACKs for the same transaction id?
        Is it possible if the downstream xapp uses rmr_rts? Might be harmless to sit in queue.. might slow things
    """
    # extend in place: the module-level list is mutated, never rebound,
    # so no `global` declaration is required
    RECEIVED_MESSAGES.extend(_dequeue_all_waiting_messages())

    for index, summary in enumerate(RECEIVED_MESSAGES):  # search the queue for the target message
        if (
            summary["message state"] == 0
            and summary["message status"] == "RMR_OK"
            and summary["message type"] == target_type
            and summary["transaction id"] == target_transid
        ):
            # found; deleting during iteration is safe because we return immediately
            del RECEIVED_MESSAGES[index]
            return summary
    return None
def init_rmr():
    """
    Create the module-wide rmr context and wait until rmr reports ready.

    called from run; not called for unit tests

    NOTE(review): the `def` line of this function was not visible in the
    reviewed text; the name `init_rmr` is assumed -- confirm against the
    caller in run before merging.
    """
    global MRC  # the handle is shared with the send/receive helpers in this module
    # b"4562" is presumably the rmr listen port -- TODO confirm
    MRC = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, 0x00)

    while rmr.rmr_ready(MRC) == 0:
        # yield between polls instead of spinning and flooding the debug log
        gevent.sleep(1)
        logger.debug("not yet ready")
def send(payload, message_type=0):
    """
    Send a message over rmr, retrying while rmr reports a retryable error.

    payload may be bytes or str; a str is encoded as UTF-8 before sending.
    message_type is written to the message's mtype field.

    The message is sent once and then re-sent up to RETRY_TIMES more times
    for as long as rmr answers with state 10 / "RMR_ERR_RETRY".

    If the message is sent successfully, it returns the transaction id.
    Raises an exception (MessageSendFailure) if rmr reports any other state,
    or if it is still asking for a retry after the retry budget is spent.
    """
    # we may be called many times in asynchronous loops, so for now, it is safer not to share buffers.
    # We can investigate later whether this is really a problem.
    sbuf = rmr.rmr_alloc_msg(MRC, 4096)
    try:
        payload = payload if isinstance(payload, bytes) else payload.encode("utf-8")

        tried = 0
        while True:
            # setup the send message; a fresh transaction id is generated for every attempt
            rmr.set_payload_and_length(payload, sbuf)
            rmr.generate_and_set_transaction_id(sbuf)
            sbuf.contents.state = 0
            sbuf.contents.mtype = message_type
            pre_send_summary = rmr.message_summary(sbuf)
            logger.debug("Pre message send summary: %s", pre_send_summary)
            transaction_id = pre_send_summary["transaction id"]  # save the transaction id because we need it later

            # send; keep the buffer rmr hands back, it is the one reused or freed below
            sbuf = rmr.rmr_send_msg(MRC, sbuf)
            post_send_summary = rmr.message_summary(sbuf)
            logger.debug("Post message send summary: %s", post_send_summary)

            # check success or failure
            state = post_send_summary["message state"]
            status = post_send_summary["message status"]
            if state == 0 and status == "RMR_OK":
                return transaction_id  # we are good

            retryable = state == 10 and status == "RMR_ERR_RETRY"
            if not retryable or tried == RETRY_TIMES:
                # either a state where we should not even retry, or the retry
                # budget is spent; raise and let the caller deal with it
                raise MessageSendFailure(str(post_send_summary))
            tried += 1
    finally:
        # release the buffer on every exit path (success or failure) so
        # repeated sends do not leak one buffer per call
        rmr.rmr_free_msg(sbuf)
def send_ack_retry(payload, expected_ack_message_type, message_type=0):
    """
    Send a message and wait for its ACK.

    The message is sent once via send(). The transaction id it returns is
    registered in WAITING_TRANSIDS, and the queue is then checked up to
    RETRY_TIMES times for a message of type expected_ack_message_type with
    that transaction id. Between checks, execution is deferred for
    RMR_RCV_RETRY_INTERVAL ms. The message is not re-sent while waiting.

    Returns the payload of the ACK.
    Raises ExpectedAckNotReceived if no ACK shows up within RETRY_TIMES checks;
    exceptions raised by send() (MessageSendFailure) propagate unchanged.

    It is critical that the receive call (rmr_torcv_msg) is given a timeout of 0, which causes it NOT to block.
    Instead, if the message isn't there, we give up execution for the interval,
    which allows the gevent server to process other requests in the meantime.

    Amazing props to https://sdiehl.github.io/gevent-tutorial/
    (which also runs this whole server)
    """
    # try to send the msg to the downstream policy handler
    expected_transaction_id = send(payload, message_type)
    WAITING_TRANSIDS[expected_transaction_id] = 1

    try:
        gevent.sleep(0.01)  # wait 10ms before we try the first receive
        for _ in range(RETRY_TIMES):
            logger.debug("Seeing if return message is fufilled")
            summary = _check_if_ack_received(expected_transaction_id, expected_ack_message_type)
            if summary:
                logger.debug("Target ack Message received!: %s", summary)
                logger.debug("current queue size is %d", len(RECEIVED_MESSAGES))
                return summary["payload"]
            logger.debug("Deffering execution for %s seconds", str(RMR_RCV_RETRY_INTERVAL / 1000))
            gevent.sleep(RMR_RCV_RETRY_INTERVAL / 1000)

        # we still didn't get the ACK we want
        raise ExpectedAckNotReceived()
    finally:
        # stop waiting on this transaction on every exit path; otherwise a
        # timed-out id stays registered and late replies for it are queued forever
        WAITING_TRANSIDS.pop(expected_transaction_id, None)