1 # ==================================================================================
2 # Copyright (c) 2019-2020 Nokia
3 # Copyright (c) 2018-2020 AT&T Intellectual Property.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 # ==================================================================================
import json
import os
import queue
import time
from threading import Thread

from ricxappframe.rmr import rmr, helpers
from mdclogpy import Logger

from a1 import data, messages
from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound
mdc_logger = Logger(name=__name__)


# With Nanomsg and NNG it was possible for a send attempt to have a "soft"
# failure which did warrant some retries if the status of the send is RMR_ERR_RETRY.
# Because of the way NNG worked, it sometimes required many tens of retries,
# and a retry state happened often for even moderately "verbose" applications.
# With SI95 there is still a possibility that a retry is necessary, but it is very rare.
# RETRY_TIMES is the maximum number of send attempts per message; it can be
# overridden with the A1_RMR_RETRY_TIMES environment variable.
RETRY_TIMES = int(os.environ.get("A1_RMR_RETRY_TIMES", 4))

# RMR message types used by A1
A1_POLICY_REQUEST = 20010  # sent by A1 to policy handlers (policy instance operations)
A1_POLICY_RESPONSE = 20011  # received from handlers: status of a policy instance
A1_POLICY_QUERY = 20012  # received from handlers: request for all instances of a policy type


