import json
from jsonschema.exceptions import ValidationError
from a1 import get_module_logger
-from a1 import a1rmr, exceptions, utils
+from a1 import a1rmr, exceptions, utils, data
logger = get_module_logger(__name__)
-def _get_needed_policy_info(policyname):
- """
- Get the needed info for a policy
- """
+def _get_policy_definition(policy_type_id):
# Currently we read the manifest on each call, which would seem to allow updating A1 in place. Revisit this?
manifest = utils.get_ric_manifest()
for m in manifest["controls"]:
- if m["name"] == policyname:
- schema = m["message_receives_payload_schema"] if "message_receives_payload_schema" in m else None
- return (
- utils.rmr_string_to_int(m["message_receives_rmr_type"]),
- schema,
- utils.rmr_string_to_int(m["message_sends_rmr_type"]),
- )
- raise exceptions.PolicyNotFound()
+ if m["name"] == policy_type_id:
+ return m
+
+ raise exceptions.PolicyTypeNotFound()
+
+
+def _get_policy_schema(policy_type_id):
+ """
+ Get the needed info for a policy
+ """
+ m = _get_policy_definition(policy_type_id)
+ return m["message_receives_payload_schema"] if "message_receives_payload_schema" in m else None
def _try_func_return(func):
except ValidationError as exc:
logger.exception(exc)
return "", 400
- except exceptions.PolicyNotFound as exc:
+ except exceptions.PolicyTypeNotFound as exc:
+ logger.exception(exc)
+ return "", 404
+ except exceptions.PolicyInstanceNotFound as exc:
logger.exception(exc)
return "", 404
except exceptions.MissingManifest as exc:
except exceptions.MissingRmrString as exc:
logger.exception(exc)
return "A1 does not have a mapping for the desired rmr string. report this!", 500
- except exceptions.MessageSendFailure as exc:
- logger.exception(exc)
- return "A1 was unable to send a needed message to a downstream subscriber", 504
- except exceptions.ExpectedAckNotReceived as exc:
- logger.exception(exc)
- return "A1 was expecting an ACK back but it didn't receive one or didn't recieve the expected ACK", 504
except BaseException as exc:
# catch all, should never happen...
logger.exception(exc)
return Response(status=500)
-def _put_handler(policyname, data):
+def _put_handler(policy_type_id, policy_instance_id, instance):
"""
Handles policy put
- """
- mtype_send, schema, mtype_return = _get_needed_policy_info(policyname)
+ For now, policy_type_id is used as the message type
+ """
+ # check for 404
+ data.type_is_valid(policy_type_id)
# validate the PUT against the schema, or if there is no shema, make sure the pUT is empty
+ schema = _get_policy_schema(policy_type_id)
if schema:
- utils.validate_json(data, schema)
- elif data != {}:
+ utils.validate_json(instance, schema)
+ elif instance != {}:
return "BODY SUPPLIED BUT POLICY HAS NO EXPECTED BODY", 400
- # send rmr, wait for ACK
- return_payload = a1rmr.send_ack_retry(json.dumps(data), message_type=mtype_send, expected_ack_message_type=mtype_return)
+ # store the instance
+ data.store_policy_instance(policy_type_id, policy_instance_id, instance)
- # right now it is assumed that xapps respond with JSON payloads
- # it is further assumed that they include a field "status" and that the value "SUCCESS" indicates a good policy change
- try:
- rpj = json.loads(return_payload)
- return (rpj, 200) if rpj["status"] == "SUCCESS" else ({"reason": "BAD STATUS", "return_payload": rpj}, 502)
- except json.decoder.JSONDecodeError:
- return {"reason": "NOT JSON", "return_payload": return_payload}, 502
- except KeyError:
- return {"reason": "NO STATUS", "return_payload": rpj}, 502
+ body = {
+ "operation": "CREATE",
+ "policy_type_id": policy_type_id,
+ "policy_instance_id": policy_instance_id,
+ "payload": instance,
+ }
+
+ # send rmr (best effort)
+ a1rmr.send(json.dumps(body), message_type=policy_type_id)
+
+ return "", 201
+
+
+def _get_status_handler(policy_type_id, policy_instance_id):
+ """
+ Pop trough A1s mailbox, insert the latest status updates into the database, and then return the status vector
+
+ NOTE: this is done lazily. Meaning, when someone performs a GET on this API, we pop through a1s mailbox.
+ THis may not work in the future if there are "thousands" of policy acknowledgements that hit a1 before this is called,
+ because the rmr mailbox may fill. However, in the near term, we do not expect this to happen.
+ """
+ # check validity to 404 first:
+ data.type_is_valid(policy_type_id)
+ data.instance_is_valid(policy_type_id, policy_instance_id)
+
+ # pop a1s mailbox, looking for policy notifications
+ new_messages = a1rmr.dequeue_all_waiting_messages(21024)
+
+ # try to parse the messages as responses. Drop those that are malformed
+ for msg in new_messages:
+ # note, we don't use the parameters "policy_type_id, policy_instance" from above here,
+ # because we are popping the whole mailbox, which might include other statuses
+ pay = json.loads(msg["payload"])
+ if "policy_type_id" in pay and "policy_instance_id" in pay and "handler_id" in pay and "status" in pay:
+ data.set_policy_instance_status(
+ pay["policy_type_id"], pay["policy_instance_id"], pay["handler_id"], pay["status"]
+ )
+ else:
+ logger.debug("Dropping message")
+ logger.debug(pay)
+
+ # return the status vector
+ return data.get_policy_instance_statuses(policy_type_id, policy_instance_id)
+
+
+# Healthcheck
+
+
+def get_healthcheck():
+ """
+ Handles healthcheck GET
+ Currently, this basically checks the server is alive.a1rmr
+ """
+ return "", 200
+
+
+# Policy types
-# Public
+def get_all_policy_types():
+ """
+ Handles GET /a1-p/policytypes
+ """
+ return "", 501
+
+
+def create_policy_type(policy_type_id):
+ """
+ Handles PUT /a1-p/policytypes/policy_type_id
+ """
+ return "", 501
+
+
+def get_policy_type(policy_type_id):
+ """
+ Handles GET /a1-p/policytypes/policy_type_id
+ """
+ return "", 501
+
+
+def delete_policy_type(policy_type_id):
+ """
+ Handles DELETE /a1-p/policytypes/policy_type_id
+ """
+ return "", 501
+
+
+# Policy instances
+
+
+def get_all_instances_for_type(policy_type_id):
+ """
+ Handles GET /a1-p/policytypes/policy_type_id/policies
+ """
+ return "", 501
+
+
+def get_policy_instance(policy_type_id, policy_instance_id):
+ """
+ Handles GET /a1-p/policytypes/polidyid/policies/policy_instance_id
+ """
+ return _try_func_return(lambda: data.get_policy_instance(policy_type_id, policy_instance_id))
+
+
+def get_policy_instance_status(policy_type_id, policy_instance_id):
+ """
+ Handles GET /a1-p/policytypes/polidyid/policies/policy_instance_id/status
+ """
+ return _try_func_return(lambda: _get_status_handler(policy_type_id, policy_instance_id))
-def put_handler(policyname):
+def create_or_replace_policy_instance(policy_type_id, policy_instance_id):
"""
- Handles policy replacement
+ Handles PUT /a1-p/policytypes/polidyid/policies/policy_instance_id
"""
- data = connexion.request.json
- return _try_func_return(lambda: _put_handler(policyname, data))
+ instance = connexion.request.json
+ return _try_func_return(lambda: _put_handler(policy_type_id, policy_instance_id, instance))
-def get_handler(policyname):
+def delete_policy_instance(policy_type_id, policy_instance_id):
"""
- Handles policy GET
+ Handles DELETE /a1-p/policytypes/polidyid/policies/policy_instance_id
"""
return "", 501