Add sdl healthcheck to a1's healthcheck
[ric-plt/a1.git] / a1 / controller.py
index e6e093f..4210266 100644 (file)
@@ -1,6 +1,9 @@
+"""
+Main a1 controller
+"""
 # ==================================================================================
-#       Copyright (c) 2019 Nokia
-#       Copyright (c) 2018-2019 AT&T Intellectual Property.
+#       Copyright (c) 2019-2020 Nokia
+#       Copyright (c) 2018-2020 AT&T Intellectual Property.
 #
 #   Licensed under the Apache License, Version 2.0 (the "License");
 #   you may not use this file except in compliance with the License.
 #   See the License for the specific language governing permissions and
 #   limitations under the License.
 # ==================================================================================
-from flask import Response
-import connexion
-import json
+from jsonschema import validate
 from jsonschema.exceptions import ValidationError
-from a1 import get_module_logger
-from a1 import a1rmr, exceptions, utils
+import connexion
+from mdclogpy import Logger
+from ricsdl.exceptions import RejectedByBackend, NotConnected, BackendError
+from a1 import a1rmr, exceptions, data
+
+
+mdc_logger = Logger(name=__name__)
+
+
+def _try_func_return(func):
+    """
+    generic caller that returns the apporp http response if exceptions are raised
+    """
+    try:
+        return func()
+    except (ValidationError, exceptions.PolicyTypeAlreadyExists, exceptions.CantDeleteNonEmptyType):
+        return "", 400
+    except (exceptions.PolicyTypeNotFound, exceptions.PolicyInstanceNotFound):
+        return "", 404
+    except (RejectedByBackend, NotConnected, BackendError):
+        """
+        These are SDL errors. At the time of development here, we do not have a good understanding which of these errors are "try again later it may work"
+        and which are "never going to work". There is some discussion that RejectedByBackend is in the latter category, suggesting it should map to 400,
+        but until we understand the root cause of these errors, it's confusing to clients to give them a 400 (a "your fault" code) because they won't know how to fix
+        For now, we log, and 503, and investigate the logs later to improve the handling/reporting.
+        """
+        # mdc_logger.exception(exc)  # waiting for https://jira.o-ran-sc.org/browse/RIC-39
+        return "", 503
+
+    # let other types of unexpected exceptions blow up and log
+
+
+# Healthcheck
 
 
-logger = get_module_logger(__name__)
+def get_healthcheck():
+    """
+    Handles healthcheck GET
+    Currently, this checks:
+    1. whether the a1 webserver is up (if it isn't, this won't even be called, so even entering this function confirms it is)
+    2. checks whether the rmr thread is running and has completed a loop recently
+    3. checks that our SDL connection is healthy
+    """
+    if not a1rmr.healthcheck_rmr_thread():
+        return "rmr thread is unhealthy", 500
+    if not data.SDL.healthcheck():
+        return "sdl connection is unhealthy", 500
+    return "", 200
 
 
-def _get_policy_definition(policyname):
-    # Currently we read the manifest on each call, which would seem to allow updating A1 in place. Revisit this?
-    manifest = utils.get_ric_manifest()
-    for m in manifest["controls"]:
-        if m["name"] == policyname:
-            return m
-    raise exceptions.PolicyNotFound()
+# Policy types
 
 
-def _get_needed_policy_info(policyname):
+def get_all_policy_types():
     """
-    Get the needed info for a policy
+    Handles GET /a1-p/policytypes
     """
-    m = _get_policy_definition(policyname)
-    return (
-        utils.rmr_string_to_int(m["message_receives_rmr_type"]),
-        m["message_receives_payload_schema"] if "message_receives_payload_schema" in m else None,
-        utils.rmr_string_to_int(m["message_sends_rmr_type"]),
-    )
+    return _try_func_return(data.get_type_list)
 
 
-def _get_needed_policy_fetch_info(policyname):
+def create_policy_type(policy_type_id):
     """
-    Get the needed info for fetching a policy state
+    Handles PUT /a1-p/policytypes/policy_type_id
     """
-    m = _get_policy_definition(policyname)
-    req_k = "control_state_request_rmr_type"
-    ack_k = "control_state_request_reply_rmr_type"
-    return (
-        utils.rmr_string_to_int(m[req_k]) if req_k in m else None,
-        utils.rmr_string_to_int(m[ack_k]) if ack_k in m else None,
-    )
 
+    def put_type_handler():
+        data.store_policy_type(policy_type_id, body)
+        return "", 201
 
-def _try_func_return(func):
+    body = connexion.request.json
+    return _try_func_return(put_type_handler)
+
+
+def get_policy_type(policy_type_id):
     """
-    generic caller that returns the apporp http response if exceptions are raised
+    Handles GET /a1-p/policytypes/policy_type_id
     """
