Moving RMR message reciver into go routine
[ric-plt/a1.git] / a1 / a1rmr.py
1 # ==================================================================================
2 #       Copyright (c) 2019-2020 Nokia
3 #       Copyright (c) 2018-2020 AT&T Intellectual Property.
4 #
5 #   Licensed under the Apache License, Version 2.0 (the "License");
6 #   you may not use this file except in compliance with the License.
7 #   You may obtain a copy of the License at
8 #
9 #          http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #   Unless required by applicable law or agreed to in writing, software
12 #   distributed under the License is distributed on an "AS IS" BASIS,
13 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #   See the License for the specific language governing permissions and
15 #   limitations under the License.
16 # ==================================================================================
17 """
18 A1 RMR functionality
19 """
20 import os
21 import queue
22 import time
23 import json
24 import requests
25 from threading import Thread
26 from ricxappframe.rmr import rmr, helpers
27 from mdclogpy import Logger
28 from a1 import data, messages
29 from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound
30
31 mdc_logger = Logger()
32 mdc_logger.mdclog_format_init(configmap_monitor=True)
33
34
35 # With Nanomsg and NNG it was possible for a send attempt to have a "soft"
36 # failure which did warrant some retries if the status of the send is RMR_ERR_RETRY.
37 # Because of the way NNG worked, it sometimes required many tens of retries,
38 # and a retry state happened often for even moderately "verbose" applications.
39 # With SI95 there is still a possibility that a retry is necessary, but it is very rare.
40 RETRY_TIMES = int(os.environ.get("A1_RMR_RETRY_TIMES", 4))
41 A1_POLICY_REQUEST = 20010
42 A1_POLICY_RESPONSE = 20011
43 A1_POLICY_QUERY = 20012
44 A1_EI_QUERY_ALL = 20013
45 AI_EI_QUERY_ALL_RESP = 20014
46 A1_EI_CREATE_JOB = 20015
47 A1_EI_CREATE_JOB_RESP = 20016
48 A1_EI_DATA_DELIVERY = 20017
49 ECS_SERVICE_HOST = os.environ.get("ECS_SERVICE_HOST", "http://ecs-service:8083")
50 ESC_EI_TYPE_PATH = ECS_SERVICE_HOST + "/A1-EI/v1/eitypes"
51 ECS_EI_JOB_PATH = ECS_SERVICE_HOST + "/A1-EI/v1/eijobs/"
52
53
54 # Note; yes, globals are bad, but this is a private (to this module) global
55 # No other module can import/access this (well, python doesn't enforce this, but all linters will complain)
56 __RMR_LOOP__ = None
57
58
59 class _RmrLoop:
60     """
61     Class represents an rmr loop that constantly reads from rmr and performs operations
62     based on waiting messages.  This launches a thread, it should probably only be called
63     once; the public facing method to access these ensures this.
64
65     TODO: the xapp frame has a version of this looping structure. See if A1 can switch to that.
66     """
67
68     def __init__(self, init_func_override=None, rcv_func_override=None):
69         """
70         Init
71
72         Parameters
73         ----------
74         init_func_override: function (optional)
75             Function that initializes RMR and answers an RMR context.
76             Supply an empty function to skip initializing RMR.
77
78         rcv_func_override: function (optional)
79             Function that receives messages from RMR and answers a list.
80             Supply a trivial function to skip reading from RMR.
81         """
82         self.keep_going = True
83         self.rcv_func = None
84         self.last_ran = time.time()
85
86         # see docs/overview#resiliency for a discussion of this
87         self.instance_send_queue = queue.Queue()  # thread safe queue https://docs.python.org/3/library/queue.html
88         # queue for data delivery item
89         self.ei_job_result_queue = queue.Queue()
90
91         # intialize rmr context
92         if init_func_override:
93             self.mrc = init_func_override()
94         else:
95             mdc_logger.debug("Waiting for rmr to initialize..")
96             # rmr.RMRFL_MTCALL puts RMR into a multithreaded mode, where a receiving thread
97             # populates an internal ring of messages, and receive calls read from that.
98             # currently the size is 2048 messages, so this is fine for the foreseeable future
99             self.mrc = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
100             while rmr.rmr_ready(self.mrc) == 0:
101                 time.sleep(0.5)
102
103         # set the receive function
104         self.rcv_func = (
105             rcv_func_override
106             if rcv_func_override
107             else lambda: helpers.rmr_rcvall_msgs_raw(self.mrc, [A1_POLICY_RESPONSE, A1_POLICY_QUERY, A1_EI_QUERY_ALL, A1_EI_CREATE_JOB])
108         )
109
110         # start the work loop
111         self.thread = Thread(target=self.loop)
112         self.thread.start()
113
114     def _assert_good_send(self, sbuf, pre_send_summary):
115         """
116         Extracts the send result and logs a detailed warning if the send failed.
117         Returns the message state, an integer that indicates the result.
118         """
119         post_send_summary = rmr.message_summary(sbuf)
120         if post_send_summary[rmr.RMR_MS_MSG_STATE] != rmr.RMR_OK:
121             mdc_logger.warning("RMR send failed; pre-send summary: {0}, post-send summary: {1}".format(pre_send_summary, post_send_summary))
122         return post_send_summary[rmr.RMR_MS_MSG_STATE]
123
124     def _send_msg(self, pay, mtype, subid):
125         """
126         Creates and sends a message via RMR's send-message feature with the specified payload
127         using the specified message type and subscription ID.
128         """
129         sbuf = rmr.rmr_alloc_msg(self.mrc, len(pay), payload=pay, gen_transaction_id=True, mtype=mtype, sub_id=subid)
130         sbuf.contents.sub_id = subid
131         pre_send_summary = rmr.message_summary(sbuf)
132         for _ in range(0, RETRY_TIMES):
133             mdc_logger.debug("_send_msg: sending: {}".format(pre_send_summary))
134             sbuf = rmr.rmr_send_msg(self.mrc, sbuf)
135             msg_state = self._assert_good_send(sbuf, pre_send_summary)
136             mdc_logger.debug("_send_msg: result message state: {}".format(msg_state))
137             if msg_state != rmr.RMR_ERR_RETRY:
138                 break
139
140         rmr.rmr_free_msg(sbuf)
141         if msg_state != rmr.RMR_OK:
142             mdc_logger.warning("_send_msg: failed after {} retries".format(RETRY_TIMES))
143
144     def _rts_msg(self, pay, sbuf_rts, mtype):
145         """
146         Sends a message via RMR's return-to-sender feature.
147         This neither allocates nor frees a message buffer because we may rts many times.
148         Returns the message buffer from the RTS function, which may reallocate it.
149         """
150         pre_send_summary = rmr.message_summary(sbuf_rts)
151         for _ in range(0, RETRY_TIMES):
152             mdc_logger.debug("_rts_msg: sending: {}".format(pre_send_summary))
153             sbuf_rts = rmr.rmr_rts_msg(self.mrc, sbuf_rts, payload=pay, mtype=mtype)
154             msg_state = self._assert_good_send(sbuf_rts, pre_send_summary)
155             mdc_logger.debug("_rts_msg: result message state: {}".format(msg_state))
156             if msg_state != rmr.RMR_ERR_RETRY:
157                 break
158
159         if msg_state != rmr.RMR_OK:
160             mdc_logger.warning("_rts_msg: failed after {} retries".format(RETRY_TIMES))
161         return sbuf_rts  # in some cases rts may return a new sbuf
162
163     def _handle_sends(self):
164         # send out all messages waiting for us
165         while not self.instance_send_queue.empty():
166             work_item = self.instance_send_queue.get(block=False, timeout=None)
167             payload = json.dumps(messages.a1_to_handler(*work_item)).encode("utf-8")
168             self._send_msg(payload, A1_POLICY_REQUEST, work_item[1])
169
170         # now send all the ei-job related data
171         while not self.ei_job_result_queue.empty():
172             mdc_logger.debug("perform data delivery to consumer")
173
174             work_item = self.ei_job_result_queue.get(block=False, timeout=None)
175             payload = json.dumps(messages.ei_to_handler(*work_item)).encode("utf-8")
176             ei_job_id = int(work_item[0])
177             mdc_logger.debug("data-delivery: {}".format(payload))
178
179             # send the payload to consumer subscribed for ei_job_id
180             self._send_msg(payload, A1_EI_DATA_DELIVERY, ei_job_id)
181
182     def loop(self):
183         """
184         This loop runs forever, and has 3 jobs:
185         - send out any messages that have to go out (create instance, delete instance)
186         - read a1s mailbox and update the status of all instances based on acks from downstream policy handlers
187         - clean up the database (eg delete the instance) under certain conditions based on those statuses (NOT DONE YET)
188         """
189         # loop forever
190         mdc_logger.debug("Work loop starting")
191         while self.keep_going:
192
193             # Update 3/20/2020
194             # We now handle our sends in a thread (that will just exit when it's done) because there is a difference between how send works in SI95 vs NNG.
195             # Send_msg via NNG formerly never blocked.
196             # However under SI95 this send may block for some arbitrary period of time on the first send to an endpoint for which a connection is not established
197             # If this send takes too long, this loop blocks, and the healthcheck will fail, which will cause A1s healthcheck to fail, which will cause Kubernetes to whack A1 and all kinds of horrible things happen.
198             # Therefore, now under SI95, we thread this.
199             Thread(target=self._handle_sends).start()
200
201             # read our mailbox
202             for (msg, sbuf) in self.rcv_func():
203                 # TODO: in the future we may also have to catch SDL errors
204                 try:
205                     mtype = msg[rmr.RMR_MS_MSG_TYPE]
206                 except (KeyError, TypeError, json.decoder.JSONDecodeError):
207                     mdc_logger.warning("Dropping malformed message: {0}".format(msg))
208
209                 if mtype == A1_POLICY_RESPONSE:
210                     try:
211                         # got a policy response, update status
212                         pay = json.loads(msg[rmr.RMR_MS_PAYLOAD])
213                         data.set_policy_instance_status(
214                             pay["policy_type_id"], pay["policy_instance_id"], pay["handler_id"], pay["status"]
215                         )
216                         mdc_logger.debug("Successfully received status update: {0}".format(pay))
217                     except (PolicyTypeNotFound, PolicyInstanceNotFound):
218                         mdc_logger.warning("Received a response for a non-existent type/instance: {0}".format(msg))
219                     except (KeyError, TypeError, json.decoder.JSONDecodeError):
220                         mdc_logger.warning("Dropping malformed policy response: {0}".format(msg))
221
222                 elif mtype == A1_POLICY_QUERY:
223                     try:
224                         # got a query, do a lookup and send out all instances
225                         pti = json.loads(msg[rmr.RMR_MS_PAYLOAD])["policy_type_id"]
226                         instance_list = data.get_instance_list(pti)  # will raise if a bad type
227                         mdc_logger.debug("Received a query for a known policy type: {0}".format(msg))
228                         for pii in instance_list:
229                             instance = data.get_policy_instance(pti, pii)
230                             payload = json.dumps(messages.a1_to_handler("CREATE", pti, pii, instance)).encode("utf-8")
231                             sbuf = self._rts_msg(payload, sbuf, A1_POLICY_REQUEST)
232                     except (PolicyTypeNotFound):
233                         mdc_logger.warning("Received a policy query for a non-existent type: {0}".format(msg))
234                     except (KeyError, TypeError, json.decoder.JSONDecodeError):
235                         mdc_logger.warning("Dropping malformed policy query: {0}".format(msg))
236
237                 elif mtype == A1_EI_QUERY_ALL:
238                     mdc_logger.debug("Received messaage {0}".format(msg))
239
240                     # query A1-EI co-ordinator service to get the EI-types
241                     resp = requests.get(ESC_EI_TYPE_PATH)
242                     if resp.status_code != 200:
243                         mdc_logger.warning("Received no reponse from A1-EI service")
244
245                     mdc_logger.debug("response from A1-EI service : {0}".format(resp.json()))
246
247                     # send the complete list of EI-types to xApp
248                     sbuf = self._rts_msg(resp.content, sbuf, AI_EI_QUERY_ALL_RESP)
249
250                 elif mtype == A1_EI_CREATE_JOB:
251                     mdc_logger.debug("Received message {0}".format(msg))
252                     payload = json.loads(msg[rmr.RMR_MS_PAYLOAD])
253                     mdc_logger.debug("Payload: {0}".format(payload))
254
255                     uuidStr = payload["job-id"]
256                     del payload["job-id"]
257
258                     mdc_logger.debug("Payload after removing job-id: {0}".format(payload))
259
260                     # 1. send request to A1-EI Service to create A1-EI JOB
261                     headers = {'Content-type': 'application/json'}
262                     r = requests.put(ECS_EI_JOB_PATH + uuidStr, data=json.dumps(payload), headers=headers)
263                     if (r.status_code != 201) and (r.status_code != 200):
264                         mdc_logger.warning("failed to create EIJOB : {0}".format(r))
265                     else:
266                         # 2. inform xApp for Job status
267                         mdc_logger.debug("received successful response (ei-job-id) :{0}".format(uuidStr))
268                         rmr_data = """{{
269                                 "ei_job_id": "{id}"
270                                 }}""".format(id=uuidStr)
271                         mdc_logger.debug("rmr_Data to send: {0}".format(rmr_data))
272                         sbuf = self._rts_msg(str.encode(rmr_data), sbuf, A1_EI_CREATE_JOB_RESP)
273
274                 else:
275                     mdc_logger.warning("Received message type {0} but A1 does not handle this".format(mtype))
276
277                 # we must free each sbuf
278                 rmr.rmr_free_msg(sbuf)
279             self.last_ran = time.time()
280             time.sleep(1)
281
282         mdc_logger.debug("RMR Thread Ending!")
283
284
285 # Public
286
287
288 def start_rmr_thread(init_func_override=None, rcv_func_override=None):
289     """
290     Start a1s rmr thread
291
292     Parameters
293     ----------
294     init_func_override: function (optional)
295         Function that initializes RMR and answers an RMR context.
296         Supply an empty function to skip initializing RMR.
297
298     rcv_func_override: function (optional)
299         Function that receives messages from RMR and answers a list.
300         Supply a trivial function to skip reading from RMR.
301     """
302     global __RMR_LOOP__
303     if __RMR_LOOP__ is None:
304         __RMR_LOOP__ = _RmrLoop(init_func_override, rcv_func_override)
305
306
307 def stop_rmr_thread():
308     """
309     stops the rmr thread
310     """
311     __RMR_LOOP__.keep_going = False
312
313
314 def queue_instance_send(item):
315     """
316     push an item into the work queue
317     currently the only type of work is to send out messages
318     """
319     __RMR_LOOP__.instance_send_queue.put(item)
320
321
322 def queue_ei_job_result(item):
323     """
324     push an item into the ei_job_queue
325     """
326     mdc_logger.debug("queuing data delivery item {0}".format(item))
327     __RMR_LOOP__.ei_job_result_queue.put(item)
328
329
330 def healthcheck_rmr_thread(seconds=30):
331     """
332     returns a boolean representing whether the rmr loop is healthy, by checking two attributes:
333     1. is it running?,
334     2. is it stuck in a long (> seconds) loop?
335     """
336     return __RMR_LOOP__.thread.is_alive() and ((time.time() - __RMR_LOOP__.last_ran) < seconds)
337
338
339 def replace_rcv_func(rcv_func):
340     """purely for the ease of unit testing to test different rcv scenarios"""
341     __RMR_LOOP__.rcv_func = rcv_func