+ # store the instance
+ data.store_policy_instance(policy_type_id, policy_instance_id, instance)
+
+ body = {
+ "operation": "CREATE",
+ "policy_type_id": policy_type_id,
+ "policy_instance_id": policy_instance_id,
+ "payload": instance,
+ }
+
+ # send rmr (best effort)
+ a1rmr.send(json.dumps(body), message_type=policy_type_id)
+
+ return "", 201
+
+
+def _get_status_handler(policy_type_id, policy_instance_id):
+ """
+ Pop trough A1s mailbox, insert the latest status updates into the database, and then return the status vector
+
+ NOTE: this is done lazily. Meaning, when someone performs a GET on this API, we pop through a1s mailbox.
+ THis may not work in the future if there are "thousands" of policy acknowledgements that hit a1 before this is called,
+ because the rmr mailbox may fill. However, in the near term, we do not expect this to happen.
+ """
+ # check validity to 404 first:
+ data.type_is_valid(policy_type_id)
+ data.instance_is_valid(policy_type_id, policy_instance_id)
+
+ # pop a1s mailbox, looking for policy notifications
+ new_messages = a1rmr.dequeue_all_waiting_messages(21024)
+
+ # try to parse the messages as responses. Drop those that are malformed
+ for msg in new_messages:
+ # note, we don't use the parameters "policy_type_id, policy_instance" from above here,
+ # because we are popping the whole mailbox, which might include other statuses
+ pay = json.loads(msg["payload"])
+ if "policy_type_id" in pay and "policy_instance_id" in pay and "handler_id" in pay and "status" in pay:
+ data.set_policy_instance_status(
+ pay["policy_type_id"], pay["policy_instance_id"], pay["handler_id"], pay["status"]
+ )
+ else:
+ logger.debug("Dropping message")
+ logger.debug(pay)
+
+ # return the status vector
+ return data.get_policy_instance_statuses(policy_type_id, policy_instance_id)
+
+
+# Healthcheck
+
+
+def get_healthcheck():
+ """
+ Handles healthcheck GET
+ Currently, this basically checks the server is alive.a1rmr
+ """
+ return "", 200