1 # ==================================================================================
2 # Copyright (c) 2019-2020 Nokia
3 # Copyright (c) 2018-2020 AT&T Intellectual Property.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 # ==================================================================================
import json
import os
import queue
import time
from threading import Thread

import requests
from mdclogpy import Logger
from ricxappframe.rmr import rmr, helpers

from a1 import data, messages
from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound
# module logger; the format init call below needs the logger to exist first
mdc_logger = Logger(name=__name__)
mdc_logger.mdclog_format_init(configmap_monitor=True)


# With Nanomsg and NNG it was possible for a send attempt to have a "soft"
# failure which did warrant some retries if the status of the send is RMR_ERR_RETRY.
# Because of the way NNG worked, it sometimes required many tens of retries,
# and a retry state happened often for even moderately "verbose" applications.
# With SI95 there is still a possibility that a retry is necessary, but it is very rare.
RETRY_TIMES = int(os.environ.get("A1_RMR_RETRY_TIMES", 4))

# RMR message types handled or sent by A1
A1_POLICY_REQUEST = 20010
A1_POLICY_RESPONSE = 20011
A1_POLICY_QUERY = 20012
A1_EI_QUERY_ALL = 20013
AI_EI_QUERY_ALL_RESP = 20014  # NOTE: "AI_" (not "A1_") prefix is a historical misspelling; kept for compatibility
A1_EI_CREATE_JOB = 20015
A1_EI_CREATE_JOB_RESP = 20016
A1_EI_DATA_DELIVERY = 20017

# A1-EI (ECS) REST endpoints
ECS_SERVICE_HOST = os.environ.get("ECS_SERVICE_HOST", "http://ecs-service:8083")
ESC_EI_TYPE_PATH = ECS_SERVICE_HOST + "/A1-EI/v1/eitypes"  # NOTE: "ESC_" is a misspelling of "ECS_"; kept for compatibility
ECS_EI_JOB_PATH = ECS_SERVICE_HOST + "/A1-EI/v1/eijobs/"


