+ def loop(self):
+ """
+ This loop runs forever, and has 3 jobs:
+ - send out any messages that have to go out (create instance, delete instance)
+ - read a1s mailbox and update the status of all instances based on acks from downstream policy handlers
+ - clean up the database (eg delete the instance) under certain conditions based on those statuses (NOT DONE YET)
+ """
+ # loop forever
+ mdc_logger.debug("Work loop starting")
+ while self.keep_going:
+
+ # Update 3/20/2020
+ # We now handle our sends in a thread (that will just exit when it's done) because there is a difference between how send works in SI95 vs NNG.
+ # Send_msg via NNG formerly never blocked.
+ # However under SI95 this send may block for some arbitrary period of time on the first send to an endpoint for which a connection is not established
+ # If this send takes too long, this loop blocks, and the healthcheck will fail, which will cause A1s healthcheck to fail, which will cause Kubernetes to whack A1 and all kinds of horrible things happen.
+ # Therefore, now under SI95, we thread this.
+ Thread(target=self._handle_sends).start()
+
+ # read our mailbox
+ for (msg, sbuf) in self.rcv_func():
+ # TODO: in the future we may also have to catch SDL errors
+ try:
+ mtype = msg[rmr.RMR_MS_MSG_TYPE]
+ except (KeyError, TypeError, json.decoder.JSONDecodeError):
+ mdc_logger.warning("Dropping malformed message: {0}".format(msg))
+
+ if mtype == A1_POLICY_RESPONSE:
+ try:
+ # got a policy response, update status
+ pay = json.loads(msg[rmr.RMR_MS_PAYLOAD])
+ data.set_policy_instance_status(
+ pay["policy_type_id"], pay["policy_instance_id"], pay["handler_id"], pay["status"]
+ )
+ mdc_logger.debug("Successfully received status update: {0}".format(pay))
+ except (PolicyTypeNotFound, PolicyInstanceNotFound):
+ mdc_logger.warning("Received a response for a non-existent type/instance: {0}".format(msg))
+ except (KeyError, TypeError, json.decoder.JSONDecodeError):
+ mdc_logger.warning("Dropping malformed policy response: {0}".format(msg))
+
+ elif mtype == A1_POLICY_QUERY:
+ try:
+ # got a query, do a lookup and send out all instances
+ pti = json.loads(msg[rmr.RMR_MS_PAYLOAD])["policy_type_id"]
+ instance_list = data.get_instance_list(pti) # will raise if a bad type
+ mdc_logger.debug("Received a query for a known policy type: {0}".format(msg))
+ for pii in instance_list:
+ instance = data.get_policy_instance(pti, pii)
+ payload = json.dumps(messages.a1_to_handler("CREATE", pti, pii, instance)).encode("utf-8")
+ sbuf = self._rts_msg(payload, sbuf, A1_POLICY_REQUEST)
+ except (PolicyTypeNotFound):
+ mdc_logger.warning("Received a policy query for a non-existent type: {0}".format(msg))
+ except (KeyError, TypeError, json.decoder.JSONDecodeError):
+ mdc_logger.warning("Dropping malformed policy query: {0}".format(msg))
+
+ elif mtype == A1_EI_QUERY_ALL:
+ mdc_logger.debug("Received messaage {0}".format(msg))
+
+ # query A1-EI co-ordinator service to get the EI-types
+ resp = requests.get(ESC_EI_TYPE_PATH)
+ if resp.status_code != 200:
+ mdc_logger.warning("Received no reponse from A1-EI service")
+
+ mdc_logger.debug("response from A1-EI service : {0}".format(resp.json()))
+
+ # send the complete list of EI-types to xApp
+ sbuf = self._rts_msg(resp.content, sbuf, AI_EI_QUERY_ALL_RESP)
+
+ elif mtype == A1_EI_CREATE_JOB:
+ mdc_logger.debug("Received message {0}".format(msg))
+ payload = json.loads(msg[rmr.RMR_MS_PAYLOAD])
+ mdc_logger.debug("Payload: {0}".format(payload))
+
+ uuidStr = payload["job-id"]
+ del payload["job-id"]
+
+ mdc_logger.debug("Payload after removing job-id: {0}".format(payload))
+
+ # 1. send request to A1-EI Service to create A1-EI JOB
+ headers = {'Content-type': 'application/json'}
+ r = requests.put(ECS_EI_JOB_PATH + uuidStr, data=json.dumps(payload), headers=headers)
+ if (r.status_code != 201) and (r.status_code != 200):
+ mdc_logger.warning("failed to create EIJOB : {0}".format(r))
+ else:
+ # 2. inform xApp for Job status
+ mdc_logger.debug("received successful response (ei-job-id) :{0}".format(uuidStr))
+ rmr_data = """{{
+ "ei_job_id": "{id}"
+ }}""".format(id=uuidStr)
+ mdc_logger.debug("rmr_Data to send: {0}".format(rmr_data))
+ sbuf = self._rts_msg(str.encode(rmr_data), sbuf, A1_EI_CREATE_JOB_RESP)
+
+ else:
+ mdc_logger.warning("Received message type {0} but A1 does not handle this".format(mtype))
+
+ # we must free each sbuf
+ rmr.rmr_free_msg(sbuf)
+
+ self.last_ran = time.time()
+ time.sleep(1)
+
+ mdc_logger.debug("RMR Thread Ending!")