Extend Dockerfile and improve documentation
[ric-plt/a1.git] / a1 / a1rmr.py
1 # ==================================================================================
2 #       Copyright (c) 2019-2020 Nokia
3 #       Copyright (c) 2018-2020 AT&T Intellectual Property.
4 #
5 #   Licensed under the Apache License, Version 2.0 (the "License");
6 #   you may not use this file except in compliance with the License.
7 #   You may obtain a copy of the License at
8 #
9 #          http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #   Unless required by applicable law or agreed to in writing, software
12 #   distributed under the License is distributed on an "AS IS" BASIS,
13 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #   See the License for the specific language governing permissions and
15 #   limitations under the License.
16 # ==================================================================================
17 """
18 A1 RMR functionality
19 """
20 import os
21 import queue
22 import time
23 import json
24 from threading import Thread
25 from ricxappframe.rmr import rmr, helpers
26 from mdclogpy import Logger
27 from a1 import data, messages
28 from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound
29
30 mdc_logger = Logger(name=__name__)
31
32
33 RETRY_TIMES = int(os.environ.get("A1_RMR_RETRY_TIMES", 4))
34 A1_POLICY_REQUEST = 20010
35 A1_POLICY_RESPONSE = 20011
36 A1_POLICY_QUERY = 20012
37
38
39 # Note; yes, globals are bad, but this is a private (to this module) global
40 # No other module can import/access this (well, python doesn't enforce this, but all linters will complain)
41 __RMR_LOOP__ = None
42
43
44 class _RmrLoop:
45     """
46     Class represents an rmr loop that constantly reads from rmr and performs operations
47     based on waiting messages.  This launches a thread, it should probably only be called
48     once; the public facing method to access these ensures this.
49
50     TODO: the xapp frame has a version of this looping structure. See if A1 can switch to that.
51     """
52
53     def __init__(self, init_func_override=None, rcv_func_override=None):
54         """
55         Init
56
57         Parameters
58         ----------
59         init_func_override: function (optional)
60             Function that initializes RMR and answers an RMR context.
61             Supply an empty function to skip initializing RMR.
62
63         rcv_func_override: function (optional)
64             Function that receives messages from RMR and answers a list.
65             Supply a trivial function to skip reading from RMR.
66         """
67         self.keep_going = True
68         self.rcv_func = None
69         self.last_ran = time.time()
70
71         # see docs/overview#resiliency for a discussion of this
72         self.instance_send_queue = queue.Queue()  # thread safe queue https://docs.python.org/3/library/queue.html
73
74         # intialize rmr context
75         if init_func_override:
76             self.mrc = init_func_override()
77         else:
78             mdc_logger.debug("Waiting for rmr to initialize..")
79             # rmr.RMRFL_MTCALL puts RMR into a multithreaded mode, where a receiving thread populates an
80             # internal ring of messages, and receive calls read from that
81             # currently the size is 2048 messages, so this is fine for the foreseeable future
82             self.mrc = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
83             while rmr.rmr_ready(self.mrc) == 0:
84                 time.sleep(0.5)
85
86         # set the receive function
87         self.rcv_func = (
88             rcv_func_override
89             if rcv_func_override
90             else lambda: helpers.rmr_rcvall_msgs_raw(self.mrc, [A1_POLICY_RESPONSE, A1_POLICY_QUERY])
91         )
92
93         # start the work loop
94         self.thread = Thread(target=self.loop)
95         self.thread.start()
96
97     def _assert_good_send(self, sbuf, pre_send_summary):
98         """
99         common helper function for _send_msg and _rts_msg
100         """
101         post_send_summary = rmr.message_summary(sbuf)
102         if post_send_summary["message state"] == 0 and post_send_summary["message status"] == "RMR_OK":
103             return True
104         mdc_logger.debug("Message NOT sent!")
105         mdc_logger.debug("Pre-send summary: {0}, Post-send summary: {1}".format(pre_send_summary, post_send_summary))
106         return False
107
108     def _send_msg(self, pay, mtype, subid):
109         """
110         sends a msg
111         """
112         for _ in range(0, RETRY_TIMES):
113             sbuf = rmr.rmr_alloc_msg(self.mrc, len(pay), payload=pay, gen_transaction_id=True, mtype=mtype, sub_id=subid)
114             sbuf.contents.sub_id = subid
115             pre_send_summary = rmr.message_summary(sbuf)
116             mdc_logger.debug("Trying to send message: {}".format(pre_send_summary))
117             sbuf = rmr.rmr_send_msg(self.mrc, sbuf)  # send
118             if self._assert_good_send(sbuf, pre_send_summary):
119                 rmr.rmr_free_msg(sbuf)  # free
120                 return
121
122         mdc_logger.debug("A1 did NOT send the message successfully after {} retries!".format(RETRY_TIMES))
123
124     def _rts_msg(self, pay, sbuf_rts, mtype):
125         """
126         sends a message using rts
127         we do not call free here because we may rts many times; it is called after the rts loop
128         """
129         for _ in range(0, RETRY_TIMES):
130             pre_send_summary = rmr.message_summary(sbuf_rts)
131             sbuf_rts = rmr.rmr_rts_msg(self.mrc, sbuf_rts, payload=pay, mtype=mtype)
132             if self._assert_good_send(sbuf_rts, pre_send_summary):
133                 break
134         return sbuf_rts  # in some cases rts may return a new sbuf
135
136     def _handle_sends(self):
137         # send out all messages waiting for us
138         while not self.instance_send_queue.empty():
139             work_item = self.instance_send_queue.get(block=False, timeout=None)
140             payload = json.dumps(messages.a1_to_handler(*work_item)).encode("utf-8")
141             self._send_msg(payload, A1_POLICY_REQUEST, work_item[1])
142
143     def loop(self):
144         """
145         This loop runs forever, and has 3 jobs:
146         - send out any messages that have to go out (create instance, delete instance)
147         - read a1s mailbox and update the status of all instances based on acks from downstream policy handlers
148         - clean up the database (eg delete the instance) under certain conditions based on those statuses (NOT DONE YET)
149         """
150         # loop forever
151         mdc_logger.debug("Work loop starting")
152         while self.keep_going:
153
154             # Update 3/20/2020
155             # We now handle our sends in a thread (that will just exit when it's done) because there is a difference between how send works in SI95 vs NNG.
156             # Send_msg via NNG formerly never blocked.
157             # However under SI95 this send may block for some arbitrary period of time on the first send to an endpoint for which a connection is not established
158             # If this send takes too long, this loop blocks, and the healthcheck will fail, which will cause A1s healthcheck to fail, which will cause Kubernetes to whack A1 and all kinds of horrible things happen.
159             # Therefore, now under SI95, we thread this.
160             Thread(target=self._handle_sends).start()
161
162             # read our mailbox
163             for (msg, sbuf) in self.rcv_func():
164                 # TODO: in the future we may also have to catch SDL errors
165                 try:
166                     mtype = msg["message type"]
167                 except (KeyError, TypeError, json.decoder.JSONDecodeError):
168                     mdc_logger.debug("Dropping malformed policy ack/query message: {0}".format(msg))
169
170                 if mtype == A1_POLICY_RESPONSE:
171                     try:
172                         # got a policy response, update status
173                         pay = json.loads(msg["payload"])
174                         data.set_policy_instance_status(
175                             pay["policy_type_id"], pay["policy_instance_id"], pay["handler_id"], pay["status"]
176                         )
177                         mdc_logger.debug("Successfully received status update: {0}".format(pay))
178                     except (PolicyTypeNotFound, PolicyInstanceNotFound):
179                         mdc_logger.debug("Received a response  for a non-existent instance")
180                     except (KeyError, TypeError, json.decoder.JSONDecodeError):
181                         mdc_logger.debug("Dropping malformed policy ack message: {0}".format(msg))
182
183                 elif mtype == A1_POLICY_QUERY:
184                     try:
185                         # got a query, do a lookup and send out all instances
186                         pti = json.loads(msg["payload"])["policy_type_id"]
187                         instance_list = data.get_instance_list(pti)  # will raise if a bad type
188                         mdc_logger.debug("Received a query for a good type: {0}".format(msg))
189                         for pii in instance_list:
190                             instance = data.get_policy_instance(pti, pii)
191                             payload = json.dumps(messages.a1_to_handler("CREATE", pti, pii, instance)).encode("utf-8")
192                             sbuf = self._rts_msg(payload, sbuf, A1_POLICY_REQUEST)
193                     except (PolicyTypeNotFound):
194                         mdc_logger.debug("Received a query for a non-existent type: {0}".format(msg))
195                     except (KeyError, TypeError, json.decoder.JSONDecodeError):
196                         mdc_logger.debug("Dropping malformed policy query message: {0}".format(msg))
197
198                 else:
199                     mdc_logger.debug("Received message type {0} but A1 does not handle this".format(mtype))
200
201                 # we must free each sbuf
202                 rmr.rmr_free_msg(sbuf)
203
204             self.last_ran = time.time()
205             time.sleep(1)
206
207         mdc_logger.debug("RMR Thread Ending!")
208
209
210 # Public
211
212
213 def start_rmr_thread(init_func_override=None, rcv_func_override=None):
214     """
215     Start a1s rmr thread
216
217     Parameters
218     ----------
219     init_func_override: function (optional)
220         Function that initializes RMR and answers an RMR context.
221         Supply an empty function to skip initializing RMR.
222
223     rcv_func_override: function (optional)
224         Function that receives messages from RMR and answers a list.
225         Supply a trivial function to skip reading from RMR.
226     """
227     global __RMR_LOOP__
228     if __RMR_LOOP__ is None:
229         __RMR_LOOP__ = _RmrLoop(init_func_override, rcv_func_override)
230
231
232 def stop_rmr_thread():
233     """
234     stops the rmr thread
235     """
236     __RMR_LOOP__.keep_going = False
237
238
239 def queue_instance_send(item):
240     """
241     push an item into the work queue
242     currently the only type of work is to send out messages
243     """
244     __RMR_LOOP__.instance_send_queue.put(item)
245
246
247 def healthcheck_rmr_thread(seconds=30):
248     """
249     returns a boolean representing whether the rmr loop is healthy, by checking two attributes:
250     1. is it running?,
251     2. is it stuck in a long (> seconds) loop?
252     """
253     return __RMR_LOOP__.thread.is_alive() and ((time.time() - __RMR_LOOP__.last_ran) < seconds)
254
255
256 def replace_rcv_func(rcv_func):
257     """purely for the ease of unit testing to test different rcv scenarios"""
258     __RMR_LOOP__.rcv_func = rcv_func