f623024148a8c5ac7e382a24c5e63be6a31f3d22
[ric-plt/a1.git] / a1 / a1rmr.py
1 # ==================================================================================
2 #       Copyright (c) 2019-2020 Nokia
3 #       Copyright (c) 2018-2020 AT&T Intellectual Property.
4 #
5 #   Licensed under the Apache License, Version 2.0 (the "License");
6 #   you may not use this file except in compliance with the License.
7 #   You may obtain a copy of the License at
8 #
9 #          http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #   Unless required by applicable law or agreed to in writing, software
12 #   distributed under the License is distributed on an "AS IS" BASIS,
13 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #   See the License for the specific language governing permissions and
15 #   limitations under the License.
16 # ==================================================================================
17 """
18 A1 RMR functionality
19 """
20 import os
21 import queue
22 import time
23 import json
24 from threading import Thread
25 from ricxappframe.rmr import rmr, helpers
26 from mdclogpy import Logger
27 from a1 import data, messages
28 from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound
29
30 mdc_logger = Logger(name=__name__)
31
32
33 # With Nanomsg and NNG it was possible for a send attempt to have a "soft"
34 # failure which did warrant some retries if the status of the send is RMR_ERR_RETRY.
35 # Because of the way NNG worked, it sometimes required many tens of retries,
36 # and a retry state happened often for even moderately "verbose" applications.
37 # With SI95 there is still a possibility that a retry is necessary, but it is very rare.
38 RETRY_TIMES = int(os.environ.get("A1_RMR_RETRY_TIMES", 4))
39 A1_POLICY_REQUEST = 20010
40 A1_POLICY_RESPONSE = 20011
41 A1_POLICY_QUERY = 20012
42
43
44 # Note; yes, globals are bad, but this is a private (to this module) global
45 # No other module can import/access this (well, python doesn't enforce this, but all linters will complain)
46 __RMR_LOOP__ = None
47
48
49 class _RmrLoop:
50     """
51     Class represents an rmr loop that constantly reads from rmr and performs operations
52     based on waiting messages.  This launches a thread, it should probably only be called
53     once; the public facing method to access these ensures this.
54
55     TODO: the xapp frame has a version of this looping structure. See if A1 can switch to that.
56     """
57
58     def __init__(self, init_func_override=None, rcv_func_override=None):
59         """
60         Init
61
62         Parameters
63         ----------
64         init_func_override: function (optional)
65             Function that initializes RMR and answers an RMR context.
66             Supply an empty function to skip initializing RMR.
67
68         rcv_func_override: function (optional)
69             Function that receives messages from RMR and answers a list.
70             Supply a trivial function to skip reading from RMR.
71         """
72         self.keep_going = True
73         self.rcv_func = None
74         self.last_ran = time.time()
75
76         # see docs/overview#resiliency for a discussion of this
77         self.instance_send_queue = queue.Queue()  # thread safe queue https://docs.python.org/3/library/queue.html
78
79         # intialize rmr context
80         if init_func_override:
81             self.mrc = init_func_override()
82         else:
83             mdc_logger.debug("Waiting for rmr to initialize..")
84             # rmr.RMRFL_MTCALL puts RMR into a multithreaded mode, where a receiving thread
85             # populates an internal ring of messages, and receive calls read from that.
86             # currently the size is 2048 messages, so this is fine for the foreseeable future
87             self.mrc = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
88             while rmr.rmr_ready(self.mrc) == 0:
89                 time.sleep(0.5)
90
91         # set the receive function
92         self.rcv_func = (
93             rcv_func_override
94             if rcv_func_override
95             else lambda: helpers.rmr_rcvall_msgs_raw(self.mrc, [A1_POLICY_RESPONSE, A1_POLICY_QUERY])
96         )
97
98         # start the work loop
99         self.thread = Thread(target=self.loop)
100         self.thread.start()
101
102     def _assert_good_send(self, sbuf, pre_send_summary):
103         """
104         Extracts the send result and logs a detailed warning if the send failed.
105         Returns the message state, an integer that indicates the result.
106         """
107         post_send_summary = rmr.message_summary(sbuf)
108         if post_send_summary[rmr.RMR_MS_MSG_STATE] != rmr.RMR_OK:
109             mdc_logger.warning("RMR send failed; pre-send summary: {0}, post-send summary: {1}".format(pre_send_summary, post_send_summary))
110         return post_send_summary[rmr.RMR_MS_MSG_STATE]
111
112     def _send_msg(self, pay, mtype, subid):
113         """
114         Creates and sends a message via RMR's send-message feature with the specified payload
115         using the specified message type and subscription ID.
116         """
117         sbuf = rmr.rmr_alloc_msg(self.mrc, len(pay), payload=pay, gen_transaction_id=True, mtype=mtype, sub_id=subid)
118         sbuf.contents.sub_id = subid
119         pre_send_summary = rmr.message_summary(sbuf)
120         for _ in range(0, RETRY_TIMES):
121             mdc_logger.debug("_send_msg: sending: {}".format(pre_send_summary))
122             sbuf = rmr.rmr_send_msg(self.mrc, sbuf)
123             msg_state = self._assert_good_send(sbuf, pre_send_summary)
124             if msg_state != rmr.RMR_ERR_RETRY:
125                 break
126
127         rmr.rmr_free_msg(sbuf)
128         if msg_state != rmr.RMR_OK:
129             mdc_logger.warning("_send_msg: failed after {} retries".format(RETRY_TIMES))
130
131     def _rts_msg(self, pay, sbuf_rts, mtype):
132         """
133         Sends a message via RMR's return-to-sender feature.
134         This neither allocates nor frees a message buffer because we may rts many times.
135         Returns the message buffer from the RTS function, which may reallocate it.
136         """
137         pre_send_summary = rmr.message_summary(sbuf_rts)
138         for _ in range(0, RETRY_TIMES):
139             mdc_logger.debug("_rts_msg: sending: {}".format(pre_send_summary))
140             sbuf_rts = rmr.rmr_rts_msg(self.mrc, sbuf_rts, payload=pay, mtype=mtype)
141             msg_state = self._assert_good_send(sbuf_rts, pre_send_summary)
142             if msg_state != rmr.RMR_ERR_RETRY:
143                 break
144
145         if msg_state != rmr.RMR_OK:
146             mdc_logger.warning("_rts_msg: failed after {} retries".format(RETRY_TIMES))
147         return sbuf_rts  # in some cases rts may return a new sbuf
148
149     def _handle_sends(self):
150         # send out all messages waiting for us
151         while not self.instance_send_queue.empty():
152             work_item = self.instance_send_queue.get(block=False, timeout=None)
153             payload = json.dumps(messages.a1_to_handler(*work_item)).encode("utf-8")
154             self._send_msg(payload, A1_POLICY_REQUEST, work_item[1])
155
156     def loop(self):
157         """
158         This loop runs forever, and has 3 jobs:
159         - send out any messages that have to go out (create instance, delete instance)
160         - read a1s mailbox and update the status of all instances based on acks from downstream policy handlers
161         - clean up the database (eg delete the instance) under certain conditions based on those statuses (NOT DONE YET)
162         """
163         # loop forever
164         mdc_logger.debug("Work loop starting")
165         while self.keep_going:
166
167             # Update 3/20/2020
168             # We now handle our sends in a thread (that will just exit when it's done) because there is a difference between how send works in SI95 vs NNG.
169             # Send_msg via NNG formerly never blocked.
170             # However under SI95 this send may block for some arbitrary period of time on the first send to an endpoint for which a connection is not established
171             # If this send takes too long, this loop blocks, and the healthcheck will fail, which will cause A1s healthcheck to fail, which will cause Kubernetes to whack A1 and all kinds of horrible things happen.
172             # Therefore, now under SI95, we thread this.
173             Thread(target=self._handle_sends).start()
174
175             # read our mailbox
176             for (msg, sbuf) in self.rcv_func():
177                 # TODO: in the future we may also have to catch SDL errors
178                 try:
179                     mtype = msg[rmr.RMR_MS_MSG_TYPE]
180                 except (KeyError, TypeError, json.decoder.JSONDecodeError):
181                     mdc_logger.warning("Dropping malformed message: {0}".format(msg))
182
183                 if mtype == A1_POLICY_RESPONSE:
184                     try:
185                         # got a policy response, update status
186                         pay = json.loads(msg[rmr.RMR_MS_PAYLOAD])
187                         data.set_policy_instance_status(
188                             pay["policy_type_id"], pay["policy_instance_id"], pay["handler_id"], pay["status"]
189                         )
190                         mdc_logger.debug("Successfully received status update: {0}".format(pay))
191                     except (PolicyTypeNotFound, PolicyInstanceNotFound):
192                         mdc_logger.warning("Received a response for a non-existent type/instance: {0}".format(msg))
193                     except (KeyError, TypeError, json.decoder.JSONDecodeError):
194                         mdc_logger.warning("Dropping malformed policy response: {0}".format(msg))
195
196                 elif mtype == A1_POLICY_QUERY:
197                     try:
198                         # got a query, do a lookup and send out all instances
199                         pti = json.loads(msg[rmr.RMR_MS_PAYLOAD])["policy_type_id"]
200                         instance_list = data.get_instance_list(pti)  # will raise if a bad type
201                         mdc_logger.debug("Received a query for a known policy type: {0}".format(msg))
202                         for pii in instance_list:
203                             instance = data.get_policy_instance(pti, pii)
204                             payload = json.dumps(messages.a1_to_handler("CREATE", pti, pii, instance)).encode("utf-8")
205                             sbuf = self._rts_msg(payload, sbuf, A1_POLICY_REQUEST)
206                     except (PolicyTypeNotFound):
207                         mdc_logger.warning("Received a policy query for a non-existent type: {0}".format(msg))
208                     except (KeyError, TypeError, json.decoder.JSONDecodeError):
209                         mdc_logger.warning("Dropping malformed policy query: {0}".format(msg))
210
211                 else:
212                     mdc_logger.warning("Received message type {0} but A1 does not handle this".format(mtype))
213
214                 # we must free each sbuf
215                 rmr.rmr_free_msg(sbuf)
216
217             self.last_ran = time.time()
218             time.sleep(1)
219
220         mdc_logger.debug("RMR Thread Ending!")
221
222
223 # Public
224
225
226 def start_rmr_thread(init_func_override=None, rcv_func_override=None):
227     """
228     Start a1s rmr thread
229
230     Parameters
231     ----------
232     init_func_override: function (optional)
233         Function that initializes RMR and answers an RMR context.
234         Supply an empty function to skip initializing RMR.
235
236     rcv_func_override: function (optional)
237         Function that receives messages from RMR and answers a list.
238         Supply a trivial function to skip reading from RMR.
239     """
240     global __RMR_LOOP__
241     if __RMR_LOOP__ is None:
242         __RMR_LOOP__ = _RmrLoop(init_func_override, rcv_func_override)
243
244
245 def stop_rmr_thread():
246     """
247     stops the rmr thread
248     """
249     __RMR_LOOP__.keep_going = False
250
251
252 def queue_instance_send(item):
253     """
254     push an item into the work queue
255     currently the only type of work is to send out messages
256     """
257     __RMR_LOOP__.instance_send_queue.put(item)
258
259
260 def healthcheck_rmr_thread(seconds=30):
261     """
262     returns a boolean representing whether the rmr loop is healthy, by checking two attributes:
263     1. is it running?,
264     2. is it stuck in a long (> seconds) loop?
265     """
266     return __RMR_LOOP__.thread.is_alive() and ((time.time() - __RMR_LOOP__.last_ran) < seconds)
267
268
269 def replace_rcv_func(rcv_func):
270     """purely for the ease of unit testing to test different rcv scenarios"""
271     __RMR_LOOP__.rcv_func = rcv_func