Move to ricxappframe rmr, rmr3.6.3
[ric-plt/a1.git] / a1 / a1rmr.py
1 """
2 a1s rmr functionality
3 """
4 # ==================================================================================
5 #       Copyright (c) 2019-2020 Nokia
6 #       Copyright (c) 2018-2020 AT&T Intellectual Property.
7 #
8 #   Licensed under the Apache License, Version 2.0 (the "License");
9 #   you may not use this file except in compliance with the License.
10 #   You may obtain a copy of the License at
11 #
12 #          http://www.apache.org/licenses/LICENSE-2.0
13 #
14 #   Unless required by applicable law or agreed to in writing, software
15 #   distributed under the License is distributed on an "AS IS" BASIS,
16 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 #   See the License for the specific language governing permissions and
18 #   limitations under the License.
19 # ==================================================================================
20 import os
21 import queue
22 import time
23 import json
24 from threading import Thread
25 from ricxappframe.rmr import rmr, helpers
26 from mdclogpy import Logger
27 from a1 import data, messages
28 from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound
29
30 mdc_logger = Logger(name=__name__)
31
32
33 RETRY_TIMES = int(os.environ.get("A1_RMR_RETRY_TIMES", 4))
34 A1_POLICY_REQUEST = 20010
35 A1_POLICY_RESPONSE = 20011
36 A1_POLICY_QUERY = 20012
37
38
39 # Note; yes, globals are bad, but this is a private (to this module) global
40 # No other module can import/access this (well, python doesn't enforce this, but all linters will complain)
41 __RMR_LOOP__ = None
42
43
44 class _RmrLoop:
45     """
46     class represents an rmr loop that constantly reads from rmr and performs operations based on waiting messages
47     this launches a thread, it should probably only be called once; the public facing method to access these ensures this
48
49     TODO: the xapp frame has a version of this looping structure. See if A1 can switch to that.
50     """
51
52     def __init__(self, init_func_override=None, rcv_func_override=None):
53         self.keep_going = True
54         self.rcv_func = None
55         self.last_ran = time.time()
56
57         # see docs/overview#resiliency for a discussion of this
58         self.instance_send_queue = queue.Queue()  # thread safe queue https://docs.python.org/3/library/queue.html
59
60         # intialize rmr context
61         if init_func_override:
62             self.mrc = init_func_override()
63         else:
64             mdc_logger.debug("Waiting for rmr to initialize..")
65             # rmr.RMRFL_MTCALL puts RMR into a multithreaded mode, where a receiving thread populates an
66             # internal ring of messages, and receive calls read from that
67             # currently the size is 2048 messages, so this is fine for the foreseeable future
68             self.mrc = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
69             while rmr.rmr_ready(self.mrc) == 0:
70                 time.sleep(0.5)
71
72         # set the receive function
73         self.rcv_func = (
74             rcv_func_override
75             if rcv_func_override
76             else lambda: helpers.rmr_rcvall_msgs_raw(self.mrc, [A1_POLICY_RESPONSE, A1_POLICY_QUERY])
77         )
78
79         # start the work loop
80         self.thread = Thread(target=self.loop)
81         self.thread.start()
82
83     def _assert_good_send(self, sbuf, pre_send_summary):
84         """
85         common helper function for _send_msg and _rts_msg
86         """
87         post_send_summary = rmr.message_summary(sbuf)
88         if post_send_summary["message state"] == 0 and post_send_summary["message status"] == "RMR_OK":
89             return True
90         mdc_logger.debug("Message NOT sent!")
91         mdc_logger.debug("Pre-send summary: {0}, Post-send summary: {1}".format(pre_send_summary, post_send_summary))
92         return False
93
94     def _send_msg(self, pay, mtype, subid):
95         """
96         sends a msg
97         """
98         for _ in range(0, RETRY_TIMES):
99             sbuf = rmr.rmr_alloc_msg(self.mrc, len(pay), payload=pay, gen_transaction_id=True, mtype=mtype, sub_id=subid)
100             sbuf.contents.sub_id = subid
101             pre_send_summary = rmr.message_summary(sbuf)
102             mdc_logger.debug("Trying to send message: {}".format(pre_send_summary))
103             sbuf = rmr.rmr_send_msg(self.mrc, sbuf)  # send
104             if self._assert_good_send(sbuf, pre_send_summary):
105                 rmr.rmr_free_msg(sbuf)  # free
106                 return
107
108         mdc_logger.debug("A1 did NOT send the message successfully after {} retries!".format(RETRY_TIMES))
109
110     def _rts_msg(self, pay, sbuf_rts, mtype):
111         """
112         sends a message using rts
113         we do not call free here because we may rts many times; it is called after the rts loop
114         """
115         for _ in range(0, RETRY_TIMES):
116             pre_send_summary = rmr.message_summary(sbuf_rts)
117             sbuf_rts = rmr.rmr_rts_msg(self.mrc, sbuf_rts, payload=pay, mtype=mtype)
118             if self._assert_good_send(sbuf_rts, pre_send_summary):
119                 break
120         return sbuf_rts  # in some cases rts may return a new sbuf
121
122     def _handle_sends(self):
123         # send out all messages waiting for us
124         while not self.instance_send_queue.empty():
125             work_item = self.instance_send_queue.get(block=False, timeout=None)
126             payload = json.dumps(messages.a1_to_handler(*work_item)).encode("utf-8")
127             self._send_msg(payload, A1_POLICY_REQUEST, work_item[1])
128
129     def loop(self):
130         """
131         This loop runs forever, and has 3 jobs:
132         - send out any messages that have to go out (create instance, delete instance)
133         - read a1s mailbox and update the status of all instances based on acks from downstream policy handlers
134         - clean up the database (eg delete the instance) under certain conditions based on those statuses (NOT DONE YET)
135         """
136         # loop forever
137         mdc_logger.debug("Work loop starting")
138         while self.keep_going:
139
140             # Update 3/20/2020
141             # We now handle our sends in a thread (that will just exit when it's done) because there is a difference between how send works in SI95 vs NNG.
142             # Send_msg via NNG formerly never blocked.
143             # However under SI95 this send may block for some arbitrary period of time on the first send to an endpoint for which a connection is not established
144             # If this send takes too long, this loop blocks, and the healthcheck will fail, which will cause A1s healthcheck to fail, which will cause Kubernetes to whack A1 and all kinds of horrible things happen.
145             # Therefore, now under SI95, we thread this.
146             Thread(target=self._handle_sends).start()
147
148             # read our mailbox
149             for (msg, sbuf) in self.rcv_func():
150                 # TODO: in the future we may also have to catch SDL errors
151                 try:
152                     mtype = msg["message type"]
153                 except (KeyError, TypeError, json.decoder.JSONDecodeError):
154                     mdc_logger.debug("Dropping malformed policy ack/query message: {0}".format(msg))
155
156                 if mtype == A1_POLICY_RESPONSE:
157                     try:
158                         # got a policy response, update status
159                         pay = json.loads(msg["payload"])
160                         data.set_policy_instance_status(
161                             pay["policy_type_id"], pay["policy_instance_id"], pay["handler_id"], pay["status"]
162                         )
163                         mdc_logger.debug("Successfully received status update: {0}".format(pay))
164                     except (PolicyTypeNotFound, PolicyInstanceNotFound):
165                         mdc_logger.debug("Received a response  for a non-existent instance")
166                     except (KeyError, TypeError, json.decoder.JSONDecodeError):
167                         mdc_logger.debug("Dropping malformed policy ack message: {0}".format(msg))
168
169                 elif mtype == A1_POLICY_QUERY:
170                     try:
171                         # got a query, do a lookup and send out all instances
172                         pti = json.loads(msg["payload"])["policy_type_id"]
173                         instance_list = data.get_instance_list(pti)  # will raise if a bad type
174                         mdc_logger.debug("Received a query for a good type: {0}".format(msg))
175                         for pii in instance_list:
176                             instance = data.get_policy_instance(pti, pii)
177                             payload = json.dumps(messages.a1_to_handler("CREATE", pti, pii, instance)).encode("utf-8")
178                             sbuf = self._rts_msg(payload, sbuf, A1_POLICY_REQUEST)
179                     except (PolicyTypeNotFound):
180                         mdc_logger.debug("Received a query for a non-existent type: {0}".format(msg))
181                     except (KeyError, TypeError, json.decoder.JSONDecodeError):
182                         mdc_logger.debug("Dropping malformed policy query message: {0}".format(msg))
183
184                 else:
185                     mdc_logger.debug("Received message type {0} but A1 does not handle this".format(mtype))
186
187                 # we must free each sbuf
188                 rmr.rmr_free_msg(sbuf)
189
190             self.last_ran = time.time()
191             time.sleep(1)
192
193         mdc_logger.debug("RMR Thread Ending!")
194
195
196 # Public
197
198
199 def start_rmr_thread(init_func_override=None, rcv_func_override=None):
200     """
201     Start a1s rmr thread
202     """
203     global __RMR_LOOP__
204     if __RMR_LOOP__ is None:
205         __RMR_LOOP__ = _RmrLoop(init_func_override, rcv_func_override)
206
207
208 def stop_rmr_thread():
209     """
210     stops the rmr thread
211     """
212     __RMR_LOOP__.keep_going = False
213
214
215 def queue_instance_send(item):
216     """
217     push an item into the work queue
218     currently the only type of work is to send out messages
219     """
220     __RMR_LOOP__.instance_send_queue.put(item)
221
222
223 def healthcheck_rmr_thread(seconds=30):
224     """
225     returns a boolean representing whether the rmr loop is healthy, by checking two attributes:
226     1. is it running?,
227     2. is it stuck in a long (> seconds) loop?
228     """
229     return __RMR_LOOP__.thread.is_alive() and ((time.time() - __RMR_LOOP__.last_ran) < seconds)
230
231
232 def replace_rcv_func(rcv_func):
233     """purely for the ease of unit testing to test different rcv scenarios"""
234     __RMR_LOOP__.rcv_func = rcv_func