58ec1c06958ac80460805059b904863f54e45321
[ric-plt/a1.git] / a1 / a1rmr.py
1 """
2 a1s rmr functionality
3 """
4 # ==================================================================================
5 #       Copyright (c) 2019-2020 Nokia
6 #       Copyright (c) 2018-2020 AT&T Intellectual Property.
7 #
8 #   Licensed under the Apache License, Version 2.0 (the "License");
9 #   you may not use this file except in compliance with the License.
10 #   You may obtain a copy of the License at
11 #
12 #          http://www.apache.org/licenses/LICENSE-2.0
13 #
14 #   Unless required by applicable law or agreed to in writing, software
15 #   distributed under the License is distributed on an "AS IS" BASIS,
16 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 #   See the License for the specific language governing permissions and
18 #   limitations under the License.
19 # ==================================================================================
20 import os
21 import queue
22 import time
23 import json
24 from threading import Thread
25 from rmr import rmr, helpers
26 from mdclogpy import Logger
27 from a1 import data, messages
28 from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound
29
30 mdc_logger = Logger(name=__name__)
31
32
33 RETRY_TIMES = int(os.environ.get("A1_RMR_RETRY_TIMES", 4))
34 A1_POLICY_REQUEST = 20010
35 A1_POLICY_RESPONSE = 20011
36 A1_POLICY_QUERY = 20012
37
38
39 # Note; yes, globals are bad, but this is a private (to this module) global
40 # No other module can import/access this (well, python doesn't enforce this, but all linters will complain)
41 __RMR_LOOP__ = None
42
43
44 class _RmrLoop:
45     """
46     class represents an rmr loop that constantly reads from rmr and performs operations based on waiting messages
47     this launches a thread, it should probably only be called once; the public facing method to access these ensures this
48     """
49
50     def __init__(self, init_func_override=None, rcv_func_override=None):
51         self.keep_going = True
52         self.rcv_func = None
53         self.last_ran = time.time()
54
55         # see docs/overview#resiliency for a discussion of this
56         self.instance_send_queue = queue.Queue()  # thread safe queue https://docs.python.org/3/library/queue.html
57
58         # intialize rmr context
59         if init_func_override:
60             self.mrc = init_func_override()
61         else:
62             mdc_logger.debug("Waiting for rmr to initialize..")
63             # rmr.RMRFL_MTCALL puts RMR into a multithreaded mode, where a receiving thread populates an
64             # internal ring of messages, and receive calls read from that
65             # currently the size is 2048 messages, so this is fine for the foreseeable future
66             self.mrc = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
67             while rmr.rmr_ready(self.mrc) == 0:
68                 time.sleep(0.5)
69
70         # set the receive function
71         self.rcv_func = (
72             rcv_func_override
73             if rcv_func_override
74             else lambda: helpers.rmr_rcvall_msgs_raw(self.mrc, [A1_POLICY_RESPONSE, A1_POLICY_QUERY])
75         )
76
77         # start the work loop
78         self.thread = Thread(target=self.loop)
79         self.thread.start()
80
81     def _assert_good_send(self, sbuf, pre_send_summary):
82         """
83         common helper function for _send_msg and _rts_msg
84         """
85         post_send_summary = rmr.message_summary(sbuf)
86         if post_send_summary["message state"] == 0 and post_send_summary["message status"] == "RMR_OK":
87             return True
88         mdc_logger.debug("Message NOT sent!")
89         mdc_logger.debug("Pre-send summary: {0}, Post-send summary: {1}".format(pre_send_summary, post_send_summary))
90         return False
91
92     def _send_msg(self, pay, mtype, subid):
93         """
94         sends a msg
95         """
96         for _ in range(0, RETRY_TIMES):
97             sbuf = rmr.rmr_alloc_msg(self.mrc, len(pay), payload=pay, gen_transaction_id=True, mtype=mtype, sub_id=subid)
98             sbuf.contents.sub_id = subid
99             pre_send_summary = rmr.message_summary(sbuf)
100             sbuf = rmr.rmr_send_msg(self.mrc, sbuf)  # send
101             if self._assert_good_send(sbuf, pre_send_summary):
102                 rmr.rmr_free_msg(sbuf)  # free
103                 break
104
105     def _rts_msg(self, pay, sbuf_rts, mtype):
106         """
107         sends a message using rts
108         we do not call free here because we may rts many times; it is called after the rts loop
109         """
110         for _ in range(0, RETRY_TIMES):
111             pre_send_summary = rmr.message_summary(sbuf_rts)
112             sbuf_rts = rmr.rmr_rts_msg(self.mrc, sbuf_rts, payload=pay, mtype=mtype)
113             if self._assert_good_send(sbuf_rts, pre_send_summary):
114                 break
115         return sbuf_rts  # in some cases rts may return a new sbuf
116
117     def loop(self):
118         """
119         This loop runs forever, and has 3 jobs:
120         - send out any messages that have to go out (create instance, delete instance)
121         - read a1s mailbox and update the status of all instances based on acks from downstream policy handlers
122         - clean up the database (eg delete the instance) under certain conditions based on those statuses (NOT DONE YET)
123         """
124         # loop forever
125         mdc_logger.debug("Work loop starting")
126         while self.keep_going:
127
128             # send out all messages waiting for us
129             while not self.instance_send_queue.empty():
130                 work_item = self.instance_send_queue.get(block=False, timeout=None)
131                 payload = json.dumps(messages.a1_to_handler(*work_item)).encode("utf-8")
132                 self._send_msg(payload, A1_POLICY_REQUEST, work_item[1])
133
134             # read our mailbox
135             for (msg, sbuf) in self.rcv_func():
136                 # TODO: in the future we may also have to catch SDL errors
137                 try:
138                     mtype = msg["message type"]
139                 except (KeyError, TypeError, json.decoder.JSONDecodeError):
140                     mdc_logger.debug("Dropping malformed policy ack/query message: {0}".format(msg))
141
142                 if mtype == A1_POLICY_RESPONSE:
143                     try:
144                         # got a policy response, update status
145                         pay = json.loads(msg["payload"])
146                         data.set_policy_instance_status(
147                             pay["policy_type_id"], pay["policy_instance_id"], pay["handler_id"], pay["status"]
148                         )
149                         mdc_logger.debug("Successfully received status update: {0}".format(pay))
150                     except (PolicyTypeNotFound, PolicyInstanceNotFound):
151                         mdc_logger.debug("Received a response  for a non-existent instance")
152                     except (KeyError, TypeError, json.decoder.JSONDecodeError):
153                         mdc_logger.debug("Dropping malformed policy ack message: {0}".format(msg))
154
155                 elif mtype == A1_POLICY_QUERY:
156                     try:
157                         # got a query, do a lookup and send out all instances
158                         pti = json.loads(msg["payload"])["policy_type_id"]
159                         mdc_logger.debug("Received query for: {0}".format(pti))
160                         for pii in data.get_instance_list(pti):
161                             instance = data.get_policy_instance(pti, pii)
162                             payload = json.dumps(messages.a1_to_handler("CREATE", pti, pii, instance)).encode("utf-8")
163                             sbuf = self._rts_msg(payload, sbuf, A1_POLICY_REQUEST)
164                     except (PolicyTypeNotFound, PolicyInstanceNotFound):
165                         mdc_logger.debug("Received a query for a non-existent type: {0}".format(msg))
166                     except (KeyError, TypeError, json.decoder.JSONDecodeError):
167                         mdc_logger.debug("Dropping malformed policy query message: {0}".format(msg))
168
169                 else:
170                     mdc_logger.debug("Received message type {0} but A1 does not handle this".format(mtype))
171
172                 # we must free each sbuf
173                 rmr.rmr_free_msg(sbuf)
174
175             self.last_ran = time.time()
176             time.sleep(1)
177
178
179 # Public
180
181
182 def start_rmr_thread(init_func_override=None, rcv_func_override=None):
183     """
184     Start a1s rmr thread
185     """
186     global __RMR_LOOP__
187     if __RMR_LOOP__ is None:
188         __RMR_LOOP__ = _RmrLoop(init_func_override, rcv_func_override)
189
190
191 def stop_rmr_thread():
192     """
193     stops the rmr thread
194     """
195     __RMR_LOOP__.keep_going = False
196
197
198 def queue_instance_send(item):
199     """
200     push an item into the work queue
201     currently the only type of work is to send out messages
202     """
203     __RMR_LOOP__.instance_send_queue.put(item)
204
205
206 def healthcheck_rmr_thread(seconds=30):
207     """
208     returns a boolean representing whether the rmr loop is healthy, by checking two attributes:
209     1. is it running?,
210     2. is it stuck in a long (> seconds) loop?
211     """
212     return __RMR_LOOP__.thread.is_alive() and ((time.time() - __RMR_LOOP__.last_ran) < seconds)
213
214
215 def replace_rcv_func(rcv_func):
216     """purely for the ease of unit testing to test different rcv scenarios"""
217     __RMR_LOOP__.rcv_func = rcv_func