# ==================================================================================
#       Copyright (c) 2019-2020 Nokia
#       Copyright (c) 2018-2020 AT&T Intellectual Property.
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#          http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
# ==================================================================================
import json
import os
import queue
import time
from threading import Thread

from rmr import rmr, helpers
from mdclogpy import Logger

from a1 import data, messages
from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound
mdc_logger = Logger(name=__name__)


# Number of send attempts made per outgoing message (env A1_RMR_RETRY_TIMES, default 4).
# Clamped to at least 1: the send/rts loops iterate range(0, RETRY_TIMES), so 0 or a negative
# value would make them run zero times and every message would be dropped without a single attempt.
RETRY_TIMES = max(1, int(os.environ.get("A1_RMR_RETRY_TIMES", 4)))

# rmr message types: A1 sends REQUESTs (create/delete instance) and receives RESPONSEs (acks from
# downstream policy handlers) and QUERYs (requests for all instances of a policy type)
A1_POLICY_REQUEST = 20010
A1_POLICY_RESPONSE = 20011
A1_POLICY_QUERY = 20012


# Note: yes, globals are bad, but this is a private (to this module) global holding the single _RmrLoop
# No other module can import/access this (well, python doesn't enforce this, but all linters will complain)
# It stays None until start_rmr_thread creates the loop.
__RMR_LOOP__ = None
class _RmrLoop:
    """
    class represents an rmr loop that constantly reads from rmr and performs operations based on waiting messages
    this launches a thread, it should probably only be called once; the public facing method to access these ensures this
    """

    def __init__(self, init_func_override=None, rcv_func_override=None):
        """
        Set up the rmr context and the receive function, then start the work loop on its own thread.

        init_func_override: optional zero-argument callable returning the rmr context to use instead of
            initializing rmr here (no readiness wait is done in that case)
        rcv_func_override: optional zero-argument callable returning an iterable of (msg, sbuf) pairs;
            replaces the default read of the rmr mailbox
        """
        self.keep_going = True
        self.last_ran = time.time()

        # see docs/overview#resiliency for a discussion of this
        self.instance_send_queue = queue.Queue()  # thread safe queue https://docs.python.org/3/library/queue.html

        # initialize rmr context
        if init_func_override:
            self.mrc = init_func_override()
        else:
            mdc_logger.debug("Waiting for rmr to initialize..")
            # rmr.RMRFL_MTCALL puts RMR into a multithreaded mode, where a receiving thread populates an
            # internal ring of messages, and receive calls read from that
            # currently the size is 2048 messages, so this is fine for the foreseeable future
            self.mrc = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
            while rmr.rmr_ready(self.mrc) == 0:
                time.sleep(0.5)

        # set the receive function; by default only policy responses and queries are pulled from the mailbox
        self.rcv_func = (
            rcv_func_override
            if rcv_func_override
            else lambda: helpers.rmr_rcvall_msgs_raw(self.mrc, [A1_POLICY_RESPONSE, A1_POLICY_QUERY])
        )

        # start the work loop last, once every attribute it reads exists
        self.thread = Thread(target=self.loop)
        self.thread.start()

    def _assert_good_send(self, sbuf, pre_send_summary):
        """
        common helper function for _send_msg and _rts_msg

        returns True when the sbuf handed back by rmr reports state 0 and status "RMR_OK";
        otherwise logs both summaries and returns False
        """
        post_send_summary = rmr.message_summary(sbuf)
        if post_send_summary["message state"] == 0 and post_send_summary["message status"] == "RMR_OK":
            return True
        mdc_logger.debug("Message NOT sent!")
        mdc_logger.debug("Pre-send summary: {0}, Post-send summary: {1}".format(pre_send_summary, post_send_summary))
        return False

    def _send_msg(self, pay, mtype, subid):
        """
        sends pay as a message of type mtype with sub id subid, making up to RETRY_TIMES attempts

        a fresh sbuf is allocated for each attempt, and the sbuf rmr_send_msg hands back is freed after
        EVERY attempt, not only the successful one, so failed attempts no longer leak rmr buffers
        """
        for _ in range(0, RETRY_TIMES):
            sbuf = rmr.rmr_alloc_msg(self.mrc, len(pay), payload=pay, gen_transaction_id=True, mtype=mtype, sub_id=subid)
            sbuf.contents.sub_id = subid
            pre_send_summary = rmr.message_summary(sbuf)
            sbuf = rmr.rmr_send_msg(self.mrc, sbuf)  # send
            sent = self._assert_good_send(sbuf, pre_send_summary)
            rmr.rmr_free_msg(sbuf)  # free regardless of outcome; the next attempt allocates its own buffer
            if sent:
                break

    def _rts_msg(self, pay, sbuf_rts, mtype):
        """
        sends a message using rts, retrying up to RETRY_TIMES with the sbuf rmr hands back
        we do not call free here because we may rts many times; it is called after the rts loop
        returns the sbuf the caller must eventually free
        """
        for _ in range(0, RETRY_TIMES):
            pre_send_summary = rmr.message_summary(sbuf_rts)
            sbuf_rts = rmr.rmr_rts_msg(self.mrc, sbuf_rts, payload=pay, mtype=mtype)
            if self._assert_good_send(sbuf_rts, pre_send_summary):
                break
        return sbuf_rts  # in some cases rts may return a new sbuf

    def _drain_send_queue(self):
        """send out every work item currently waiting in instance_send_queue"""
        while True:
            try:
                work_item = self.instance_send_queue.get_nowait()
            except queue.Empty:
                return
            # work_item is splatted into messages.a1_to_handler; presumably
            # (operation, policy_type_id, policy_instance_id, body) as in the query path below -- TODO confirm.
            # Its second element is used as the rmr sub id.
            payload = json.dumps(messages.a1_to_handler(*work_item)).encode("utf-8")
            self._send_msg(payload, A1_POLICY_REQUEST, work_item[1])

    def _handle_policy_response(self, msg):
        """record the status a downstream policy handler reported for a policy instance"""
        try:
            pay = json.loads(msg["payload"])
            data.set_policy_instance_status(
                pay["policy_type_id"], pay["policy_instance_id"], pay["handler_id"], pay["status"]
            )
            mdc_logger.debug("Successfully received status update: {0}".format(pay))
        except (PolicyTypeNotFound, PolicyInstanceNotFound):
            mdc_logger.debug("Received a response for a non-existent instance")
        except (KeyError, TypeError, json.decoder.JSONDecodeError):
            mdc_logger.debug("Dropping malformed policy ack message: {0}".format(msg))

    def _handle_policy_query(self, msg, sbuf):
        """
        answer a query by rts'ing a CREATE request for every instance of the requested policy type
        returns the sbuf the caller must free (each rts may replace it)
        """
        try:
            pti = json.loads(msg["payload"])["policy_type_id"]
            mdc_logger.debug("Received query for: {0}".format(pti))
            for pii in data.get_instance_list(pti):
                instance = data.get_policy_instance(pti, pii)
                payload = json.dumps(messages.a1_to_handler("CREATE", pti, pii, instance)).encode("utf-8")
                sbuf = self._rts_msg(payload, sbuf, A1_POLICY_REQUEST)
        except (PolicyTypeNotFound, PolicyInstanceNotFound):
            mdc_logger.debug("Received a query for a non-existent type: {0}".format(msg))
        except (KeyError, TypeError, json.decoder.JSONDecodeError):
            mdc_logger.debug("Dropping malformed policy query message: {0}".format(msg))
        return sbuf

    def _handle_msg(self, msg, sbuf):
        """
        dispatch one received message on its "message type"
        returns the sbuf the caller must free; a message whose type cannot be read is logged and dropped
        """
        try:
            mtype = msg["message type"]
        except (KeyError, TypeError):
            mdc_logger.debug("Dropping malformed policy ack/query message: {0}".format(msg))
            # stop here: falling through would dispatch on an unbound (or previous message's) mtype
            return sbuf

        if mtype == A1_POLICY_RESPONSE:
            self._handle_policy_response(msg)
        elif mtype == A1_POLICY_QUERY:
            sbuf = self._handle_policy_query(msg, sbuf)
        else:
            mdc_logger.debug("Received message type {0} but A1 does not handle this".format(mtype))
        return sbuf

    def loop(self):
        """
        This loop runs until keep_going is cleared (see stop_rmr_thread), and has 3 jobs:
        - send out any messages that have to go out (create instance, delete instance)
        - read a1s mailbox and update the status of all instances based on acks from downstream policy handlers
        - clean up the database (eg delete the instance) under certain conditions based on those statuses (NOT DONE YET)
        """
        mdc_logger.debug("Work loop starting")
        while self.keep_going:

            # send out all messages waiting for us
            self._drain_send_queue()

            # read our mailbox
            for (msg, sbuf) in self.rcv_func():
                # TODO: in the future we may also have to catch SDL errors
                sbuf = self._handle_msg(msg, sbuf)
                # we must free each sbuf
                rmr.rmr_free_msg(sbuf)

            # healthcheck_rmr_thread uses this timestamp to detect a stuck loop
            self.last_ran = time.time()
            # NOTE(review): pause between passes so an idle loop does not spin; presumably rcv_func
            # polls without blocking -- confirm the interval
            time.sleep(1)
