X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=a1%2Fcontroller.py;h=19b8a26b4ee27e08a4de90964fa9f4d16ca66587;hb=f0ac3fb27a3bda65076f790def3284065c95b098;hp=b1897504fee7f1850f9d2bdda727f4773b717062;hpb=30a7bdce9115e6bba8811edae2fc949e404021da;p=ric-plt%2Fa1.git diff --git a/a1/controller.py b/a1/controller.py index b189750..19b8a26 100644 --- a/a1/controller.py +++ b/a1/controller.py @@ -1,6 +1,6 @@ # ================================================================================== -# Copyright (c) 2019 Nokia -# Copyright (c) 2018-2019 AT&T Intellectual Property. +# Copyright (c) 2019-2020 Nokia +# Copyright (c) 2018-2020 AT&T Intellectual Property. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,46 +14,55 @@ # See the License for the specific language governing permissions and # limitations under the License. # ================================================================================== -import json -from flask import Response +""" +Main a1 controller +""" from jsonschema import validate -import connexion from jsonschema.exceptions import ValidationError -from a1 import get_module_logger +import connexion +from prometheus_client import Counter +from mdclogpy import Logger +from ricsdl.exceptions import RejectedByBackend, NotConnected, BackendError from a1 import a1rmr, exceptions, data -logger = get_module_logger(__name__) +mdc_logger = Logger(name=__name__) +mdc_logger.mdclog_format_init(configmap_monitor=True) + +a1_counters = Counter('A1Policy', 'Policy type and instance counters', ['counter']) + + +def _log_build_http_resp(exception, http_resp_code): + """ + helper method that logs the exception and returns a tuple of (str, int) as a http response + """ + msg = repr(exception) + mdc_logger.warning("Request failed, returning {0}: {1}".format(http_resp_code, msg)) + return msg, http_resp_code def _try_func_return(func): """ - generic caller that returns the apporp http response if exceptions are raised + helper method that runs the function and returns a detailed http response if an exception is raised. """ try: return func() - except ValidationError as exc: - logger.exception(exc) - return "", 400 - except exceptions.PolicyTypeAlreadyExists as exc: - logger.exception(exc) - return "", 400 - except exceptions.PolicyTypeNotFound as exc: - logger.exception(exc) - return "", 404 - except exceptions.PolicyInstanceNotFound as exc: - logger.exception(exc) - return "", 404 - except exceptions.MissingManifest as exc: - logger.exception(exc) - return "A1 was unable to find the required RIC manifest. report this!", 500 - except exceptions.MissingRmrString as exc: - logger.exception(exc) - return "A1 does not have a mapping for the desired rmr string. report this!", 500 - except BaseException as exc: - # catch all, should never happen... - logger.exception(exc) - return Response(status=500) + except (ValidationError, exceptions.PolicyTypeAlreadyExists, exceptions.PolicyTypeIdMismatch, exceptions.CantDeleteNonEmptyType) as exc: + return _log_build_http_resp(exc, 400) + except (exceptions.PolicyTypeNotFound, exceptions.PolicyInstanceNotFound) as exc: + return _log_build_http_resp(exc, 404) + except (RejectedByBackend, NotConnected, BackendError) as exc: + """ + These are SDL errors. At the time of development here, we do not have a good understanding + which of these errors are "try again later it may work" and which are "never going to work". + There is some discussion that RejectedByBackend is in the latter category, suggesting it + should map to 400, but until we understand the root cause of these errors, it's confusing + to clients to give them a 400 (a "your fault" code) because they won't know how to fix. + For now, we log, and 503, and investigate the logs later to improve the handling/reporting. + """ + # mdc_logger.exception(exc) # waiting for https://jira.o-ran-sc.org/browse/RIC-39 + return _log_build_http_resp(exc, 503) + # let other types of unexpected exceptions blow up and log # Healthcheck @@ -62,8 +71,17 @@ def _try_func_return(func): def get_healthcheck(): """ Handles healthcheck GET - Currently, this basically checks the server is alive.a1rmr - """ + Currently, this checks: + 1. whether the a1 webserver is up (if it isn't, this won't even be called, so even entering this function confirms it is) + 2. checks whether the rmr thread is running and has completed a loop recently + 3. checks that our SDL connection is healthy + """ + if not a1rmr.healthcheck_rmr_thread(): + mdc_logger.error("A1 is not healthy due to the rmr thread") + return "rmr thread is unhealthy", 500 + if not data.SDL.healthcheck(): + mdc_logger.error("A1 is not healthy because it does not have a connection to SDL") + return "sdl connection is unhealthy", 500 return "", 200 @@ -74,20 +92,22 @@ def get_all_policy_types(): """ Handles GET /a1-p/policytypes """ - return _try_func_return(lambda: data.get_type_list()) + return _try_func_return(data.get_type_list) def create_policy_type(policy_type_id): """ Handles PUT /a1-p/policytypes/policy_type_id """ + a1_counters.labels(counter='CreatePolicyTypeReqs').inc() - def _put_type_handler(policy_type_id, body): + def put_type_handler(): data.store_policy_type(policy_type_id, body) + mdc_logger.debug("Policy type {} created.".format(policy_type_id)) return "", 201 body = connexion.request.json - return _try_func_return(lambda: _put_type_handler(policy_type_id, body)) + return _try_func_return(put_type_handler) def get_policy_type(policy_type_id): @@ -101,7 +121,14 @@ def delete_policy_type(policy_type_id): """ Handles DELETE /a1-p/policytypes/policy_type_id """ - return "", 501 + a1_counters.labels(counter='DeletePolicyTypeReqs').inc() + + def delete_policy_type_handler(): + data.delete_policy_type(policy_type_id) + mdc_logger.debug("Policy type {} deleted.".format(policy_type_id)) + return "", 204 + + return _try_func_return(delete_policy_type_handler) # Policy instances @@ -118,55 +145,29 @@ def get_policy_instance(policy_type_id, policy_instance_id): """ Handles GET /a1-p/policytypes/polidyid/policies/policy_instance_id """ - # 200 is automatic here return _try_func_return(lambda: data.get_policy_instance(policy_type_id, policy_instance_id)) def get_policy_instance_status(policy_type_id, policy_instance_id): """ Handles GET /a1-p/policytypes/polidyid/policies/policy_instance_id/status - """ - - def _get_status_handler(policy_type_id, policy_instance_id): - """ - Pop trough A1s mailbox, insert the latest status updates into the database, and then return the status vector - - NOTE: this is done lazily. Meaning, when someone performs a GET on this API, we pop through a1s mailbox. - THis may not work in the future if there are "thousands" of policy acknowledgements that hit a1 before this is called, - because the rmr mailbox may fill. However, in the near term, we do not expect this to happen. - """ - # check validity to 404 first: - data.type_is_valid(policy_type_id) - data.instance_is_valid(policy_type_id, policy_instance_id) - - # pop a1s mailbox, looking for policy notifications - new_messages = a1rmr.dequeue_all_waiting_messages(21024) - - # try to parse the messages as responses. Drop those that are malformed - for msg in new_messages: - # note, we don't use the parameters "policy_type_id, policy_instance" from above here, - # because we are popping the whole mailbox, which might include other statuses - pay = json.loads(msg["payload"]) - if "policy_type_id" in pay and "policy_instance_id" in pay and "handler_id" in pay and "status" in pay: - data.set_policy_instance_status( - pay["policy_type_id"], pay["policy_instance_id"], pay["handler_id"], pay["status"] - ) - else: - logger.debug("Dropping message") - logger.debug(pay) - # return the status vector - return data.get_policy_instance_statuses(policy_type_id, policy_instance_id) - - return _try_func_return(lambda: _get_status_handler(policy_type_id, policy_instance_id)) + Return the aggregated status. The order of rules is as follows: + 1. If a1 has received at least one status, and *all* received statuses are "DELETED", we blow away the instance and return a 404 + 2. if a1 has received at least one status and at least one is OK, we return "IN EFFECT" + 3. "NOT IN EFFECT" otherwise (no statuses, or none are OK but not all are deleted) + """ + return _try_func_return(lambda: data.get_policy_instance_status(policy_type_id, policy_instance_id)) def create_or_replace_policy_instance(policy_type_id, policy_instance_id): """ Handles PUT /a1-p/policytypes/polidyid/policies/policy_instance_id """ + a1_counters.labels(counter='CreatePolicyInstanceReqs').inc() + instance = connexion.request.json - def _put_instance_handler(policy_type_id, policy_instance_id, instance): + def put_instance_handler(): """ Handles policy instance put @@ -177,26 +178,46 @@ def create_or_replace_policy_instance(policy_type_id, policy_instance_id): validate(instance=instance, schema=schema) # store the instance - data.store_policy_instance(policy_type_id, policy_instance_id, instance) + operation = data.store_policy_instance(policy_type_id, policy_instance_id, instance) - body = { - "operation": "CREATE", - "policy_type_id": policy_type_id, - "policy_instance_id": policy_instance_id, - "payload": instance, - } + # queue rmr send (best effort) + a1rmr.queue_instance_send((operation, policy_type_id, policy_instance_id, instance)) - # send rmr (best effort) - a1rmr.send(json.dumps(body), message_type=policy_type_id) + return "", 202 - return "", 201 - - instance = connexion.request.json - return _try_func_return(lambda: _put_instance_handler(policy_type_id, policy_instance_id, instance)) + return _try_func_return(put_instance_handler) def delete_policy_instance(policy_type_id, policy_instance_id): """ Handles DELETE /a1-p/policytypes/polidyid/policies/policy_instance_id """ - return "", 501 + a1_counters.labels(counter='DeletePolicyInstanceReqs').inc() + + def delete_instance_handler(): + data.delete_policy_instance(policy_type_id, policy_instance_id) + + # queue rmr send (best effort) + a1rmr.queue_instance_send(("DELETE", policy_type_id, policy_instance_id, "")) + + return "", 202 + + return _try_func_return(delete_instance_handler) + + +# data delivery + + +def data_delivery(): + """ + Handle data delivery /data-delivery + """ + + def data_delivery_handler(): + mdc_logger.debug("data: {}".format(connexion.request.json)) + ei_job_result_json = connexion.request.json + mdc_logger.debug("jobid: {}".format(ei_job_result_json.get("job"))) + a1rmr.queue_ei_job_result((ei_job_result_json.get("job"), ei_job_result_json)) + return "", 200 + + return _try_func_return(data_delivery_handler)