A1 Mediator enhancements for A1-EI
[ric-plt/a1.git] / a1 / a1rmr.py
1 # ==================================================================================
2 #       Copyright (c) 2019-2020 Nokia
3 #       Copyright (c) 2018-2020 AT&T Intellectual Property.
4 #
5 #   Licensed under the Apache License, Version 2.0 (the "License");
6 #   you may not use this file except in compliance with the License.
7 #   You may obtain a copy of the License at
8 #
9 #          http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #   Unless required by applicable law or agreed to in writing, software
12 #   distributed under the License is distributed on an "AS IS" BASIS,
13 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #   See the License for the specific language governing permissions and
15 #   limitations under the License.
16 # ==================================================================================
17 """
18 A1 RMR functionality
19 """
20 import os
21 import queue
22 import time
23 import json
24 import requests
25 from threading import Thread
26 from ricxappframe.rmr import rmr, helpers
27 from mdclogpy import Logger
28 from a1 import data, messages
29 from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound
30
31 mdc_logger = Logger(name=__name__)
32
33
34 # With Nanomsg and NNG it was possible for a send attempt to have a "soft"
35 # failure which did warrant some retries if the status of the send is RMR_ERR_RETRY.
36 # Because of the way NNG worked, it sometimes required many tens of retries,
37 # and a retry state happened often for even moderately "verbose" applications.
38 # With SI95 there is still a possibility that a retry is necessary, but it is very rare.
39 RETRY_TIMES = int(os.environ.get("A1_RMR_RETRY_TIMES", 4))
40 A1_POLICY_REQUEST = 20010
41 A1_POLICY_RESPONSE = 20011
42 A1_POLICY_QUERY = 20012
43 A1_EI_QUERY_ALL = 20013
44 AI_EI_QUERY_ALL_RESP = 20014
45 A1_EI_CREATE_JOB = 20015
46 A1_EI_CREATE_JOB_RESP = 20016
47 A1_EI_DATA_DELIVERY = 20017
48 ECS_SERVICE_HOST = os.environ.get("ECS_SERVICE_HOST", "http://ecs-service:8083")
49 ESC_EI_TYPE_PATH = ECS_SERVICE_HOST + "/A1-EI/v1/eitypes"
50 ECS_EI_JOB_PATH = ECS_SERVICE_HOST + "/A1-EI/v1/eijobs/"
51
52
53 # Note; yes, globals are bad, but this is a private (to this module) global
54 # No other module can import/access this (well, python doesn't enforce this, but all linters will complain)
55 __RMR_LOOP__ = None
56
57
58 class _RmrLoop:
59     """
60     Class represents an rmr loop that constantly reads from rmr and performs operations
61     based on waiting messages.  This launches a thread, it should probably only be called
62     once; the public facing method to access these ensures this.
63
64     TODO: the xapp frame has a version of this looping structure. See if A1 can switch to that.
65     """
66
67     def __init__(self, init_func_override=None, rcv_func_override=None):
68         """
69         Init
70
71         Parameters
72         ----------
73         init_func_override: function (optional)
74             Function that initializes RMR and answers an RMR context.
75             Supply an empty function to skip initializing RMR.
76
77         rcv_func_override: function (optional)
78             Function that receives messages from RMR and answers a list.
79             Supply a trivial function to skip reading from RMR.
80         """
81         self.keep_going = True
82         self.rcv_func = None
83         self.last_ran = time.time()
84
85         # see docs/overview#resiliency for a discussion of this
86         self.instance_send_queue = queue.Queue()  # thread safe queue https://docs.python.org/3/library/queue.html
87         # queue for data delivery item
88         self.ei_job_result_queue = queue.Queue()
89
90         # intialize rmr context
91         if init_func_override:
92             self.mrc = init_func_override()
93         else:
94             mdc_logger.debug("Waiting for rmr to initialize..")
95             # rmr.RMRFL_MTCALL puts RMR into a multithreaded mode, where a receiving thread
96             # populates an internal ring of messages, and receive calls read from that.
97             # currently the size is 2048 messages, so this is fine for the foreseeable future
98             self.mrc = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
99             while rmr.rmr_ready(self.mrc) == 0:
100                 time.sleep(0.5)
101
102         # set the receive function
103         self.rcv_func = (
104             rcv_func_override
105             if rcv_func_override
106             else lambda: helpers.rmr_rcvall_msgs_raw(self.mrc, [A1_POLICY_RESPONSE, A1_POLICY_QUERY, A1_EI_QUERY_ALL, A1_EI_CREATE_JOB])
107         )
108
109         # start the work loop
110         self.thread = Thread(target=self.loop)
111         self.thread.start()
112
113     def _assert_good_send(self, sbuf, pre_send_summary):
114         """
115         Extracts the send result and logs a detailed warning if the send failed.
116         Returns the message state, an integer that indicates the result.
117         """
118         post_send_summary = rmr.message_summary(sbuf)
119         if post_send_summary[rmr.RMR_MS_MSG_STATE] != rmr.RMR_OK:
120             mdc_logger.warning("RMR send failed; pre-send summary: {0}, post-send summary: {1}".format(pre_send_summary, post_send_summary))
121         return post_send_summary[rmr.RMR_MS_MSG_STATE]
122
123     def _send_msg(self, pay, mtype, subid):
124         """
125         Creates and sends a message via RMR's send-message feature with the specified payload
126         using the specified message type and subscription ID.
127         """
128         sbuf = rmr.rmr_alloc_msg(self.mrc, len(pay), payload=pay, gen_transaction_id=True, mtype=mtype, sub_id=subid)
129         sbuf.contents.sub_id = subid
130         pre_send_summary = rmr.message_summary(sbuf)
131         for _ in range(0, RETRY_TIMES):
132             mdc_logger.debug("_send_msg: sending: {}".format(pre_send_summary))
133             sbuf = rmr.rmr_send_msg(self.mrc, sbuf)
134             msg_state = self._assert_good_send(sbuf, pre_send_summary)
135             mdc_logger.debug("_send_msg: result message state: {}".format(msg_state))
136             if msg_state != rmr.RMR_ERR_RETRY:
137                 break
138
139         rmr.rmr_free_msg(sbuf)
140         if msg_state != rmr.RMR_OK:
141             mdc_logger.warning("_send_msg: failed after {} retries".format(RETRY_TIMES))
142
143     def _rts_msg(self, pay, sbuf_rts, mtype):
144         """
145         Sends a message via RMR's return-to-sender feature.
146         This neither allocates nor frees a message buffer because we may rts many times.
147         Returns the message buffer from the RTS function, which may reallocate it.
148         """
149         pre_send_summary = rmr.message_summary(sbuf_rts)
150         for _ in range(0, RETRY_TIMES):
151             mdc_logger.debug("_rts_msg: sending: {}".format(pre_send_summary))
152             sbuf_rts = rmr.rmr_rts_msg(self.mrc, sbuf_rts, payload=pay, mtype=mtype)
153             msg_state = self._assert_good_send(sbuf_rts, pre_send_summary)
154             mdc_logger.debug("_rts_msg: result message state: {}".format(msg_state))
155             if msg_state != rmr.RMR_ERR_RETRY:
156                 break
157
158         if msg_state != rmr.RMR_OK:
159             mdc_logger.warning("_rts_msg: failed after {} retries".format(RETRY_TIMES))
160         return sbuf_rts  # in some cases rts may return a new sbuf
161
162     def _handle_sends(self):
163         # send out all messages waiting for us
164         while not self.instance_send_queue.empty():
165             work_item = self.instance_send_queue.get(block=False, timeout=None)
166             payload = json.dumps(messages.a1_to_handler(*work_item)).encode("utf-8")
167             self._send_msg(payload, A1_POLICY_REQUEST, work_item[1])
168
169         # now send all the ei-job related data
170         while not self.ei_job_result_queue.empty():
171             mdc_logger.debug("perform data delivery to consumer")
172
173             work_item = self.ei_job_result_queue.get(block=False, timeout=None)
174             payload = json.dumps(messages.ei_to_handler(*work_item)).encode("utf-8")
175             ei_job_id = int(payload.get("ei_job_id"))
176             mdc_logger.debug("data-delivery: {}".format(payload))
177
178             # send the payload to consumer subscribed for ei_job_id
179             self._send_msg(payload, A1_EI_DATA_DELIVERY, ei_job_id)
180
181     def loop(self):
182         """
183         This loop runs forever, and has 3 jobs:
184         - send out any messages that have to go out (create instance, delete instance)
185         - read a1s mailbox and update the status of all instances based on acks from downstream policy handlers
186         - clean up the database (eg delete the instance) under certain conditions based on those statuses (NOT DONE YET)
187         """
188         # loop forever
189         mdc_logger.debug("Work loop starting")
190         while self.keep_going:
191
192             # Update 3/20/2020
193             # We now handle our sends in a thread (that will just exit when it's done) because there is a difference between how send works in SI95 vs NNG.
194             # Send_msg via NNG formerly never blocked.
195             # However under SI95 this send may block for some arbitrary period of time on the first send to an endpoint for which a connection is not established
196             # If this send takes too long, this loop blocks, and the healthcheck will fail, which will cause A1s healthcheck to fail, which will cause Kubernetes to whack A1 and all kinds of horrible things happen.
197             # Therefore, now under SI95, we thread this.
198             Thread(target=self._handle_sends).start()
199
200             # read our mailbox
201             for (msg, sbuf) in self.rcv_func():
202                 # TODO: in the future we may also have to catch SDL errors
203                 try:
204                     mtype = msg[rmr.RMR_MS_MSG_TYPE]
205                 except (KeyError, TypeError, json.decoder.JSONDecodeError):
206                     mdc_logger.warning("Dropping malformed message: {0}".format(msg))
207
208                 if mtype == A1_POLICY_RESPONSE:
209                     try:
210                         # got a policy response, update status
211                         pay = json.loads(msg[rmr.RMR_MS_PAYLOAD])
212                         data.set_policy_instance_status(
213                             pay["policy_type_id"], pay["policy_instance_id"], pay["handler_id"], pay["status"]
214                         )
215                         mdc_logger.debug("Successfully received status update: {0}".format(pay))
216                     except (PolicyTypeNotFound, PolicyInstanceNotFound):
217                         mdc_logger.warning("Received a response for a non-existent type/instance: {0}".format(msg))
218                     except (KeyError, TypeError, json.decoder.JSONDecodeError):
219                         mdc_logger.warning("Dropping malformed policy response: {0}".format(msg))
220
221                 elif mtype == A1_POLICY_QUERY:
222                     try:
223                         # got a query, do a lookup and send out all instances
224                         pti = json.loads(msg[rmr.RMR_MS_PAYLOAD])["policy_type_id"]
225                         instance_list = data.get_instance_list(pti)  # will raise if a bad type
226                         mdc_logger.debug("Received a query for a known policy type: {0}".format(msg))
227                         for pii in instance_list:
228                             instance = data.get_policy_instance(pti, pii)
229                             payload = json.dumps(messages.a1_to_handler("CREATE", pti, pii, instance)).encode("utf-8")
230                             sbuf = self._rts_msg(payload, sbuf, A1_POLICY_REQUEST)
231                     except (PolicyTypeNotFound):
232                         mdc_logger.warning("Received a policy query for a non-existent type: {0}".format(msg))
233                     except (KeyError, TypeError, json.decoder.JSONDecodeError):
234                         mdc_logger.warning("Dropping malformed policy query: {0}".format(msg))
235
236                 elif mtype == A1_EI_QUERY_ALL:
237                     mdc_logger.debug("Received messaage {0}".format(msg))
238
239                     # query A1-EI co-ordinator service to get the EI-types
240                     resp = requests.get(ESC_EI_TYPE_PATH)
241                     if resp.status_code != 200:
242                         mdc_logger.warning("Received no reponse from A1-EI service")
243
244                     mdc_logger.debug("response from A1-EI service : {0}".format(resp.json()))
245
246                     # send the complete list of EI-types to xApp
247                     sbuf = self._rts_msg(resp.content, sbuf, AI_EI_QUERY_ALL_RESP)
248
249                 elif mtype == A1_EI_CREATE_JOB:
250                     mdc_logger.debug("Received message {0}".format(msg))
251                     payload = json.loads(msg[rmr.RMR_MS_PAYLOAD])
252                     mdc_logger.debug("Payload: {0}".format(payload))
253
254                     uuidStr = payload["job-id"]
255                     del payload["job-id"]
256
257                     mdc_logger.debug("Payload after removing job-id: {0}".format(payload))
258
259                     # 1. send request to A1-EI Service to create A1-EI JOB
260                     headers = {'Content-type': 'application/json'}
261                     r = requests.put(ECS_EI_JOB_PATH + uuidStr, data=json.dumps(payload), headers=headers)
262                     if (r.status_code != 201) and (r.status_code != 200):
263                         mdc_logger.warning("failed to create EIJOB : {0}".format(r))
264                     else:
265                         # 2. inform xApp for Job status
266                         mdc_logger.debug("received successful response (ei-job-id) :{0}".format(uuidStr))
267                         rmr_data = """{{
268                                 "ei_job_id": "{id}"
269                                 }}""".format(id=uuidStr)
270                         mdc_logger.debug("rmr_Data to send: {0}".format(rmr_data))
271                         sbuf = self._rts_msg(str.encode(rmr_data), sbuf, A1_EI_CREATE_JOB_RESP)
272
273                 else:
274                     mdc_logger.warning("Received message type {0} but A1 does not handle this".format(mtype))
275
276                 # we must free each sbuf
277                 rmr.rmr_free_msg(sbuf)
278
279             self.last_ran = time.time()
280             time.sleep(1)
281
282         mdc_logger.debug("RMR Thread Ending!")
283
284
285 # Public
286
287
288 def start_rmr_thread(init_func_override=None, rcv_func_override=None):
289     """
290     Start a1s rmr thread
291
292     Parameters
293     ----------
294     init_func_override: function (optional)
295         Function that initializes RMR and answers an RMR context.
296         Supply an empty function to skip initializing RMR.
297
298     rcv_func_override: function (optional)
299         Function that receives messages from RMR and answers a list.
300         Supply a trivial function to skip reading from RMR.
301     """
302     global __RMR_LOOP__
303     if __RMR_LOOP__ is None:
304         __RMR_LOOP__ = _RmrLoop(init_func_override, rcv_func_override)
305
306
307 def stop_rmr_thread():
308     """
309     stops the rmr thread
310     """
311     __RMR_LOOP__.keep_going = False
312
313
314 def queue_instance_send(item):
315     """
316     push an item into the work queue
317     currently the only type of work is to send out messages
318     """
319     __RMR_LOOP__.instance_send_queue.put(item)
320
321
322 def queue_ei_job_result(item):
323     """
324     push an item into the ei_job_queue
325     """
326     mdc_logger.debug("before queue {0}".format(item))
327     __RMR_LOOP__.ei_job_result_queue.put(item)
328     mdc_logger.debug("after queue")
329
330
331 def healthcheck_rmr_thread(seconds=30):
332     """
333     returns a boolean representing whether the rmr loop is healthy, by checking two attributes:
334     1. is it running?,
335     2. is it stuck in a long (> seconds) loop?
336     """
337     return __RMR_LOOP__.thread.is_alive() and ((time.time() - __RMR_LOOP__.last_ran) < seconds)
338
339
340 def replace_rcv_func(rcv_func):
341     """purely for the ease of unit testing to test different rcv scenarios"""
342     __RMR_LOOP__.rcv_func = rcv_func