# Note; yes, globals are bad, but this is a private (to this module) global
# No other module can import/access this (well, python doesn't enforce this, but all linters will complain)
# Holds the single _RmrLoop instance; it stays None until start_rmr_thread() creates it.
__RMR_LOOP__ = None
class _RmrLoop:
    """
    Class represents an rmr loop that constantly reads from rmr and performs operations
    based on waiting messages. This launches a thread, it should probably only be called
    once; the public facing method to access these ensures this.

    TODO: the xapp frame has a version of this looping structure. See if A1 can switch to that.
    """

    def __init__(self, init_func_override=None, rcv_func_override=None):
        """
        Initialize RMR (unless overridden), choose the receive function and start the work loop thread.

        Parameters
        ----------
        init_func_override: function (optional)
            Function that initializes RMR and answers an RMR context.
            Supply an empty function to skip initializing RMR.

        rcv_func_override: function (optional)
            Function that receives messages from RMR and answers a list.
            Supply a trivial function to skip reading from RMR.
        """
        # loop() keeps iterating while this is True; stop_rmr_thread() clears it
        self.keep_going = True

        # refreshed at the end of every loop() iteration; healthcheck_rmr_thread() reads it
        self.last_ran = time.time()

        # see docs/overview#resiliency for a discussion of this
        self.instance_send_queue = queue.Queue()  # thread safe queue https://docs.python.org/3/library/queue.html

        # intialize rmr context
        if init_func_override:
            self.mrc = init_func_override()
        else:
            mdc_logger.debug("Waiting for rmr to initialize..")
            # rmr.RMRFL_MTCALL puts RMR into a multithreaded mode, where a receiving thread
            # populates an internal ring of messages, and receive calls read from that.
            # currently the size is 2048 messages, so this is fine for the foreseeable future
            self.mrc = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
            # poll until RMR reports that it is ready
            while rmr.rmr_ready(self.mrc) == 0:
                time.sleep(0.5)

        # set the receive function
        self.rcv_func = (
            rcv_func_override
            if rcv_func_override
            else lambda: helpers.rmr_rcvall_msgs_raw(self.mrc, [A1_POLICY_RESPONSE, A1_POLICY_QUERY])
        )

        # start the work loop
        self.thread = Thread(target=self.loop)
        self.thread.start()

    def _assert_good_send(self, sbuf, pre_send_summary):
        """
        Extracts the send result and logs a detailed warning if the send failed.
        Returns the message state, an integer that indicates the result.
        """
        post_send_summary = rmr.message_summary(sbuf)
        msg_state = post_send_summary[rmr.RMR_MS_MSG_STATE]
        if msg_state != rmr.RMR_OK:
            mdc_logger.warning("RMR send failed; pre-send summary: {0}, post-send summary: {1}".format(pre_send_summary, post_send_summary))
        return msg_state

    def _send_with_retries(self, label, send_func, sbuf, pre_send_summary):
        """
        Calls send_func(sbuf) until RMR answers a state other than RMR_ERR_RETRY, making at most
        max(1, RETRY_TIMES) attempts. Logs a warning if the final state is not RMR_OK.

        label prefixes the log lines. Returns the buffer answered by the last send call,
        because RMR's send functions may hand back a different buffer than the one passed in.
        """
        msg_state = None
        attempts = 0
        # always try at least once, even if A1_RMR_RETRY_TIMES is set to 0 or below;
        # otherwise msg_state would never be set by a real send
        while attempts < max(1, RETRY_TIMES):
            attempts += 1
            mdc_logger.debug("{}: sending: {}".format(label, pre_send_summary))
            sbuf = send_func(sbuf)
            msg_state = self._assert_good_send(sbuf, pre_send_summary)
            mdc_logger.debug("{}: result message state: {}".format(label, msg_state))
            if msg_state != rmr.RMR_ERR_RETRY:
                break
        if msg_state != rmr.RMR_OK:
            # report the attempts actually made; a non-retryable failure stops after the first one
            mdc_logger.warning("{}: failed after {} retries".format(label, attempts))
        return sbuf

    def _send_msg(self, pay, mtype, subid):
        """
        Creates and sends a message via RMR's send-message feature with the specified payload
        using the specified message type and subscription ID, then frees the message buffer.
        """
        sbuf = rmr.rmr_alloc_msg(self.mrc, len(pay), payload=pay, gen_transaction_id=True, mtype=mtype, sub_id=subid)
        sbuf.contents.sub_id = subid
        pre_send_summary = rmr.message_summary(sbuf)
        sbuf = self._send_with_retries(
            "_send_msg", lambda buf: rmr.rmr_send_msg(self.mrc, buf), sbuf, pre_send_summary
        )
        rmr.rmr_free_msg(sbuf)

    def _rts_msg(self, pay, sbuf_rts, mtype):
        """
        Sends a message via RMR's return-to-sender feature.
        This neither allocates nor frees a message buffer because we may rts many times.
        Returns the message buffer from the RTS function, which may reallocate it.
        """
        pre_send_summary = rmr.message_summary(sbuf_rts)
        # in some cases rts may return a new sbuf, so the caller must use the returned one
        return self._send_with_retries(
            "_rts_msg",
            lambda buf: rmr.rmr_rts_msg(self.mrc, buf, payload=pay, mtype=mtype),
            sbuf_rts,
            pre_send_summary,
        )

    def _handle_sends(self):
        """
        Drains instance_send_queue, sending each work item to the policy handlers as an
        A1_POLICY_REQUEST message. Runs in a short-lived thread started by loop().
        """
        while True:
            # loop() starts a new sender thread every iteration, so an older one may still be
            # draining the queue (e.g. blocked in an SI95 send). Checking empty() and then calling
            # get(block=False) races with it and can raise queue.Empty; ask once and stop instead.
            try:
                work_item = self.instance_send_queue.get_nowait()
            except queue.Empty:
                return
            payload = json.dumps(messages.a1_to_handler(*work_item)).encode("utf-8")
            # work_item is unpacked into a1_to_handler, whose second positional argument is the
            # policy type id (see the "CREATE" call in _handle_policy_query); it doubles as sub_id
            self._send_msg(payload, A1_POLICY_REQUEST, work_item[1])

    def _handle_policy_response(self, msg):
        """
        Records the status a policy handler reported for one policy instance.
        Unknown types/instances and malformed payloads are logged and dropped.
        """
        try:
            # got a policy response, update status
            pay = json.loads(msg[rmr.RMR_MS_PAYLOAD])
            data.set_policy_instance_status(
                pay["policy_type_id"], pay["policy_instance_id"], pay["handler_id"], pay["status"]
            )
            mdc_logger.debug("Successfully received status update: {0}".format(pay))
        except (PolicyTypeNotFound, PolicyInstanceNotFound):
            mdc_logger.warning("Received a response for a non-existent type/instance: {0}".format(msg))
        except (KeyError, TypeError, json.decoder.JSONDecodeError):
            mdc_logger.warning("Dropping malformed policy response: {0}".format(msg))

    def _handle_policy_query(self, msg, sbuf):
        """
        Answers a policy query by returning every instance of the requested policy type to
        the sender (via RTS) as "CREATE" requests. Unknown types and malformed payloads are
        logged and dropped. Returns the message buffer, which RTS may have replaced; the
        caller remains responsible for freeing it.
        """
        try:
            # got a query, do a lookup and send out all instances
            pti = json.loads(msg[rmr.RMR_MS_PAYLOAD])["policy_type_id"]
            instance_list = data.get_instance_list(pti)  # will raise if a bad type
            mdc_logger.debug("Received a query for a known policy type: {0}".format(msg))
            for pii in instance_list:
                instance = data.get_policy_instance(pti, pii)
                payload = json.dumps(messages.a1_to_handler("CREATE", pti, pii, instance)).encode("utf-8")
                sbuf = self._rts_msg(payload, sbuf, A1_POLICY_REQUEST)
        except PolicyTypeNotFound:
            mdc_logger.warning("Received a policy query for a non-existent type: {0}".format(msg))
        except (KeyError, TypeError, json.decoder.JSONDecodeError):
            mdc_logger.warning("Dropping malformed policy query: {0}".format(msg))
        return sbuf

    def loop(self):
        """
        This loop runs until stop_rmr_thread() clears keep_going, and has 3 jobs:
        - send out any messages that have to go out (create instance, delete instance)
        - read a1s mailbox and update the status of all instances based on acks from downstream policy handlers
        - clean up the database (eg delete the instance) under certain conditions based on those statuses (NOT DONE YET)
        """
        mdc_logger.debug("Work loop starting")
        while self.keep_going:

            # We now handle our sends in a thread (that will just exit when it's done) because there is a difference between how send works in SI95 vs NNG.
            # Send_msg via NNG formerly never blocked.
            # However under SI95 this send may block for some arbitrary period of time on the first send to an endpoint for which a connection is not established
            # If this send takes too long, this loop blocks, and the healthcheck will fail, which will cause A1s healthcheck to fail, which will cause Kubernetes to whack A1 and all kinds of horrible things happen.
            # Therefore, now under SI95, we thread this.
            Thread(target=self._handle_sends).start()

            # read our mailbox
            for (msg, sbuf) in self.rcv_func():
                # TODO: in the future we may also have to catch SDL errors
                try:
                    mtype = msg[rmr.RMR_MS_MSG_TYPE]
                except (KeyError, TypeError, json.decoder.JSONDecodeError):
                    # no message type means nothing to dispatch on: skip straight to freeing the
                    # buffer instead of falling through with an unbound or stale mtype
                    mdc_logger.warning("Dropping malformed message: {0}".format(msg))
                else:
                    if mtype == A1_POLICY_RESPONSE:
                        self._handle_policy_response(msg)
                    elif mtype == A1_POLICY_QUERY:
                        sbuf = self._handle_policy_query(msg, sbuf)
                    else:
                        mdc_logger.warning("Received message type {0} but A1 does not handle this".format(mtype))

                # we must free each sbuf
                rmr.rmr_free_msg(sbuf)

            self.last_ran = time.time()
            # pace the loop; without a pause it would spin and spawn a sender thread per pass
            time.sleep(1)

        mdc_logger.debug("RMR Thread Ending!")
