X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;ds=sidebyside;f=a1%2Fcontroller.py;h=9fd36d5861bd3992bae000e947d06214d51890e6;hb=91ae88989c82b08b9fb69a28f838d6b80681d953;hp=92ff278ae9cf2d97c08f12a67c58a376cac0875e;hpb=40caa314d23122f0bd25c0e66b65d10303538164;p=ric-plt%2Fa1.git diff --git a/a1/controller.py b/a1/controller.py index 92ff278..9fd36d5 100644 --- a/a1/controller.py +++ b/a1/controller.py @@ -14,35 +14,18 @@ # See the License for the specific language governing permissions and # limitations under the License. # ================================================================================== +import json from flask import Response +from jsonschema import validate import connexion -import json from jsonschema.exceptions import ValidationError from a1 import get_module_logger -from a1 import a1rmr, exceptions, utils, data +from a1 import a1rmr, exceptions, data logger = get_module_logger(__name__) -def _get_policy_definition(policy_type_id): - # Currently we read the manifest on each call, which would seem to allow updating A1 in place. Revisit this? - manifest = utils.get_ric_manifest() - for m in manifest["controls"]: - if m["name"] == policy_type_id: - return m - - raise exceptions.PolicyTypeNotFound() - - -def _get_policy_schema(policy_type_id): - """ - Get the needed info for a policy - """ - m = _get_policy_definition(policy_type_id) - return m["message_receives_payload_schema"] if "message_receives_payload_schema" in m else None - - def _try_func_return(func): """ generic caller that returns the apporp http response if exceptions are raised @@ -52,6 +35,9 @@ def _try_func_return(func): except ValidationError as exc: logger.exception(exc) return "", 400 + except exceptions.PolicyTypeAlreadyExists as exc: + logger.exception(exc) + return "", 400 except exceptions.PolicyTypeNotFound as exc: logger.exception(exc) return "", 404 @@ -70,70 +56,6 @@ def _try_func_return(func): return Response(status=500) -def _put_handler(policy_type_id, policy_instance_id, instance): - """ - Handles policy put - - For now, policy_type_id is used as the message type - """ - # check for 404 - data.type_is_valid(policy_type_id) - - # validate the PUT against the schema, or if there is no shema, make sure the pUT is empty - schema = _get_policy_schema(policy_type_id) - if schema: - utils.validate_json(instance, schema) - elif instance != {}: - return "BODY SUPPLIED BUT POLICY HAS NO EXPECTED BODY", 400 - - # store the instance - data.store_policy_instance(policy_type_id, policy_instance_id, instance) - - body = { - "operation": "CREATE", - "policy_type_id": policy_type_id, - "policy_instance_id": policy_instance_id, - "payload": instance, - } - - # send rmr (best effort) - a1rmr.send(json.dumps(body), message_type=policy_type_id) - - return "", 201 - - -def _get_status_handler(policy_type_id, policy_instance_id): - """ - Pop trough A1s mailbox, insert the latest status updates into the database, and then return the status vector - - NOTE: this is done lazily. Meaning, when someone performs a GET on this API, we pop through a1s mailbox. - THis may not work in the future if there are "thousands" of policy acknowledgements that hit a1 before this is called, - because the rmr mailbox may fill. However, in the near term, we do not expect this to happen. - """ - # check validity to 404 first: - data.type_is_valid(policy_type_id) - data.instance_is_valid(policy_type_id, policy_instance_id) - - # pop a1s mailbox, looking for policy notifications - new_messages = a1rmr.dequeue_all_waiting_messages(21024) - - # try to parse the messages as responses. Drop those that are malformed - for msg in new_messages: - # note, we don't use the parameters "policy_type_id, policy_instance" from above here, - # because we are popping the whole mailbox, which might include other statuses - pay = json.loads(msg["payload"]) - if "policy_type_id" in pay and "policy_instance_id" in pay and "handler_id" in pay and "status" in pay: - data.set_policy_instance_status( - pay["policy_type_id"], pay["policy_instance_id"], pay["handler_id"], pay["status"] - ) - else: - logger.debug("Dropping message") - logger.debug(pay) - - # return the status vector - return data.get_policy_instance_statuses(policy_type_id, policy_instance_id) - - # Healthcheck @@ -159,14 +81,20 @@ def create_policy_type(policy_type_id): """ Handles PUT /a1-p/policytypes/policy_type_id """ - return "", 501 + + def _put_type_handler(policy_type_id, body): + data.store_policy_type(policy_type_id, body) + return "", 201 + + body = connexion.request.json + return _try_func_return(lambda: _put_type_handler(policy_type_id, body)) def get_policy_type(policy_type_id): """ Handles GET /a1-p/policytypes/policy_type_id """ - return "", 501 + return _try_func_return(lambda: data.get_policy_type(policy_type_id)) def delete_policy_type(policy_type_id): @@ -190,6 +118,7 @@ def get_policy_instance(policy_type_id, policy_instance_id): """ Handles GET /a1-p/policytypes/polidyid/policies/policy_instance_id """ + # 200 is automatic here return _try_func_return(lambda: data.get_policy_instance(policy_type_id, policy_instance_id)) @@ -197,6 +126,38 @@ def get_policy_instance_status(policy_type_id, policy_instance_id): """ Handles GET /a1-p/policytypes/polidyid/policies/policy_instance_id/status """ + + def _get_status_handler(policy_type_id, policy_instance_id): + """ + Pop trough A1s mailbox, insert the latest status updates into the database, and then return the status vector + + NOTE: this is done lazily. Meaning, when someone performs a GET on this API, we pop through a1s mailbox. + THis may not work in the future if there are "thousands" of policy acknowledgements that hit a1 before this is called, + because the rmr mailbox may fill. However, in the near term, we do not expect this to happen. + """ + # check validity to 404 first: + data.type_is_valid(policy_type_id) + data.instance_is_valid(policy_type_id, policy_instance_id) + + # pop a1s mailbox, looking for policy notifications + new_messages = a1rmr.dequeue_all_waiting_messages(21024) + + # try to parse the messages as responses. Drop those that are malformed + for msg in new_messages: + # note, we don't use the parameters "policy_type_id, policy_instance" from above here, + # because we are popping the whole mailbox, which might include other statuses + pay = json.loads(msg["payload"]) + if "policy_type_id" in pay and "policy_instance_id" in pay and "handler_id" in pay and "status" in pay: + data.set_policy_instance_status( + pay["policy_type_id"], pay["policy_instance_id"], pay["handler_id"], pay["status"] + ) + else: + logger.debug("Dropping message") + logger.debug(pay) + + # return the status vector + return data.get_policy_instance_statuses(policy_type_id, policy_instance_id) + return _try_func_return(lambda: _get_status_handler(policy_type_id, policy_instance_id)) @@ -204,8 +165,34 @@ def create_or_replace_policy_instance(policy_type_id, policy_instance_id): """ Handles PUT /a1-p/policytypes/polidyid/policies/policy_instance_id """ + + def _put_instance_handler(policy_type_id, policy_instance_id, instance): + """ + Handles policy instance put + + For now, policy_type_id is used as the message type + """ + # validate the PUT against the schema + schema = data.get_policy_type(policy_type_id)["create_schema"] + validate(instance=instance, schema=schema) + + # store the instance + data.store_policy_instance(policy_type_id, policy_instance_id, instance) + + body = { + "operation": "CREATE", + "policy_type_id": policy_type_id, + "policy_instance_id": policy_instance_id, + "payload": instance, + } + + # send rmr (best effort) + a1rmr.send(json.dumps(body), message_type=policy_type_id) + + return "", 201 + instance = connexion.request.json - return _try_func_return(lambda: _put_handler(policy_type_id, policy_instance_id, instance)) + return _try_func_return(lambda: _put_instance_handler(policy_type_id, policy_instance_id, instance)) def delete_policy_instance(policy_type_id, policy_instance_id):