- pay = json.loads(msg["payload"])
- pti = pay["policy_type_id"]
- pii = pay["policy_instance_id"]
- data.set_policy_instance_status(pti, pii, pay["handler_id"], pay["status"])
- except (PolicyTypeNotFound, PolicyInstanceNotFound, KeyError, TypeError, json.decoder.JSONDecodeError):
- # TODO: in the future we may also have to catch SDL errors
- mdc_logger.debug("Dropping malformed or non applicable message: {0}".format(msg))
-
- # TODO: what's a reasonable sleep time? we don't want to hammer redis too much, and a1 isn't a real time component
+ mtype = msg["message type"]
+ except (KeyError, TypeError, json.decoder.JSONDecodeError):
+ mdc_logger.debug("Dropping malformed policy ack/query message: {0}".format(msg))
+
+ if mtype == A1_POLICY_RESPONSE:
+ try:
+ # got a policy response, update status
+ pay = json.loads(msg["payload"])
+ data.set_policy_instance_status(
+ pay["policy_type_id"], pay["policy_instance_id"], pay["handler_id"], pay["status"]
+ )
+ mdc_logger.debug("Successfully received status update: {0}".format(pay))
+ except (PolicyTypeNotFound, PolicyInstanceNotFound):
+ mdc_logger.debug("Received a response for a non-existent instance")
+ except (KeyError, TypeError, json.decoder.JSONDecodeError):
+ mdc_logger.debug("Dropping malformed policy ack message: {0}".format(msg))
+
+ elif mtype == A1_POLICY_QUERY:
+ try:
+ # got a query, do a lookup and send out all instances
+ pti = json.loads(msg["payload"])["policy_type_id"]
+ mdc_logger.debug("Received query for: {0}".format(pti))
+ for pii in data.get_instance_list(pti):
+ instance = data.get_policy_instance(pti, pii)
+ payload = json.dumps(messages.a1_to_handler("CREATE", pti, pii, instance)).encode("utf-8")
+ sbuf = self._rts_msg(payload, sbuf, A1_POLICY_REQUEST)
+ except (PolicyTypeNotFound, PolicyInstanceNotFound):
+ mdc_logger.debug("Received a query for a non-existent type: {0}".format(msg))
+ except (KeyError, TypeError, json.decoder.JSONDecodeError):
+ mdc_logger.debug("Dropping malformed policy query message: {0}".format(msg))
+
+ else:
+ mdc_logger.debug("Received message type {0} but A1 does not handle this".format(mtype))
+
+ # we must free each sbuf
+ rmr.rmr_free_msg(sbuf)
+