def start_rmr_thread(init_func_override=None, rcv_func_override=None):
    """
    Start a1s rmr thread. Only the first call creates the loop; later calls are no-ops.

    Parameters
    ----------
    init_func_override: function (optional)
        Function that initializes RMR and answers an RMR context.
        Supply an empty function to skip initializing RMR.

    rcv_func_override: function (optional)
        Function that receives messages from RMR and answers a list.
        Supply a trivial function to skip reading from RMR.
    """
    # required: without it the assignment below makes __RMR_LOOP__ local to this function
    # and the None check raises UnboundLocalError
    global __RMR_LOOP__
    if __RMR_LOOP__ is None:
        __RMR_LOOP__ = _RmrLoop(init_func_override, rcv_func_override)
def stop_rmr_thread():
    """
    Stops the rmr thread: its work loop exits once the current iteration finishes.
    Does nothing if the rmr thread was never started.
    """
    if __RMR_LOOP__ is not None:
        __RMR_LOOP__.keep_going = False
def queue_instance_send(item):
    """
    Hand ``item`` to the rmr work loop by placing it on its send queue.

    Sending messages is currently the only kind of work that goes through this queue.
    """
    send_queue = __RMR_LOOP__.instance_send_queue
    send_queue.put(item)
def healthcheck_rmr_thread(seconds=30):
    """
    returns a boolean representing whether the rmr loop is healthy, by checking two attributes:
    1. is the loop's thread alive?
    2. has the loop finished an iteration within the last ``seconds`` seconds (i.e. it is not stuck)?
    Returns False if the rmr thread has not been started yet.
    """
    rmr_loop = __RMR_LOOP__
    if rmr_loop is None:
        # report unhealthy instead of crashing with AttributeError before start_rmr_thread()
        return False
    return rmr_loop.thread.is_alive() and ((time.time() - rmr_loop.last_ran) < seconds)
def replace_rcv_func(rcv_func):
    """
    Swap the receive function used by the module's rmr loop.

    Purely for the ease of unit testing, so tests can exercise different rcv scenarios.
    """
    rmr_loop = __RMR_LOOP__
    rmr_loop.rcv_func = rcv_func