4 # ==================================================================================
5 # Copyright (c) 2019-2020 Nokia
6 # Copyright (c) 2018-2020 AT&T Intellectual Property.
8 # Licensed under the Apache License, Version 2.0 (the "License");
9 # you may not use this file except in compliance with the License.
10 # You may obtain a copy of the License at
12 # http://www.apache.org/licenses/LICENSE-2.0
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS,
16 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 # See the License for the specific language governing permissions and
18 # limitations under the License.
19 # ==================================================================================
import json
import os
import queue
import time
from threading import Thread

from rmr import rmr, helpers
from mdclogpy import Logger

from a1 import data, messages
from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound
mdc_logger = Logger(name=__name__)


# number of attempts made for each send/rts; overridable via the A1_RMR_RETRY_TIMES environment variable
RETRY_TIMES = int(os.environ.get("A1_RMR_RETRY_TIMES", 4))

# rmr message types handled by A1
A1_POLICY_REQUEST = 20010
A1_POLICY_RESPONSE = 20011
A1_POLICY_QUERY = 20012


# Note: yes, globals are bad, but this is a private (to this module) global
# No other module can import/access this (well, python doesn't enforce this, but all linters will complain)
# It holds the single _RmrLoop instance once start_rmr_thread has been called; None until then.
__RMR_LOOP__ = None
class _RmrLoop:
    """
    Class representing an rmr loop that constantly reads from rmr and performs operations based on waiting messages.
    Constructing it launches a thread, so it should only be created once; the public facing method to access it
    (start_rmr_thread) ensures this.
    """

    def __init__(self, init_func_override=None, rcv_func_override=None):
        """
        Initialize rmr (or use the override), choose the receive function, and start the work loop thread.

        init_func_override: optional zero-argument callable whose return value is used as the rmr context
                            instead of calling rmr_init
        rcv_func_override: optional zero-argument callable used as the receive function; the loop iterates
                           its return value as (msg, sbuf) pairs
        """
        self.keep_going = True
        self.last_ran = time.time()

        # see docs/overview#resiliency for a discussion of this
        self.instance_send_queue = queue.Queue()  # thread safe queue https://docs.python.org/3/library/queue.html

        # the thread currently draining instance_send_queue, if any; loop() keeps at most one running
        self._send_thread = None

        # initialize rmr context
        if init_func_override:
            self.mrc = init_func_override()
        else:
            mdc_logger.debug("Waiting for rmr to initialize..")
            # rmr.RMRFL_MTCALL puts RMR into a multithreaded mode, where a receiving thread populates an
            # internal ring of messages, and receive calls read from that
            # currently the size is 2048 messages, so this is fine for the foreseeable future
            self.mrc = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
            while rmr.rmr_ready(self.mrc) == 0:
                # NOTE(review): poll interval reconstructed; confirm against the upstream source
                time.sleep(0.5)

        # set the receive function
        self.rcv_func = (
            rcv_func_override
            if rcv_func_override
            else lambda: helpers.rmr_rcvall_msgs_raw(self.mrc, [A1_POLICY_RESPONSE, A1_POLICY_QUERY])
        )

        # start the work loop
        self.thread = Thread(target=self.loop)
        self.thread.start()

    def _assert_good_send(self, sbuf, pre_send_summary):
        """
        common helper function for _send_msg and _rts_msg
        Returns True when rmr reports the message state as 0 and the status as RMR_OK; otherwise logs the
        pre- and post-send summaries and returns False.
        """
        post_send_summary = rmr.message_summary(sbuf)
        if post_send_summary["message state"] == 0 and post_send_summary["message status"] == "RMR_OK":
            return True
        mdc_logger.debug("Message NOT sent!")
        mdc_logger.debug("Pre-send summary: {0}, Post-send summary: {1}".format(pre_send_summary, post_send_summary))
        return False

    def _send_msg(self, pay, mtype, subid):
        """
        sends a msg, making up to RETRY_TIMES attempts
        A fresh buffer is allocated per attempt and every buffer returned by rmr_send_msg is freed,
        whether or not that attempt succeeded (failed attempts used to leak their buffer).
        """
        for _ in range(0, RETRY_TIMES):
            sbuf = rmr.rmr_alloc_msg(self.mrc, len(pay), payload=pay, gen_transaction_id=True, mtype=mtype, sub_id=subid)
            sbuf.contents.sub_id = subid
            pre_send_summary = rmr.message_summary(sbuf)
            mdc_logger.debug("Trying to send message: {}".format(pre_send_summary))
            sbuf = rmr.rmr_send_msg(self.mrc, sbuf)  # send
            sent = self._assert_good_send(sbuf, pre_send_summary)
            rmr.rmr_free_msg(sbuf)  # free, on success and on failure alike
            if sent:
                return

        mdc_logger.debug("A1 did NOT send the message successfully after {} retries!".format(RETRY_TIMES))

    def _rts_msg(self, pay, sbuf_rts, mtype):
        """
        sends a message using rts, making up to RETRY_TIMES attempts
        we do not call free here because we may rts many times; it is called after the rts loop
        Returns the buffer the caller must keep using and eventually free.
        """
        for _ in range(0, RETRY_TIMES):
            pre_send_summary = rmr.message_summary(sbuf_rts)
            sbuf_rts = rmr.rmr_rts_msg(self.mrc, sbuf_rts, payload=pay, mtype=mtype)
            if self._assert_good_send(sbuf_rts, pre_send_summary):
                break
        return sbuf_rts  # in some cases rts may return a new sbuf

    def _handle_sends(self):
        """
        send out all messages waiting for us in instance_send_queue
        Each work item is unpacked into messages.a1_to_handler; item[1] is used as the rmr sub id.
        """
        while True:
            # get_nowait + queue.Empty instead of empty() + get(): the latter is a check-then-act race
            # whenever another thread also consumes from the queue
            try:
                work_item = self.instance_send_queue.get_nowait()
            except queue.Empty:
                return
            payload = json.dumps(messages.a1_to_handler(*work_item)).encode("utf-8")
            self._send_msg(payload, A1_POLICY_REQUEST, work_item[1])

    def _handle_policy_response(self, msg):
        """Apply a policy handler's status update (A1_POLICY_RESPONSE) to the stored instance status."""
        try:
            # got a policy response, update status
            pay = json.loads(msg["payload"])
            data.set_policy_instance_status(
                pay["policy_type_id"], pay["policy_instance_id"], pay["handler_id"], pay["status"]
            )
            mdc_logger.debug("Successfully received status update: {0}".format(pay))
        except (PolicyTypeNotFound, PolicyInstanceNotFound):
            mdc_logger.debug("Received a response for a non-existent instance")
        except (KeyError, TypeError, json.decoder.JSONDecodeError):
            mdc_logger.debug("Dropping malformed policy ack message: {0}".format(msg))

    def _handle_policy_query(self, msg, sbuf):
        """
        Answer an A1_POLICY_QUERY by rts-ing a CREATE request for every instance of the queried type.
        Returns the (possibly replaced) sbuf that the caller must free.
        """
        try:
            # got a query, do a lookup and send out all instances
            pti = json.loads(msg["payload"])["policy_type_id"]
            instance_list = data.get_instance_list(pti)  # will raise if a bad type
            mdc_logger.debug("Received a query for a good type: {0}".format(msg))
            for pii in instance_list:
                try:
                    instance = data.get_policy_instance(pti, pii)
                except PolicyInstanceNotFound:
                    # presumably deleted between listing and lookup; skip it instead of killing the loop thread
                    continue
                payload = json.dumps(messages.a1_to_handler("CREATE", pti, pii, instance)).encode("utf-8")
                sbuf = self._rts_msg(payload, sbuf, A1_POLICY_REQUEST)
        except PolicyTypeNotFound:
            mdc_logger.debug("Received a query for a non-existent type: {0}".format(msg))
        except (KeyError, TypeError, json.decoder.JSONDecodeError):
            mdc_logger.debug("Dropping malformed policy query message: {0}".format(msg))
        return sbuf

    def _handle_msg(self, msg, sbuf):
        """
        Dispatch one received message by its message type.
        Returns the sbuf the caller must free (an rts reply may have replaced the original).
        """
        # TODO: in the future we may also have to catch SDL errors
        try:
            mtype = msg["message type"]
        except (KeyError, TypeError, json.decoder.JSONDecodeError):
            # stop here: continuing would dispatch on an unbound (or stale) message type
            mdc_logger.debug("Dropping malformed policy ack/query message: {0}".format(msg))
            return sbuf

        if mtype == A1_POLICY_RESPONSE:
            self._handle_policy_response(msg)
        elif mtype == A1_POLICY_QUERY:
            sbuf = self._handle_policy_query(msg, sbuf)
        else:
            mdc_logger.debug("Received message type {0} but A1 does not handle this".format(mtype))
        return sbuf

    def loop(self):
        """
        This loop runs until keep_going is cleared, and has 3 jobs:
        - send out any messages that have to go out (create instance, delete instance)
        - read a1s mailbox and update the status of all instances based on acks from downstream policy handlers
        - clean up the database (eg delete the instance) under certain conditions based on those statuses (NOT DONE YET)
        """
        mdc_logger.debug("Work loop starting")
        while self.keep_going:

            # We handle our sends in a thread (that just exits when it's done) because send works differently
            # in SI95 vs NNG. Send_msg via NNG formerly never blocked. Under SI95 the send may block for an
            # arbitrary period on the first send to an endpoint with no established connection. If that blocked
            # this loop, the healthcheck would fail, A1s healthcheck would fail, and Kubernetes would restart A1.
            # Only one sender thread runs at a time: overlapping senders would race on the queue and could
            # deliver queued operations (e.g. CREATE then DELETE of one instance) out of order.
            if self._send_thread is None or not self._send_thread.is_alive():
                self._send_thread = Thread(target=self._handle_sends)
                self._send_thread.start()

            # read our mailbox
            for (msg, sbuf) in self.rcv_func():
                sbuf = self._handle_msg(msg, sbuf)
                # we must free each sbuf
                rmr.rmr_free_msg(sbuf)

            self.last_ran = time.time()
            # NOTE(review): loop interval reconstructed; confirm against the upstream source
            time.sleep(1)

        mdc_logger.debug("RMR Thread Ending!")
