X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=a1%2Fcontroller.py;h=92ff278ae9cf2d97c08f12a67c58a376cac0875e;hb=40caa314d23122f0bd25c0e66b65d10303538164;hp=ca9618b72f0d870a6df8ebca14c5589fcbaac546;hpb=5ad8f03e1fc7683bb59da31f59edc2f6c0b2372b;p=ric-plt%2Fa1.git diff --git a/a1/controller.py b/a1/controller.py index ca9618b..92ff278 100644 --- a/a1/controller.py +++ b/a1/controller.py @@ -17,28 +17,30 @@ from flask import Response import connexion import json +from jsonschema.exceptions import ValidationError from a1 import get_module_logger -from a1 import a1rmr, exceptions, utils +from a1 import a1rmr, exceptions, utils, data logger = get_module_logger(__name__) -def _get_needed_policy_info(policyname): - """ - Get the needed info for a policy - """ +def _get_policy_definition(policy_type_id): # Currently we read the manifest on each call, which would seem to allow updating A1 in place. Revisit this? manifest = utils.get_ric_manifest() for m in manifest["controls"]: - if m["name"] == policyname: - schema = m["message_receives_payload_schema"] if "message_receives_payload_schema" in m else None - return ( - utils.rmr_string_to_int(m["message_receives_rmr_type"]), - schema, - utils.rmr_string_to_int(m["message_sends_rmr_type"]), - ) - raise exceptions.PolicyNotFound() + if m["name"] == policy_type_id: + return m + + raise exceptions.PolicyTypeNotFound() + + +def _get_policy_schema(policy_type_id): + """ + Get the needed info for a policy + """ + m = _get_policy_definition(policy_type_id) + return m["message_receives_payload_schema"] if "message_receives_payload_schema" in m else None def _try_func_return(func): @@ -47,7 +49,13 @@ def _try_func_return(func): """ try: return func() - except exceptions.PolicyNotFound as exc: + except ValidationError as exc: + logger.exception(exc) + return "", 400 + except exceptions.PolicyTypeNotFound as exc: + logger.exception(exc) + return "", 404 + except exceptions.PolicyInstanceNotFound as exc: logger.exception(exc) return "", 404 except exceptions.MissingManifest as exc: @@ -56,58 +64,152 @@ def _try_func_return(func): except exceptions.MissingRmrString as exc: logger.exception(exc) return "A1 does not have a mapping for the desired rmr string. report this!", 500 - except exceptions.MessageSendFailure as exc: - logger.exception(exc) - return "A1 was unable to send a needed message to a downstream subscriber", 504 - except exceptions.ExpectedAckNotReceived as exc: - logger.exception(exc) - return "A1 was expecting an ACK back but it didn't receive one or didn't recieve the expected ACK", 504 except BaseException as exc: # catch all, should never happen... logger.exception(exc) return Response(status=500) -def _put_handler(policyname, data): +def _put_handler(policy_type_id, policy_instance_id, instance): """ Handles policy put - """ - mtype_send, schema, mtype_return = _get_needed_policy_info(policyname) + For now, policy_type_id is used as the message type + """ + # check for 404 + data.type_is_valid(policy_type_id) # validate the PUT against the schema, or if there is no shema, make sure the pUT is empty + schema = _get_policy_schema(policy_type_id) if schema: - utils.validate_json(data, schema) - elif data != {}: + utils.validate_json(instance, schema) + elif instance != {}: return "BODY SUPPLIED BUT POLICY HAS NO EXPECTED BODY", 400 - # send rmr, wait for ACK - return_payload = a1rmr.send_ack_retry(json.dumps(data), message_type=mtype_send, expected_ack_message_type=mtype_return) + # store the instance + data.store_policy_instance(policy_type_id, policy_instance_id, instance) + + body = { + "operation": "CREATE", + "policy_type_id": policy_type_id, + "policy_instance_id": policy_instance_id, + "payload": instance, + } + + # send rmr (best effort) + a1rmr.send(json.dumps(body), message_type=policy_type_id) + + return "", 201 + + +def _get_status_handler(policy_type_id, policy_instance_id): + """ + Pop trough A1s mailbox, insert the latest status updates into the database, and then return the status vector + + NOTE: this is done lazily. Meaning, when someone performs a GET on this API, we pop through a1s mailbox. + THis may not work in the future if there are "thousands" of policy acknowledgements that hit a1 before this is called, + because the rmr mailbox may fill. However, in the near term, we do not expect this to happen. + """ + # check validity to 404 first: + data.type_is_valid(policy_type_id) + data.instance_is_valid(policy_type_id, policy_instance_id) + + # pop a1s mailbox, looking for policy notifications + new_messages = a1rmr.dequeue_all_waiting_messages(21024) + + # try to parse the messages as responses. Drop those that are malformed + for msg in new_messages: + # note, we don't use the parameters "policy_type_id, policy_instance" from above here, + # because we are popping the whole mailbox, which might include other statuses + pay = json.loads(msg["payload"]) + if "policy_type_id" in pay and "policy_instance_id" in pay and "handler_id" in pay and "status" in pay: + data.set_policy_instance_status( + pay["policy_type_id"], pay["policy_instance_id"], pay["handler_id"], pay["status"] + ) + else: + logger.debug("Dropping message") + logger.debug(pay) + + # return the status vector + return data.get_policy_instance_statuses(policy_type_id, policy_instance_id) + + +# Healthcheck + + +def get_healthcheck(): + """ + Handles healthcheck GET + Currently, this basically checks the server is alive.a1rmr + """ + return "", 200 - # right now it is assumed that xapps respond with JSON payloads - # it is further assumed that they include a field "status" and that the value "SUCCESS" indicates a good policy change - try: - rpj = json.loads(return_payload) - return (rpj, 200) if rpj["status"] == "SUCCESS" else ({"reason": "BAD STATUS", "return_payload": rpj}, 502) - except json.decoder.JSONDecodeError: - return {"reason": "NOT JSON", "return_payload": return_payload}, 502 - except KeyError: - return {"reason": "NO STATUS", "return_payload": rpj}, 502 +# Policy types -# Public + +def get_all_policy_types(): + """ + Handles GET /a1-p/policytypes + """ + return "", 501 + + +def create_policy_type(policy_type_id): + """ + Handles PUT /a1-p/policytypes/policy_type_id + """ + return "", 501 + + +def get_policy_type(policy_type_id): + """ + Handles GET /a1-p/policytypes/policy_type_id + """ + return "", 501 + + +def delete_policy_type(policy_type_id): + """ + Handles DELETE /a1-p/policytypes/policy_type_id + """ + return "", 501 + + +# Policy instances + + +def get_all_instances_for_type(policy_type_id): + """ + Handles GET /a1-p/policytypes/policy_type_id/policies + """ + return "", 501 + + +def get_policy_instance(policy_type_id, policy_instance_id): + """ + Handles GET /a1-p/policytypes/polidyid/policies/policy_instance_id + """ + return _try_func_return(lambda: data.get_policy_instance(policy_type_id, policy_instance_id)) + + +def get_policy_instance_status(policy_type_id, policy_instance_id): + """ + Handles GET /a1-p/policytypes/polidyid/policies/policy_instance_id/status + """ + return _try_func_return(lambda: _get_status_handler(policy_type_id, policy_instance_id)) -def put_handler(policyname): +def create_or_replace_policy_instance(policy_type_id, policy_instance_id): """ - Handles policy replacement + Handles PUT /a1-p/policytypes/polidyid/policies/policy_instance_id """ - data = connexion.request.json - return _try_func_return(lambda: _put_handler(policyname, data)) + instance = connexion.request.json + return _try_func_return(lambda: _put_handler(policy_type_id, policy_instance_id, instance)) -def get_handler(policyname): +def delete_policy_instance(policy_type_id, policy_instance_id): """ - Handles policy GET + Handles DELETE /a1-p/policytypes/polidyid/policies/policy_instance_id """ return "", 501