1 # ==================================================================================
2 # Copyright (c) 2019-2020 Nokia
3 # Copyright (c) 2018-2020 AT&T Intellectual Property.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 # ==================================================================================
24 from threading import Thread
25 from ricxappframe.rmr import rmr, helpers
26 from mdclogpy import Logger
27 from a1 import data, messages
28 from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound
mdc_logger = Logger(name=__name__)

# How many times a send (or return-to-sender) is attempted before giving up.
# Can be overridden through the A1_RMR_RETRY_TIMES environment variable.
RETRY_TIMES = int(os.environ.get("A1_RMR_RETRY_TIMES", 4))

# RMR message types used by this module
A1_POLICY_REQUEST = 20010  # sent by A1 (instance create/delete, and replies to queries)
A1_POLICY_RESPONSE = 20011  # received by A1: status update for a policy instance
A1_POLICY_QUERY = 20012  # received by A1: request for all instances of a policy type

# Note: yes, globals are bad, but this is a private (to this module) global.
# No other module can import/access this (well, python doesn't enforce this, but all linters will complain).
# It holds the single _RmrLoop once start_rmr_thread has run, and is None before that;
# it must exist at module level because every public function below reads it.
__RMR_LOOP__ = None
class _RmrLoop:
    """
    Class represents an rmr loop that constantly reads from rmr and performs operations
    based on waiting messages. This launches a thread, it should probably only be called
    once; the public facing method to access these ensures this.

    TODO: the xapp frame has a version of this looping structure. See if A1 can switch to that.
    """

    def __init__(self, init_func_override=None, rcv_func_override=None):
        """
        Set up the rmr context and the receive function, then start the work loop thread.

        Parameters
        ----------
        init_func_override: function (optional)
            Function that initializes RMR and answers an RMR context.
            Supply an empty function to skip initializing RMR.

        rcv_func_override: function (optional)
            Function that receives messages from RMR and answers a list.
            Supply a trivial function to skip reading from RMR.
        """
        self.keep_going = True  # the work loop exits once this is set to False
        self.last_ran = time.time()  # refreshed after every pass of the loop; read by the healthcheck

        # see docs/overview#resiliency for a discussion of this
        self.instance_send_queue = queue.Queue()  # thread safe queue https://docs.python.org/3/library/queue.html

        # initialize rmr context
        if init_func_override:
            self.mrc = init_func_override()
        else:
            mdc_logger.debug("Waiting for rmr to initialize..")
            # rmr.RMRFL_MTCALL puts RMR into a multithreaded mode, where a receiving thread populates an
            # internal ring of messages, and receive calls read from that
            # currently the size is 2048 messages, so this is fine for the foreseeable future
            self.mrc = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
            while rmr.rmr_ready(self.mrc) == 0:
                time.sleep(0.5)  # poll until rmr reports ready

        # set the receive function
        if rcv_func_override:
            self.rcv_func = rcv_func_override
        else:
            self.rcv_func = lambda: helpers.rmr_rcvall_msgs_raw(self.mrc, [A1_POLICY_RESPONSE, A1_POLICY_QUERY])

        # start the work loop
        self.thread = Thread(target=self.loop)
        self.thread.start()

    def _assert_good_send(self, sbuf, pre_send_summary):
        """
        common helper function for _send_msg and _rts_msg

        Answers True when the post-send summary of sbuf shows message state 0 and
        message status "RMR_OK"; otherwise logs both summaries and answers False.
        """
        post_send_summary = rmr.message_summary(sbuf)
        if post_send_summary["message state"] == 0 and post_send_summary["message status"] == "RMR_OK":
            return True
        mdc_logger.debug("Message NOT sent!")
        mdc_logger.debug("Pre-send summary: {0}, Post-send summary: {1}".format(pre_send_summary, post_send_summary))
        return False

    def _send_msg(self, pay, mtype, subid):
        """
        sends a msg, trying up to RETRY_TIMES times

        A fresh message buffer is allocated for every attempt and is freed after
        every attempt, whether or not the send succeeded, so that failed sends do
        not leak buffers.
        """
        for _ in range(0, RETRY_TIMES):
            sbuf = rmr.rmr_alloc_msg(self.mrc, len(pay), payload=pay, gen_transaction_id=True, mtype=mtype, sub_id=subid)
            try:
                sbuf.contents.sub_id = subid
                pre_send_summary = rmr.message_summary(sbuf)
                mdc_logger.debug("Trying to send message: {}".format(pre_send_summary))
                sbuf = rmr.rmr_send_msg(self.mrc, sbuf)  # send; keep whatever buffer the call hands back
                if self._assert_good_send(sbuf, pre_send_summary):
                    return
            finally:
                rmr.rmr_free_msg(sbuf)  # free, on success and on failure

        mdc_logger.debug("A1 did NOT send the message successfully after {} retries!".format(RETRY_TIMES))

    def _rts_msg(self, pay, sbuf_rts, mtype):
        """
        sends a message using rts, trying up to RETRY_TIMES times
        we do not call free here because we may rts many times; it is called after the rts loop

        Answers the (possibly replaced) message buffer, which the caller must keep using and eventually free.
        """
        for _ in range(0, RETRY_TIMES):
            pre_send_summary = rmr.message_summary(sbuf_rts)
            sbuf_rts = rmr.rmr_rts_msg(self.mrc, sbuf_rts, payload=pay, mtype=mtype)
            if self._assert_good_send(sbuf_rts, pre_send_summary):
                break
        else:
            # every attempt failed; say so, the same way _send_msg does
            mdc_logger.debug("A1 did NOT send the message successfully after {} retries!".format(RETRY_TIMES))
        return sbuf_rts  # in some cases rts may return a new sbuf

    def _handle_sends(self):
        """
        send out all messages waiting for us in the instance send queue

        Several of these may run at once (the work loop starts one per pass), so the
        queue is drained with a non-blocking get and queue.Empty ends the drain; checking
        empty() first would race with another drainer taking the last item.
        """
        while True:
            try:
                work_item = self.instance_send_queue.get_nowait()
            except queue.Empty:
                return
            payload = json.dumps(messages.a1_to_handler(*work_item)).encode("utf-8")
            self._send_msg(payload, A1_POLICY_REQUEST, work_item[1])

    def _handle_policy_response(self, msg):
        """record the instance status carried by a policy response; bad or unknown ones are logged and dropped"""
        try:
            # got a policy response, update status
            pay = json.loads(msg["payload"])
            data.set_policy_instance_status(
                pay["policy_type_id"], pay["policy_instance_id"], pay["handler_id"], pay["status"]
            )
            mdc_logger.debug("Successfully received status update: {0}".format(pay))
        except (PolicyTypeNotFound, PolicyInstanceNotFound):
            mdc_logger.debug("Received a response for a non-existent instance")
        except (KeyError, TypeError, json.decoder.JSONDecodeError):
            mdc_logger.debug("Dropping malformed policy ack message: {0}".format(msg))

    def _handle_policy_query(self, msg, sbuf):
        """answer a policy query by rts-ing a CREATE for every instance of the type; answers the current sbuf"""
        try:
            # got a query, do a lookup and send out all instances
            pti = json.loads(msg["payload"])["policy_type_id"]
            instance_list = data.get_instance_list(pti)  # will raise if a bad type
            mdc_logger.debug("Received a query for a good type: {0}".format(msg))
            for pii in instance_list:
                instance = data.get_policy_instance(pti, pii)
                payload = json.dumps(messages.a1_to_handler("CREATE", pti, pii, instance)).encode("utf-8")
                sbuf = self._rts_msg(payload, sbuf, A1_POLICY_REQUEST)
        except PolicyTypeNotFound:
            mdc_logger.debug("Received a query for a non-existent type: {0}".format(msg))
        except (KeyError, TypeError, json.decoder.JSONDecodeError):
            mdc_logger.debug("Dropping malformed policy query message: {0}".format(msg))
        return sbuf

    def loop(self):
        """
        This loop runs forever, and has 3 jobs:
        - send out any messages that have to go out (create instance, delete instance)
        - read a1s mailbox and update the status of all instances based on acks from downstream policy handlers
        - clean up the database (eg delete the instance) under certain conditions based on those statuses (NOT DONE YET)
        """
        mdc_logger.debug("Work loop starting")
        while self.keep_going:

            # We now handle our sends in a thread (that will just exit when it's done) because there is a
            # difference between how send works in SI95 vs NNG.
            # Send_msg via NNG formerly never blocked.
            # However under SI95 this send may block for some arbitrary period of time on the first send to an
            # endpoint for which a connection is not established.
            # If this send takes too long, this loop blocks, and the healthcheck will fail, which will cause A1s
            # healthcheck to fail, which will cause Kubernetes to whack A1 and all kinds of horrible things happen.
            # Therefore, now under SI95, we thread this.
            Thread(target=self._handle_sends).start()

            # read our mailbox
            for (msg, sbuf) in self.rcv_func():
                # TODO: in the future we may also have to catch SDL errors
                try:
                    mtype = msg["message type"]
                except (KeyError, TypeError, json.decoder.JSONDecodeError):
                    mdc_logger.debug("Dropping malformed policy ack/query message: {0}".format(msg))
                    # nothing more can be done with this message: release its buffer and move on,
                    # rather than falling through with mtype unset (or left over from the previous message)
                    rmr.rmr_free_msg(sbuf)
                    continue

                if mtype == A1_POLICY_RESPONSE:
                    self._handle_policy_response(msg)
                elif mtype == A1_POLICY_QUERY:
                    sbuf = self._handle_policy_query(msg, sbuf)  # rts may have swapped the buffer
                else:
                    mdc_logger.debug("Received message type {0} but A1 does not handle this".format(mtype))

                # we must free each sbuf
                rmr.rmr_free_msg(sbuf)

            self.last_ran = time.time()
            time.sleep(1)  # pause between passes so the loop does not spin

        mdc_logger.debug("RMR Thread Ending!")