# Note: yes, globals are bad, but this is a private (to this module) global.
# No other module should import/access this (python doesn't enforce this, but all linters will complain).
# It holds the single _RmrLoop instance created by start_rmr_thread(), or None before that.
__RMR_LOOP__ = None
class _RmrLoop:
    """
    Class represents an rmr loop that constantly reads from rmr and performs operations
    based on waiting messages. This launches a thread, it should probably only be called
    once; the public facing method to access these ensures this.

    TODO: the xapp frame has a version of this looping structure. See if A1 can switch to that.
    """

    # Seconds to wait for the A1-EI (ECS) REST service. The work loop blocks on these calls,
    # so the limit is kept well under the 30 second default window of healthcheck_rmr_thread.
    _ECS_REQUEST_TIMEOUT = 5

    def __init__(self, init_func_override=None, rcv_func_override=None):
        """
        Set up the outbound work queues, initialize RMR and start the work-loop thread.

        Parameters
        ----------
        init_func_override: function (optional)
            Function that initializes RMR and answers an RMR context.
            Supply an empty function to skip initializing RMR.

        rcv_func_override: function (optional)
            Function that receives messages from RMR and answers a list.
            Supply a trivial function to skip reading from RMR.
        """
        self.keep_going = True
        self.last_ran = time.time()

        # see docs/overview#resiliency for a discussion of this
        self.instance_send_queue = queue.Queue()  # thread safe queue https://docs.python.org/3/library/queue.html
        # queue for data delivery items
        self.ei_job_result_queue = queue.Queue()

        # initialize rmr context
        if init_func_override:
            self.mrc = init_func_override()
        else:
            mdc_logger.debug("Waiting for rmr to initialize..")
            # rmr.RMRFL_MTCALL puts RMR into a multithreaded mode, where a receiving thread
            # populates an internal ring of messages, and receive calls read from that.
            # currently the size is 2048 messages, so this is fine for the foreseeable future
            self.mrc = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
            while rmr.rmr_ready(self.mrc) == 0:
                time.sleep(0.5)

        # set the receive function
        self.rcv_func = (
            rcv_func_override
            if rcv_func_override
            else lambda: helpers.rmr_rcvall_msgs_raw(self.mrc, [A1_POLICY_RESPONSE, A1_POLICY_QUERY, A1_EI_QUERY_ALL, A1_EI_CREATE_JOB])
        )

        # start the work loop
        self.thread = Thread(target=self.loop)
        self.thread.start()

    def _assert_good_send(self, sbuf, pre_send_summary):
        """
        Extracts the send result and logs a detailed warning if the send failed.
        Returns the message state, an integer that indicates the result.
        """
        post_send_summary = rmr.message_summary(sbuf)
        if post_send_summary[rmr.RMR_MS_MSG_STATE] != rmr.RMR_OK:
            mdc_logger.warning("RMR send failed; pre-send summary: {0}, post-send summary: {1}".format(pre_send_summary, post_send_summary))
        return post_send_summary[rmr.RMR_MS_MSG_STATE]

    def _send_with_retries(self, send_once, sbuf, label):
        """
        Retry loop shared by _send_msg and _rts_msg.

        Calls send_once(sbuf), which must return the (possibly reallocated) buffer, until the
        message state is anything other than RMR_ERR_RETRY or RETRY_TIMES attempts were made.
        At least one attempt is always made, so the final state is defined even when
        RETRY_TIMES is zero or negative.
        Returns a tuple (latest buffer, final message state).
        """
        pre_send_summary = rmr.message_summary(sbuf)
        max_attempts = max(1, RETRY_TIMES)
        for attempt in range(1, max_attempts + 1):
            mdc_logger.debug("{}: sending: {}".format(label, pre_send_summary))
            sbuf = send_once(sbuf)
            msg_state = self._assert_good_send(sbuf, pre_send_summary)
            mdc_logger.debug("{}: result message state: {}".format(label, msg_state))
            if msg_state != rmr.RMR_ERR_RETRY:
                break
        if msg_state != rmr.RMR_OK:
            mdc_logger.warning("{}: failed after {} attempt(s), final message state: {}".format(label, attempt, msg_state))
        return sbuf, msg_state

    def _send_msg(self, pay, mtype, subid):
        """
        Creates and sends a message via RMR's send-message feature with the specified payload
        using the specified message type and subscription ID.
        The buffer is allocated here and always freed before returning.
        """
        sbuf = rmr.rmr_alloc_msg(self.mrc, len(pay), payload=pay, gen_transaction_id=True, mtype=mtype, sub_id=subid)
        sbuf.contents.sub_id = subid
        sbuf, _ = self._send_with_retries(lambda buf: rmr.rmr_send_msg(self.mrc, buf), sbuf, "_send_msg")
        rmr.rmr_free_msg(sbuf)

    def _rts_msg(self, pay, sbuf_rts, mtype):
        """
        Sends a message via RMR's return-to-sender feature.
        This neither allocates nor frees a message buffer because we may rts many times.
        Returns the message buffer from the RTS function, which may reallocate it.
        """
        sbuf_rts, _ = self._send_with_retries(
            lambda buf: rmr.rmr_rts_msg(self.mrc, buf, payload=pay, mtype=mtype), sbuf_rts, "_rts_msg"
        )
        return sbuf_rts  # in some cases rts may return a new sbuf

    @staticmethod
    def _drain(work_queue):
        """Yield items from work_queue without blocking, stopping once it is empty."""
        while True:
            try:
                item = work_queue.get_nowait()
            except queue.Empty:
                return
            yield item

    def _handle_sends(self):
        """
        Send every item waiting in the policy and EI-job queues over RMR.

        loop() starts a new thread running this on every pass, so several drains can overlap.
        Items are therefore taken with get_nowait() until queue.Empty, rather than checking
        empty() first, which could race with another drain and raise.
        """
        # send out all messages waiting for us
        for work_item in self._drain(self.instance_send_queue):
            payload = json.dumps(messages.a1_to_handler(*work_item)).encode("utf-8")
            self._send_msg(payload, A1_POLICY_REQUEST, work_item[1])

        # now send all the ei-job related data
        for work_item in self._drain(self.ei_job_result_queue):
            mdc_logger.debug("perform data delivery to consumer")
            payload = json.dumps(messages.ei_to_handler(*work_item)).encode("utf-8")
            try:
                # the job id is used as the RMR subscription id, which must be an integer
                ei_job_id = int(work_item[0])
            except (TypeError, ValueError):
                mdc_logger.warning("Dropping data delivery with non-integer ei_job_id: {0}".format(work_item))
                continue
            mdc_logger.debug("data-delivery: {}".format(payload))

            # send the payload to consumer subscribed for ei_job_id
            self._send_msg(payload, A1_EI_DATA_DELIVERY, ei_job_id)

    def loop(self):
        """
        Work loop executed by self.thread; it repeats until keep_going is set to False.

        Each pass has 3 jobs:
        - send out any messages that have to go out (create instance, delete instance)
        - read a1s mailbox and update the status of all instances based on acks from downstream policy handlers
        - clean up the database (eg delete the instance) under certain conditions based on those statuses (NOT DONE YET)
        """
        mdc_logger.debug("Work loop starting")
        while self.keep_going:
            # Sends are handled in a short-lived thread. Under SI95 a send may block for an arbitrary
            # period on the first send to an endpoint with no established connection (sends via NNG
            # never blocked). A blocked loop stops updating last_ran, fails the healthcheck, and
            # Kubernetes then restarts A1.
            Thread(target=self._handle_sends).start()

            # read our mailbox
            for (msg, sbuf) in self.rcv_func():
                # TODO: in the future we may also have to catch SDL errors
                sbuf = self._handle_msg(msg, sbuf)
                # we must free each sbuf; handlers return the latest one because rts may replace it
                rmr.rmr_free_msg(sbuf)

            self.last_ran = time.time()
            time.sleep(1)

        mdc_logger.debug("RMR Thread Ending!")

    def _handle_msg(self, msg, sbuf):
        """Dispatch one received message on its type. Returns the (possibly replaced) buffer to free."""
        try:
            mtype = msg[rmr.RMR_MS_MSG_TYPE]
        except (KeyError, TypeError, json.decoder.JSONDecodeError):
            mdc_logger.warning("Dropping malformed message: {0}".format(msg))
            # without a type there is nothing to dispatch on; do not reuse a stale mtype
            return sbuf

        if mtype == A1_POLICY_RESPONSE:
            self._handle_policy_response(msg)
        elif mtype == A1_POLICY_QUERY:
            sbuf = self._handle_policy_query(msg, sbuf)
        elif mtype == A1_EI_QUERY_ALL:
            sbuf = self._handle_ei_query_all(msg, sbuf)
        elif mtype == A1_EI_CREATE_JOB:
            sbuf = self._handle_ei_create_job(msg, sbuf)
        else:
            mdc_logger.warning("Received message type {0} but A1 does not handle this".format(mtype))
        return sbuf

    def _handle_policy_response(self, msg):
        """Got a policy response: update the status of the referenced policy instance."""
        try:
            pay = json.loads(msg[rmr.RMR_MS_PAYLOAD])
            data.set_policy_instance_status(
                pay["policy_type_id"], pay["policy_instance_id"], pay["handler_id"], pay["status"]
            )
            mdc_logger.debug("Successfully received status update: {0}".format(pay))
        except (PolicyTypeNotFound, PolicyInstanceNotFound):
            mdc_logger.warning("Received a response for a non-existent type/instance: {0}".format(msg))
        except (KeyError, TypeError, json.decoder.JSONDecodeError):
            mdc_logger.warning("Dropping malformed policy response: {0}".format(msg))

    def _handle_policy_query(self, msg, sbuf):
        """Got a query: look up the type and return-to-sender a CREATE for every instance of it."""
        try:
            pti = json.loads(msg[rmr.RMR_MS_PAYLOAD])["policy_type_id"]
            instance_list = data.get_instance_list(pti)  # will raise if a bad type
            mdc_logger.debug("Received a query for a known policy type: {0}".format(msg))
            for pii in instance_list:
                instance = data.get_policy_instance(pti, pii)
                payload = json.dumps(messages.a1_to_handler("CREATE", pti, pii, instance)).encode("utf-8")
                sbuf = self._rts_msg(payload, sbuf, A1_POLICY_REQUEST)
        except PolicyTypeNotFound:
            mdc_logger.warning("Received a policy query for a non-existent type: {0}".format(msg))
        except (KeyError, TypeError, json.decoder.JSONDecodeError):
            mdc_logger.warning("Dropping malformed policy query: {0}".format(msg))
        return sbuf

    def _handle_ei_query_all(self, msg, sbuf):
        """Query the A1-EI service for all EI types and return-to-sender the list to the xApp."""
        mdc_logger.debug("Received messaage {0}".format(msg))

        # query A1-EI co-ordinator service to get the EI-types
        try:
            resp = requests.get(ESC_EI_TYPE_PATH, timeout=self._ECS_REQUEST_TIMEOUT)
        except requests.exceptions.RequestException as exc:
            # a network failure must not kill the work loop
            mdc_logger.warning("Received no reponse from A1-EI service: {0}".format(exc))
            return sbuf
        if resp.status_code != 200:
            mdc_logger.warning("Received no reponse from A1-EI service")
            return sbuf
        try:
            mdc_logger.debug("response from A1-EI service : {0}".format(resp.json()))
        except ValueError:
            mdc_logger.warning("Dropping non-JSON response from A1-EI service: {0}".format(resp.content))
            return sbuf

        # send the complete list of EI-types to xApp
        return self._rts_msg(resp.content, sbuf, AI_EI_QUERY_ALL_RESP)

    def _handle_ei_create_job(self, msg, sbuf):
        """Create an EI job in the A1-EI service and, on success, return-to-sender its id to the xApp."""
        mdc_logger.debug("Received message {0}".format(msg))
        try:
            payload = json.loads(msg[rmr.RMR_MS_PAYLOAD])
            mdc_logger.debug("Payload: {0}".format(payload))
            uuid_str = str(payload.pop("job-id"))
        except (KeyError, TypeError, AttributeError, json.decoder.JSONDecodeError):
            mdc_logger.warning("Dropping malformed EI create job request: {0}".format(msg))
            return sbuf

        mdc_logger.debug("Payload after removing job-id: {0}".format(payload))

        # 1. send request to A1-EI Service to create A1-EI JOB
        headers = {'Content-type': 'application/json'}
        try:
            r = requests.put(ECS_EI_JOB_PATH + uuid_str, data=json.dumps(payload), headers=headers,
                             timeout=self._ECS_REQUEST_TIMEOUT)
        except requests.exceptions.RequestException as exc:
            mdc_logger.warning("failed to create EIJOB : {0}".format(exc))
            return sbuf
        if r.status_code not in (200, 201):
            mdc_logger.warning("failed to create EIJOB : {0}".format(r))
            return sbuf

        # 2. inform xApp for Job status; json.dumps escapes the id, unlike string formatting
        mdc_logger.debug("received successful response (ei-job-id) :{0}".format(uuid_str))
        rmr_data = json.dumps({"ei_job_id": uuid_str})
        mdc_logger.debug("rmr_Data to send: {0}".format(rmr_data))
        return self._rts_msg(rmr_data.encode("utf-8"), sbuf, A1_EI_CREATE_JOB_RESP)
