X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=a1%2Fcontroller.py;h=cde7483404412df90ff69afcfe03459e0547021f;hb=refs%2Fchanges%2F07%2F1907%2F13;hp=e6e093fe1c91375f55b0c58aabd6eb14d3562744;hpb=fdf050451414e1a816e343bcd56f33186a742e49;p=ric-plt%2Fa1.git diff --git a/a1/controller.py b/a1/controller.py index e6e093f..cde7483 100644 --- a/a1/controller.py +++ b/a1/controller.py @@ -1,3 +1,6 @@ +""" +Main a1 controller +""" # ================================================================================== # Copyright (c) 2019 Nokia # Copyright (c) 2018-2019 AT&T Intellectual Property. @@ -15,48 +18,14 @@ # limitations under the License. # ================================================================================== from flask import Response -import connexion -import json +from jsonschema import validate from jsonschema.exceptions import ValidationError -from a1 import get_module_logger -from a1 import a1rmr, exceptions, utils - - -logger = get_module_logger(__name__) - - -def _get_policy_definition(policyname): - # Currently we read the manifest on each call, which would seem to allow updating A1 in place. Revisit this? - manifest = utils.get_ric_manifest() - for m in manifest["controls"]: - if m["name"] == policyname: - return m - raise exceptions.PolicyNotFound() - - -def _get_needed_policy_info(policyname): - """ - Get the needed info for a policy - """ - m = _get_policy_definition(policyname) - return ( - utils.rmr_string_to_int(m["message_receives_rmr_type"]), - m["message_receives_payload_schema"] if "message_receives_payload_schema" in m else None, - utils.rmr_string_to_int(m["message_sends_rmr_type"]), - ) +import connexion +from mdclogpy import Logger +from a1 import a1rmr, exceptions, data -def _get_needed_policy_fetch_info(policyname): - """ - Get the needed info for fetching a policy state - """ - m = _get_policy_definition(policyname) - req_k = "control_state_request_rmr_type" - ack_k = "control_state_request_reply_rmr_type" - return ( - utils.rmr_string_to_int(m[req_k]) if req_k in m else None, - utils.rmr_string_to_int(m[ack_k]) if ack_k in m else None, - ) +mdc_logger = Logger(name=__name__) def _try_func_return(func): @@ -65,97 +34,146 @@ def _try_func_return(func): """ try: return func() - except ValidationError as exc: - logger.exception(exc) + except (ValidationError, exceptions.PolicyTypeAlreadyExists, exceptions.CantDeleteNonEmptyType): return "", 400 - except exceptions.PolicyNotFound as exc: - logger.exception(exc) + except (exceptions.PolicyTypeNotFound, exceptions.PolicyInstanceNotFound): return "", 404 - except exceptions.MissingManifest as exc: - logger.exception(exc) - return "A1 was unable to find the required RIC manifest. report this!", 500 - except exceptions.MissingRmrString as exc: - logger.exception(exc) - return "A1 does not have a mapping for the desired rmr string. report this!", 500 - except exceptions.MessageSendFailure as exc: - logger.exception(exc) - return "A1 was unable to send a needed message to a downstream subscriber", 504 - except exceptions.ExpectedAckNotReceived as exc: - logger.exception(exc) - return "A1 was expecting an ACK back but it didn't receive one or didn't recieve the expected ACK", 504 except BaseException as exc: # catch all, should never happen... - logger.exception(exc) + mdc_logger.exception(exc) return Response(status=500) -def _put_handler(policyname, data): +# Healthcheck + + +def get_healthcheck(): """ - Handles policy put + Handles healthcheck GET + Currently, this checks: + 1. whether the a1 webserver is up (if it isn't, this won't even be called, so even entering this function confirms it is) + 2. checks whether the rmr thread is running and has completed a loop recently + TODO: make "seconds" to pass in a configurable parameter? """ + if a1rmr.healthcheck_rmr_thread(): + return "", 200 + return "rmr thread is unhealthy", 500 - mtype_send, schema, mtype_return = _get_needed_policy_info(policyname) - # validate the PUT against the schema, or if there is no shema, make sure the pUT is empty - if schema: - utils.validate_json(data, schema) - elif data != {}: - return "BODY SUPPLIED BUT POLICY HAS NO EXPECTED BODY", 400 +# Policy types - # send rmr, wait for ACK - return_payload = a1rmr.send_ack_retry(json.dumps(data), message_type=mtype_send, expected_ack_message_type=mtype_return) - # right now it is assumed that xapps respond with JSON payloads - # it is further assumed that they include a field "status" and that the value "SUCCESS" indicates a good policy change - try: - rpj = json.loads(return_payload) - return (rpj, 200) if rpj["status"] == "SUCCESS" else ({"reason": "BAD STATUS", "return_payload": rpj}, 502) - except json.decoder.JSONDecodeError: - return {"reason": "NOT JSON", "return_payload": return_payload}, 502 - except KeyError: - return {"reason": "NO STATUS", "return_payload": rpj}, 502 +def get_all_policy_types(): + """ + Handles GET /a1-p/policytypes + """ + return _try_func_return(data.get_type_list) -def _get_handler(policyname): +def create_policy_type(policy_type_id): """ - Handles policy GET + Handles PUT /a1-p/policytypes/policy_type_id """ - mtype_send, mtype_return = _get_needed_policy_fetch_info(policyname) - if not (mtype_send and mtype_return): - return "POLICY DOES NOT SUPPORT FETCHING", 400 + def put_type_handler(): + data.store_policy_type(policy_type_id, body) + return "", 201 - # send rmr, wait for ACK - return_payload = a1rmr.send_ack_retry("", message_type=mtype_send, expected_ack_message_type=mtype_return) + body = connexion.request.json + return _try_func_return(put_type_handler) - # right now it is assumed that xapps respond with JSON payloads - try: - return (json.loads(return_payload), 200) - except json.decoder.JSONDecodeError: - return {"reason": "NOT JSON", "return_payload": return_payload}, 502 +def get_policy_type(policy_type_id): + """ + Handles GET /a1-p/policytypes/policy_type_id + """ + return _try_func_return(lambda: data.get_policy_type(policy_type_id)) + + +def delete_policy_type(policy_type_id): + """ + Handles DELETE /a1-p/policytypes/policy_type_id + """ -# Public + def delete_policy_type_handler(): + data.delete_policy_type(policy_type_id) + return "", 204 + return _try_func_return(delete_policy_type_handler) -def put_handler(policyname): + +# Policy instances + + +def get_all_instances_for_type(policy_type_id): """ - Handles policy replacement + Handles GET /a1-p/policytypes/policy_type_id/policies """ - data = connexion.request.json - return _try_func_return(lambda: _put_handler(policyname, data)) + return _try_func_return(lambda: data.get_instance_list(policy_type_id)) -def get_handler(policyname): +def get_policy_instance(policy_type_id, policy_instance_id): """ - Handles policy GET + Handles GET /a1-p/policytypes/polidyid/policies/policy_instance_id """ - return _try_func_return(lambda: _get_handler(policyname)) + return _try_func_return(lambda: data.get_policy_instance(policy_type_id, policy_instance_id)) -def healthcheck_handler(): +def get_policy_instance_status(policy_type_id, policy_instance_id): """ - Handles healthcheck GET - Currently, this basically checks the server is alive.a1rmr + Handles GET /a1-p/policytypes/polidyid/policies/policy_instance_id/status + + Return the aggregated status. The order of rules is as follows: + 1. If a1 has received at least one status, and *all* received statuses are "DELETED", we blow away the instance and return a 404 + 2. if a1 has received at least one status and at least one is OK, we return "IN EFFECT" + 3. "NOT IN EFFECT" otherwise (no statuses, or none are OK but not all are deleted) + """ + + return _try_func_return(lambda: data.get_policy_instance_status(policy_type_id, policy_instance_id)) + + +def create_or_replace_policy_instance(policy_type_id, policy_instance_id): + """ + Handles PUT /a1-p/policytypes/polidyid/policies/policy_instance_id + """ + instance = connexion.request.json + + def put_instance_handler(): + """ + Handles policy instance put + + For now, policy_type_id is used as the message type + """ + # validate the PUT against the schema + schema = data.get_policy_type(policy_type_id)["create_schema"] + validate(instance=instance, schema=schema) + + # store the instance + data.store_policy_instance(policy_type_id, policy_instance_id, instance) + + # queue rmr send (best effort) + a1rmr.queue_instance_send(("CREATE", policy_type_id, policy_instance_id, instance)) + + return "", 202 + + return _try_func_return(put_instance_handler) + + +def delete_policy_instance(policy_type_id, policy_instance_id): + """ + Handles DELETE /a1-p/policytypes/polidyid/policies/policy_instance_id """ - return "", 200 + + def delete_instance_handler(): + """ + here we send out the DELETEs but we don't delete the instance until a GET is called where we check the statuses + We also set the status as deleted which would be reflected in a GET to ../status (before the DELETE completes) + """ + data.delete_policy_instance(policy_type_id, policy_instance_id) + + # queue rmr send (best effort) + a1rmr.queue_instance_send(("DELETE", policy_type_id, policy_instance_id, "")) + + return "", 202 + + return _try_func_return(delete_instance_handler)