Initial commit of A1
[ric-plt/a1.git] / a1 / a1rmr.py
1 # ==================================================================================
2 #       Copyright (c) 2019 Nokia
3 #       Copyright (c) 2018-2019 AT&T Intellectual Property.
4 #
5 #   Licensed under the Apache License, Version 2.0 (the "License");
6 #   you may not use this file except in compliance with the License.
7 #   You may obtain a copy of the License at
8 #
9 #          http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #   Unless required by applicable law or agreed to in writing, software
12 #   distributed under the License is distributed on an "AS IS" BASIS,
13 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #   See the License for the specific language governing permissions and
15 #   limitations under the License.
16 # ==================================================================================
17 import os
18 import gevent
19 from rmr import rmr
20 from a1 import get_module_logger
21 from a1.exceptions import MessageSendFailure, ExpectedAckNotReceived
22
23 logger = get_module_logger(__name__)
24
25
26 RMR_RCV_RETRY_INTERVAL = int(os.environ.get("RMR_RCV_RETRY_INTERVAL", 1000))
27 RETRY_TIMES = int(os.environ.get("RMR_RETRY_TIMES", 4))
28 MRC = None
29
30
31 RECEIVED_MESSAGES = []  # used to store messages we need but havent been procedded yet
32 WAITING_TRANSIDS = {}  # used to store transactionids we are waiting for, so we can filter other stuff out
33
34
35 def _dequeue_all_waiting_messages():
36     """
37     dequeue all waiting rmr messages from rmr, put them into RECEIVED_MESSAGES
38     """
39     new_messages = []
40     sbuf = rmr.rmr_alloc_msg(MRC, 4096)
41     while True:
42         sbuf = rmr.rmr_torcv_msg(MRC, sbuf, 0)  # set the timeout to 0 so this doesn't block!!
43         summary = rmr.message_summary(sbuf)
44         if summary["message state"] == 12 and summary["message status"] == "RMR_ERR_TIMEOUT":
45             break
46         elif summary["transaction id"] in WAITING_TRANSIDS:  # message is relevent
47             new_messages.append(summary)
48         else:
49             logger.debug("A message was received by a1, but a1 was not expecting it! It's being dropped: %s", summary)
50             # do nothing with message, effectively dropped
51     return new_messages
52
53
54 def _check_if_ack_received(target_transid, target_type):
55     """
56     Try to recieve the latest messages, then search the current queue for the target ACK
57     TODO: probably a slightly more efficient data structure than list. Maybe a dict by message type
58         However, in the near term, where there are not many xapps under A1, this is fine. Revisit later.
59     TODO: do we need to deal with duplicate ACKs for the same transaction id?
60         Is it possible if the downstream xapp uses rmr_rts? Might be harmless to sit in queue.. might slow things
61
62     """
63     new_messages = _dequeue_all_waiting_messages()  # dequeue all waiting messages
64     global RECEIVED_MESSAGES  # this is ugly, but fine.. we just need an in memory list across the async calls
65     RECEIVED_MESSAGES += new_messages
66     for index, summary in enumerate(RECEIVED_MESSAGES):  # Search the queue for the target message
67         if (
68             summary["message state"] == 0
69             and summary["message status"] == "RMR_OK"
70             and summary["message type"] == target_type
71             and summary["transaction id"] == target_transid
72         ):  # Found; delete it from queue
73             del RECEIVED_MESSAGES[index]
74             return summary
75     return None
76
77
78 def init_rmr():
79     """
80     called from run; not called for unit tests
81     """
82     global MRC
83     MRC = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, 0x00)
84
85     while rmr.rmr_ready(MRC) == 0:
86         gevent.sleep(1)
87         logger.debug("not yet ready")
88
89
90 def send(payload, message_type=0):
91     """
92     sends a message up to RETRY_TIMES
93     If the message is sent successfully, it returns the transactionid
94     Raises an exception (MessageSendFailure) otherwise
95     """
96     # we may be called many times in asyncronous loops, so for now, it is safer not to share buffers. We can investifgate later whether this is really a problem.
97     sbuf = rmr.rmr_alloc_msg(MRC, 4096)
98     payload = payload if isinstance(payload, bytes) else payload.encode("utf-8")
99
100     # retry RETRY_TIMES to send the message
101     tried = 0
102     while True:
103         # setup the send message
104         rmr.set_payload_and_length(payload, sbuf)
105         rmr.generate_and_set_transaction_id(sbuf)
106         sbuf.contents.state = 0
107         sbuf.contents.mtype = message_type
108         pre_send_summary = rmr.message_summary(sbuf)
109         logger.debug("Pre message send summary: %s", pre_send_summary)
110         transaction_id = pre_send_summary["transaction id"]  # save the transactionid because we need it later
111
112         # send
113         sbuf = rmr.rmr_send_msg(MRC, sbuf)
114         post_send_summary = rmr.message_summary(sbuf)
115         logger.debug("Post message send summary: %s", rmr.message_summary(sbuf))
116
117         # check success or failure
118         if post_send_summary["message state"] == 0 and post_send_summary["message status"] == "RMR_OK":
119             return transaction_id  # we are good
120         if post_send_summary["message state"] == 10 and post_send_summary["message status"] == "RMR_ERR_RETRY":
121             # in this state, we should retry
122             if tried == RETRY_TIMES:
123                 # we have tried RETRY_TIMES and we are still not getting a good state, raise an exception and let the caller deal with it
124                 raise MessageSendFailure(str(post_send_summary))
125             else:
126                 tried += 1
127         else:
128             # we hit a state where we should not even retry
129             raise MessageSendFailure(str(post_send_summary))
130
131
132 def send_ack_retry(payload, expected_ack_message_type, message_type=0):
133     """
134     send a message and check for an ACK.
135     If no ACK is recieved, defer execution for RMR_RCV_RETRY_INTERVAL ms, then check again.
136     If no ack is received before the timeout (set by _rmr_init), send again and try again up to RETRY_TIMES
137
138     It is critical here to set the RMR_TIMEOUT to 0 in the rmr_rcv_to function, which causes that function NOT to block.
139     Instead, if the message isn't there, we give up execution for the interval, which allows the gevent server to process other requests in the meantime.
140
141     Amazing props to https://sdiehl.github.io/gevent-tutorial/
142     (which also runs this whole server)
143     """
144
145     # try to send the msg to the downstream policy handler
146     expected_transaction_id = send(payload, message_type)
147     WAITING_TRANSIDS[expected_transaction_id] = 1
148
149     gevent.sleep(0.01)  # wait 10ms before we try the first recieve
150     for _ in range(0, RETRY_TIMES):
151         logger.debug("Seeing if return message is fufilled")
152         summary = _check_if_ack_received(expected_transaction_id, expected_ack_message_type)
153         if summary:
154             logger.debug("Target ack Message received!: %s", summary)
155             logger.debug("current queue size is %d", len(RECEIVED_MESSAGES))
156             del WAITING_TRANSIDS[expected_transaction_id]
157             return summary["payload"]
158         else:
159             logger.debug("Deffering execution for %s seconds", str(RMR_RCV_RETRY_INTERVAL / 1000))
160             gevent.sleep(RMR_RCV_RETRY_INTERVAL / 1000)
161
162     # we still didn't get the ACK we want
163     raise ExpectedAckNotReceived()