050674c3a2d88805e0e4ea1ca161233de7874162
[ric-plt/a1.git] / a1 / a1rmr.py
1 # ==================================================================================
2 #       Copyright (c) 2019-2020 Nokia
3 #       Copyright (c) 2018-2020 AT&T Intellectual Property.
4 #
5 #   Licensed under the Apache License, Version 2.0 (the "License");
6 #   you may not use this file except in compliance with the License.
7 #   You may obtain a copy of the License at
8 #
9 #          http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #   Unless required by applicable law or agreed to in writing, software
12 #   distributed under the License is distributed on an "AS IS" BASIS,
13 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #   See the License for the specific language governing permissions and
15 #   limitations under the License.
16 # ==================================================================================
17 """
18 A1 RMR functionality
19 """
20 import os
21 import queue
22 import time
23 import json
24 from threading import Thread
25 from ricxappframe.rmr import rmr, helpers
26 from mdclogpy import Logger
27 from a1 import data, messages
28 from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound
29
30 mdc_logger = Logger(name=__name__)
31
32
33 # With Nanomsg and NNG it was possible for a send attempt to have a "soft"
34 # failure which did warrant some retries if the status of the send is RMR_ERR_RETRY.
35 # Because of the way NNG worked, it sometimes required many tens of retries,
36 # and a retry state happened often for even moderately "verbose" applications.
37 # With SI95 there is still a possibility that a retry is necessary, but it is very rare.
38 RETRY_TIMES = int(os.environ.get("A1_RMR_RETRY_TIMES", 4))
39 A1_POLICY_REQUEST = 20010
40 A1_POLICY_RESPONSE = 20011
41 A1_POLICY_QUERY = 20012
42
43
44 # Note; yes, globals are bad, but this is a private (to this module) global
45 # No other module can import/access this (well, python doesn't enforce this, but all linters will complain)
46 __RMR_LOOP__ = None
47
48
49 class _RmrLoop:
50     """
51     Class represents an rmr loop that constantly reads from rmr and performs operations
52     based on waiting messages.  This launches a thread, it should probably only be called
53     once; the public facing method to access these ensures this.
54
55     TODO: the xapp frame has a version of this looping structure. See if A1 can switch to that.
56     """
57
58     def __init__(self, init_func_override=None, rcv_func_override=None):
59         """
60         Init
61
62         Parameters
63         ----------
64         init_func_override: function (optional)
65             Function that initializes RMR and answers an RMR context.
66             Supply an empty function to skip initializing RMR.
67
68         rcv_func_override: function (optional)
69             Function that receives messages from RMR and answers a list.
70             Supply a trivial function to skip reading from RMR.
71         """
72         self.keep_going = True
73         self.rcv_func = None
74         self.last_ran = time.time()
75
76         # see docs/overview#resiliency for a discussion of this
77         self.instance_send_queue = queue.Queue()  # thread safe queue https://docs.python.org/3/library/queue.html
78
79         # intialize rmr context
80         if init_func_override:
81             self.mrc = init_func_override()
82         else:
83             mdc_logger.debug("Waiting for rmr to initialize..")
84             # rmr.RMRFL_MTCALL puts RMR into a multithreaded mode, where a receiving thread
85             # populates an internal ring of messages, and receive calls read from that.
86             # currently the size is 2048 messages, so this is fine for the foreseeable future
87             self.mrc = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
88             while rmr.rmr_ready(self.mrc) == 0:
89                 time.sleep(0.5)
90
91         # set the receive function
92         self.rcv_func = (
93             rcv_func_override
94             if rcv_func_override
95             else lambda: helpers.rmr_rcvall_msgs_raw(self.mrc, [A1_POLICY_RESPONSE, A1_POLICY_QUERY])
96         )
97
98         # start the work loop
99         self.thread = Thread(target=self.loop)
100         self.thread.start()
101
102     def _assert_good_send(self, sbuf, pre_send_summary):
103         """
104         Extracts the send result and logs a detailed warning if the send failed.
105         Returns the message state, an integer that indicates the result.
106         """
107         post_send_summary = rmr.message_summary(sbuf)
108         if post_send_summary[rmr.RMR_MS_MSG_STATE] != rmr.RMR_OK:
109             mdc_logger.warning("RMR send failed; pre-send summary: {0}, post-send summary: {1}".format(pre_send_summary, post_send_summary))
110         return post_send_summary[rmr.RMR_MS_MSG_STATE]
111
112     def _send_msg(self, pay, mtype, subid):
113         """
114         Creates and sends a message via RMR's send-message feature with the specified payload
115         using the specified message type and subscription ID.
116         """
117         sbuf = rmr.rmr_alloc_msg(self.mrc, len(pay), payload=pay, gen_transaction_id=True, mtype=mtype, sub_id=subid)
118         sbuf.contents.sub_id = subid
119         pre_send_summary = rmr.message_summary(sbuf)
120         for _ in range(0, RETRY_TIMES):
121             mdc_logger.debug("_send_msg: sending: {}".format(pre_send_summary))
122             sbuf = rmr.rmr_send_msg(self.mrc, sbuf)
123             msg_state = self._assert_good_send(sbuf, pre_send_summary)
124             mdc_logger.debug("_send_msg: result message state: {}".format(msg_state))
125             if msg_state != rmr.RMR_ERR_RETRY:
126                 break
127
128         rmr.rmr_free_msg(sbuf)
129         if msg_state != rmr.RMR_OK:
130             mdc_logger.warning("_send_msg: failed after {} retries".format(RETRY_TIMES))
131
132     def _rts_msg(self, pay, sbuf_rts, mtype):
133         """
134         Sends a message via RMR's return-to-sender feature.
135         This neither allocates nor frees a message buffer because we may rts many times.
136         Returns the message buffer from the RTS function, which may reallocate it.
137         """
138         pre_send_summary = rmr.message_summary(sbuf_rts)
139         for _ in range(0, RETRY_TIMES):
140             mdc_logger.debug("_rts_msg: sending: {}".format(pre_send_summary))
141             sbuf_rts = rmr.rmr_rts_msg(self.mrc, sbuf_rts, payload=pay, mtype=mtype)
142             msg_state = self._assert_good_send(sbuf_rts, pre_send_summary)
143             mdc_logger.debug("_rts_msg: result message state: {}".format(msg_state))
144             if msg_state != rmr.RMR_ERR_RETRY:
145                 break
146
147         if msg_state != rmr.RMR_OK:
148             mdc_logger.warning("_rts_msg: failed after {} retries".format(RETRY_TIMES))
149         return sbuf_rts  # in some cases rts may return a new sbuf
150
151     def _handle_sends(self):
152         # send out all messages waiting for us
153         while not self.instance_send_queue.empty():
154             work_item = self.instance_send_queue.get(block=False, timeout=None)
155             payload = json.dumps(messages.a1_to_handler(*work_item)).encode("utf-8")
156             self._send_msg(payload, A1_POLICY_REQUEST, work_item[1])
157
158     def loop(self):
159         """
160         This loop runs forever, and has 3 jobs:
161         - send out any messages that have to go out (create instance, delete instance)
162         - read a1s mailbox and update the status of all instances based on acks from downstream policy handlers
163         - clean up the database (eg delete the instance) under certain conditions based on those statuses (NOT DONE YET)
164         """
165         # loop forever
166         mdc_logger.debug("Work loop starting")
167         while self.keep_going:
168
169             # Update 3/20/2020
170             # We now handle our sends in a thread (that will just exit when it's done) because there is a difference between how send works in SI95 vs NNG.
171             # Send_msg via NNG formerly never blocked.
172             # However under SI95 this send may block for some arbitrary period of time on the first send to an endpoint for which a connection is not established
173             # If this send takes too long, this loop blocks, and the healthcheck will fail, which will cause A1s healthcheck to fail, which will cause Kubernetes to whack A1 and all kinds of horrible things happen.
174             # Therefore, now under SI95, we thread this.
175             Thread(target=self._handle_sends).start()
176
177             # read our mailbox
178             for (msg, sbuf) in self.rcv_func():
179                 # TODO: in the future we may also have to catch SDL errors
180                 try:
181                     mtype = msg[rmr.RMR_MS_MSG_TYPE]
182                 except (KeyError, TypeError, json.decoder.JSONDecodeError):
183                     mdc_logger.warning("Dropping malformed message: {0}".format(msg))
184
185                 if mtype == A1_POLICY_RESPONSE:
186                     try:
187                         # got a policy response, update status
188                         pay = json.loads(msg[rmr.RMR_MS_PAYLOAD])
189                         data.set_policy_instance_status(
190                             pay["policy_type_id"], pay["policy_instance_id"], pay["handler_id"], pay["status"]
191                         )
192                         mdc_logger.debug("Successfully received status update: {0}".format(pay))
193                     except (PolicyTypeNotFound, PolicyInstanceNotFound):
194                         mdc_logger.warning("Received a response for a non-existent type/instance: {0}".format(msg))
195                     except (KeyError, TypeError, json.decoder.JSONDecodeError):
196                         mdc_logger.warning("Dropping malformed policy response: {0}".format(msg))
197
198                 elif mtype == A1_POLICY_QUERY:
199                     try:
200                         # got a query, do a lookup and send out all instances
201                         pti = json.loads(msg[rmr.RMR_MS_PAYLOAD])["policy_type_id"]
202                         instance_list = data.get_instance_list(pti)  # will raise if a bad type
203                         mdc_logger.debug("Received a query for a known policy type: {0}".format(msg))
204                         for pii in instance_list:
205                             instance = data.get_policy_instance(pti, pii)
206                             payload = json.dumps(messages.a1_to_handler("CREATE", pti, pii, instance)).encode("utf-8")
207                             sbuf = self._rts_msg(payload, sbuf, A1_POLICY_REQUEST)
208                     except (PolicyTypeNotFound):
209                         mdc_logger.warning("Received a policy query for a non-existent type: {0}".format(msg))
210                     except (KeyError, TypeError, json.decoder.JSONDecodeError):
211                         mdc_logger.warning("Dropping malformed policy query: {0}".format(msg))
212
213                 else:
214                     mdc_logger.warning("Received message type {0} but A1 does not handle this".format(mtype))
215
216                 # we must free each sbuf
217                 rmr.rmr_free_msg(sbuf)
218
219             self.last_ran = time.time()
220             time.sleep(1)
221
222         mdc_logger.debug("RMR Thread Ending!")
223
224
225 # Public
226
227
228 def start_rmr_thread(init_func_override=None, rcv_func_override=None):
229     """
230     Start a1s rmr thread
231
232     Parameters
233     ----------
234     init_func_override: function (optional)
235         Function that initializes RMR and answers an RMR context.
236         Supply an empty function to skip initializing RMR.
237
238     rcv_func_override: function (optional)
239         Function that receives messages from RMR and answers a list.
240         Supply a trivial function to skip reading from RMR.
241     """
242     global __RMR_LOOP__
243     if __RMR_LOOP__ is None:
244         __RMR_LOOP__ = _RmrLoop(init_func_override, rcv_func_override)
245
246
247 def stop_rmr_thread():
248     """
249     stops the rmr thread
250     """
251     __RMR_LOOP__.keep_going = False
252
253
254 def queue_instance_send(item):
255     """
256     push an item into the work queue
257     currently the only type of work is to send out messages
258     """
259     __RMR_LOOP__.instance_send_queue.put(item)
260
261
262 def healthcheck_rmr_thread(seconds=30):
263     """
264     returns a boolean representing whether the rmr loop is healthy, by checking two attributes:
265     1. is it running?,
266     2. is it stuck in a long (> seconds) loop?
267     """
268     return __RMR_LOOP__.thread.is_alive() and ((time.time() - __RMR_LOOP__.last_ran) < seconds)
269
270
271 def replace_rcv_func(rcv_func):
272     """purely for the ease of unit testing to test different rcv scenarios"""
273     __RMR_LOOP__.rcv_func = rcv_func