def start_rmr_thread(init_func_override=None, rcv_func_override=None):
    """
    Start a1s rmr thread, unless the rmr loop has already been created.

    Parameters
    ----------
    init_func_override: function (optional)
        Function that initializes RMR and answers an RMR context.
        Supply an empty function to skip initializing RMR.

    rcv_func_override: function (optional)
        Function that receives messages from RMR and answers a list.
        Supply a trivial function to skip reading from RMR.
    """
    # required: without it the assignment below makes __RMR_LOOP__ a local and the check raises
    global __RMR_LOOP__
    if __RMR_LOOP__ is None:
        __RMR_LOOP__ = _RmrLoop(init_func_override, rcv_func_override)
def stop_rmr_thread():
    """
    Ask the rmr work loop to stop; it exits at its next check of keep_going.
    Does nothing if the loop was never started.
    """
    if __RMR_LOOP__ is not None:
        __RMR_LOOP__.keep_going = False
def queue_instance_send(item):
    """
    Hand a work item to the rmr loop's policy send queue.
    Today every work item is an outbound message.
    """
    send_queue = __RMR_LOOP__.instance_send_queue
    send_queue.put(item)
def queue_ei_job_result(item):
    """
    Hand an EI job data-delivery item to the rmr loop's ei_job_result_queue.
    The loop later sends it to the consumer subscribed for that job.
    """
    mdc_logger.debug("queuing data delivery item {0}".format(item))
    result_queue = __RMR_LOOP__.ei_job_result_queue
    result_queue.put(item)
def healthcheck_rmr_thread(seconds=30):
    """
    returns a boolean representing whether the rmr loop is healthy, by checking two attributes:
    1. is it running?
    2. is it stuck in a long (> seconds) loop?
    Returns False, rather than raising AttributeError, if the loop was never started.
    """
    rmr_loop = __RMR_LOOP__
    if rmr_loop is None:
        return False
    return rmr_loop.thread.is_alive() and ((time.time() - rmr_loop.last_ran) < seconds)
def replace_rcv_func(rcv_func):
    """Swap in a different receive function; purely for the ease of unit testing to test different rcv scenarios."""
    rmr_loop = __RMR_LOOP__
    rmr_loop.rcv_func = rcv_func