def start_rmr_thread(init_func_override=None, rcv_func_override=None):
    """
    Create the module's single rmr loop (which launches its worker thread), unless it already exists.

    Both overrides are passed straight through to _RmrLoop. Calling this again once the loop
    exists does nothing.
    """
    global __RMR_LOOP__
    if __RMR_LOOP__ is not None:
        return
    __RMR_LOOP__ = _RmrLoop(init_func_override, rcv_func_override)
def stop_rmr_thread():
    """
    stops the rmr thread by clearing the loop's keep_going flag

    The loop exits after finishing its current pass; this does not join the thread.
    Does nothing (instead of raising AttributeError) if start_rmr_thread was never called.
    """
    if __RMR_LOOP__ is not None:
        __RMR_LOOP__.keep_going = False
def queue_instance_send(item):
    """
    Hand one work item to the rmr loop.

    The item goes onto the loop's thread-safe instance_send_queue; sending messages is currently
    the only kind of work the loop performs.
    """
    work_queue = __RMR_LOOP__.instance_send_queue
    work_queue.put(item)
def healthcheck_rmr_thread(seconds=30):
    """
    returns a boolean representing whether the rmr loop is healthy, by checking two attributes:
    1. is it running?
    2. is it stuck in a long (> seconds) loop?

    Returns False (instead of raising AttributeError) when start_rmr_thread has not been called yet.
    """
    rmr_loop = __RMR_LOOP__
    if rmr_loop is None:
        return False
    return rmr_loop.thread.is_alive() and ((time.time() - rmr_loop.last_ran) < seconds)
def replace_rcv_func(rcv_func):
    """
    Install a different receive function on the module's rmr loop.

    Exists purely for the ease of unit testing, so tests can simulate different rcv scenarios.
    """
    rmr_loop = __RMR_LOOP__
    rmr_loop.rcv_func = rcv_func