def start_rmr_thread(init_func_override=None, rcv_func_override=None):
    """
    Start a1s rmr thread. Does nothing if the rmr loop was already created.

    Parameters
    ----------
    init_func_override: function (optional)
        Function that initializes RMR and answers an RMR context.
        Supply an empty function to skip initializing RMR.

    rcv_func_override: function (optional)
        Function that receives messages from RMR and answers a list.
        Supply a trivial function to skip reading from RMR.
    """
    # the module-level name is rebound below, so it must be declared global;
    # without this the "is None" test would read an unassigned local
    global __RMR_LOOP__
    if __RMR_LOOP__ is None:
        __RMR_LOOP__ = _RmrLoop(init_func_override, rcv_func_override)
def stop_rmr_thread():
    """
    stops the rmr thread

    This only clears the loop's keep_going flag; the work loop ends when it next
    checks the flag. Calling this when no loop was ever started is a no-op.
    """
    if __RMR_LOOP__ is None:
        return
    __RMR_LOOP__.keep_going = False
def queue_instance_send(item):
    """
    Hand a work item to the rmr loop by placing it on the loop's send queue.

    Sending out messages is currently the only kind of work the queue carries.
    """
    pending_sends = __RMR_LOOP__.instance_send_queue
    pending_sends.put(item)
def healthcheck_rmr_thread(seconds=30):
    """
    returns a boolean representing whether the rmr loop is healthy, by checking two attributes:
    1. is the loop thread alive?
    2. is it stuck in a long (> seconds) loop?

    A loop that was never started is reported as unhealthy (False) instead of raising.

    Parameters
    ----------
    seconds: number (optional)
        Longest acceptable time, in seconds, since the loop last completed a pass.
    """
    rmr_loop = __RMR_LOOP__
    if rmr_loop is None:
        return False
    return rmr_loop.thread.is_alive() and ((time.time() - rmr_loop.last_ran) < seconds)
def replace_rcv_func(rcv_func):
    """Swap the receive function used by the rmr loop; exists only so unit tests can exercise different rcv scenarios."""
    setattr(__RMR_LOOP__, "rcv_func", rcv_func)