-    try:
-        return func()
-    except ValidationError as exc:
-        logger.exception(exc)
-        return "", 400
-    except exceptions.PolicyNotFound as exc:
-        logger.exception(exc)
-        return "", 404
-    except exceptions.MissingManifest as exc:
-        logger.exception(exc)
-        return "A1 was unable to find the required RIC manifest. report this!", 500
-    except exceptions.MissingRmrString as exc:
-        logger.exception(exc)
-        return "A1 does not have a mapping for the desired rmr string. report this!", 500
-    except exceptions.MessageSendFailure as exc:
-        logger.exception(exc)
-        return "A1 was unable to send a needed message to a downstream subscriber", 504
-    except exceptions.ExpectedAckNotReceived as exc:
-        logger.exception(exc)
-        return "A1 was expecting an ACK back but it didn't receive one or didn't recieve the expected ACK", 504
-    except BaseException as exc:
-        # catch all, should never happen...
-        logger.exception(exc)
-        return Response(status=500)
-
-
-def _put_handler(policyname, data):
-    """
-    Handles policy put
-    """
-
-    mtype_send, schema, mtype_return = _get_needed_policy_info(policyname)
-
-    # validate the PUT against the schema, or if there is no shema, make sure the pUT is empty
-    if schema:
-        utils.validate_json(data, schema)
-    elif data != {}:
-        return "BODY SUPPLIED BUT POLICY HAS NO EXPECTED BODY", 400
-
-    # send rmr, wait for ACK
-    return_payload = a1rmr.send_ack_retry(json.dumps(data), message_type=mtype_send, expected_ack_message_type=mtype_return)
-
-    # right now it is assumed that xapps respond with JSON payloads
-    # it is further assumed that they include a field "status" and that the value "SUCCESS" indicates a good policy change
-    try:
-        rpj = json.loads(return_payload)
-        return (rpj, 200) if rpj["status"] == "SUCCESS" else ({"reason": "BAD STATUS", "return_payload": rpj}, 502)
-    except json.decoder.JSONDecodeError:
-        return {"reason": "NOT JSON", "return_payload": return_payload}, 502
-    except KeyError:
-        return {"reason": "NO STATUS", "return_payload": rpj}, 502
+    return _try_func_return(lambda: data.get_policy_type(policy_type_id))
 
 
-def _get_handler(policyname):
+def delete_policy_type(policy_type_id):
     """
-    Handles policy GET
+    Handles DELETE /a1-p/policytypes/policy_type_id
     """
-    mtype_send, mtype_return = _get_needed_policy_fetch_info(policyname)
 
-    if not (mtype_send and mtype_return):
-        return "POLICY DOES NOT SUPPORT FETCHING", 400
+    def delete_policy_type_handler():
+        data.delete_policy_type(policy_type_id)
+        return "", 204
 
-    # send rmr, wait for ACK
-    return_payload = a1rmr.send_ack_retry("", message_type=mtype_send, expected_ack_message_type=mtype_return)
+    return _try_func_return(delete_policy_type_handler)
 
-    # right now it is assumed that xapps respond with JSON payloads
-    try:
-        return (json.loads(return_payload), 200)
-    except json.decoder.JSONDecodeError:
-        return {"reason": "NOT JSON", "return_payload": return_payload}, 502
 
+# Policy instances
 
-# Public
+
+def get_all_instances_for_type(policy_type_id):
+    """
+    Handles GET /a1-p/policytypes/policy_type_id/policies
+    """
+    return _try_func_return(lambda: data.get_instance_list(policy_type_id))
 
 
-def put_handler(policyname):
+def get_policy_instance(policy_type_id, policy_instance_id):
     """
-    Handles policy replacement
+    Handles GET /a1-p/policytypes/polidyid/policies/policy_instance_id
     """
-    data = connexion.request.json
-    return _try_func_return(lambda: _put_handler(policyname, data))
+    return _try_func_return(lambda: data.get_policy_instance(policy_type_id, policy_instance_id))
 
 
-def get_handler(policyname):
+def get_policy_instance_status(policy_type_id, policy_instance_id):
     """
-    Handles policy GET
+    Handles GET /a1-p/policytypes/polidyid/policies/policy_instance_id/status
+
+    Return the aggregated status. The order of rules is as follows:
+        1. If a1 has received at least one status, and *all* received statuses are "DELETED", we blow away the instance and return a 404
+        2. if a1 has received at least one status and at least one is OK, we return "IN EFFECT"
+        3. "NOT IN EFFECT" otherwise (no statuses, or none are OK but not all are deleted)
     """
-    return _try_func_return(lambda: _get_handler(policyname))
+
+    return _try_func_return(lambda: data.get_policy_instance_status(policy_type_id, policy_instance_id))
 
 
-def healthcheck_handler():
+def create_or_replace_policy_instance(policy_type_id, policy_instance_id):
     """
-    Handles healthcheck GET
-    Currently, this basically checks the server is alive.a1rmr
+    Handles PUT /a1-p/policytypes/polidyid/policies/policy_instance_id
     """
-    return "", 200
+    instance = connexion.request.json
+
+    def put_instance_handler():
+        """
+        Handles policy instance put
+
+        For now, policy_type_id is used as the message type
+        """
+        #  validate the PUT against the schema
+        schema = data.get_policy_type(policy_type_id)["create_schema"]
+        validate(instance=instance, schema=schema)
+
+        # store the instance
+        data.store_policy_instance(policy_type_id, policy_instance_id, instance)
+
+        # queue rmr send (best effort)
+        a1rmr.queue_instance_send(("CREATE", policy_type_id, policy_instance_id, instance))
+
+        return "", 202
+
+    return _try_func_return(put_instance_handler)
+
+
+def delete_policy_instance(policy_type_id, policy_instance_id):
+    """
+    Handles DELETE /a1-p/policytypes/polidyid/policies/policy_instance_id
+    """
+
+    def delete_instance_handler():
+        data.delete_policy_instance(policy_type_id, policy_instance_id)
+
+        # queue rmr send (best effort)
+        a1rmr.queue_instance_send(("DELETE", policy_type_id, policy_instance_id, ""))
+
+        return "", 202
+
+    return _try_func_return(delete_instance_handler)