def start_rmr_thread(init_func_override=None, rcv_func_override=None):
    """
    Start a1s rmr thread, creating the module's single _RmrLoop on the first call.
    Subsequent calls are no-ops, so at most one loop (and one rmr context) exists.

    init_func_override / rcv_func_override: passed through to _RmrLoop (test hooks)
    """
    # without this declaration the assignment below makes the name local and the `is None` check fails
    global __RMR_LOOP__
    if __RMR_LOOP__ is None:
        __RMR_LOOP__ = _RmrLoop(init_func_override, rcv_func_override)
def stop_rmr_thread():
    """
    stops the rmr thread by asking its work loop to exit after the current iteration
    Does nothing if the thread was never started.
    """
    if __RMR_LOOP__ is None:
        return
    __RMR_LOOP__.keep_going = False
def queue_instance_send(item):
    """
    Enqueue a work item on the rmr loop's send queue.

    Sending out messages is currently the only kind of work. The sender unpacks the item into
    messages.a1_to_handler(...) and uses item[1] as the rmr sub id.
    """
    send_queue = __RMR_LOOP__.instance_send_queue
    send_queue.put(item)
def healthcheck_rmr_thread(seconds=30):
    """
    returns a boolean representing whether the rmr loop is healthy, by checking two attributes:
    1. is the rmr thread alive?
    2. is it stuck in a long (> seconds) loop?
    A loop that was never started is reported as unhealthy (False) rather than raising.
    """
    rmr_loop = __RMR_LOOP__
    if rmr_loop is None:
        return False
    return rmr_loop.thread.is_alive() and ((time.time() - rmr_loop.last_ran) < seconds)
def replace_rcv_func(rcv_func):
    """
    Swap the running loop's receive function.

    This exists purely as a unit-testing hook for exercising different rcv scenarios.
    """
    rmr_loop = __RMR_LOOP__
    rmr_loop.rcv_func = rcv_func