A1 2.0.0:
[ric-plt/a1.git] / a1 / controller.py
index 92ff278..cde7483 100644 (file)
@@ -1,3 +1,6 @@
+"""
+Main a1 controller
+"""
 # ==================================================================================
 #       Copyright (c) 2019 Nokia
 #       Copyright (c) 2018-2019 AT&T Intellectual Property.
 #   limitations under the License.
 # ==================================================================================
 from flask import Response
-import connexion
-import json
+from jsonschema import validate
 from jsonschema.exceptions import ValidationError
-from a1 import get_module_logger
-from a1 import a1rmr, exceptions, utils, data
-
-
-logger = get_module_logger(__name__)
-
-
-def _get_policy_definition(policy_type_id):
-    # Currently we read the manifest on each call, which would seem to allow updating A1 in place. Revisit this?
-    manifest = utils.get_ric_manifest()
-    for m in manifest["controls"]:
-        if m["name"] == policy_type_id:
-            return m
-
-    raise exceptions.PolicyTypeNotFound()
+import connexion
+from mdclogpy import Logger
+from a1 import a1rmr, exceptions, data
 
 
-def _get_policy_schema(policy_type_id):
-    """
-    Get the needed info for a policy
-    """
-    m = _get_policy_definition(policy_type_id)
-    return m["message_receives_payload_schema"] if "message_receives_payload_schema" in m else None
+mdc_logger = Logger(name=__name__)
 
 
 def _try_func_return(func):
@@ -49,100 +34,30 @@ def _try_func_return(func):
     """
     try:
         return func()
-    except ValidationError as exc:
-        logger.exception(exc)
+    except (ValidationError, exceptions.PolicyTypeAlreadyExists, exceptions.CantDeleteNonEmptyType):
         return "", 400
-    except exceptions.PolicyTypeNotFound as exc:
-        logger.exception(exc)
+    except (exceptions.PolicyTypeNotFound, exceptions.PolicyInstanceNotFound):
         return "", 404
-    except exceptions.PolicyInstanceNotFound as exc:
-        logger.exception(exc)
-        return "", 404
-    except exceptions.MissingManifest as exc:
-        logger.exception(exc)
-        return "A1 was unable to find the required RIC manifest. report this!", 500
-    except exceptions.MissingRmrString as exc:
-        logger.exception(exc)
-        return "A1 does not have a mapping for the desired rmr string. report this!", 500
     except BaseException as exc:
         # catch all, should never happen...
-        logger.exception(exc)
+        mdc_logger.exception(exc)
         return Response(status=500)
 
 
-def _put_handler(policy_type_id, policy_instance_id, instance):
-    """
-    Handles policy put
-
-    For now, policy_type_id is used as the message type
-    """
-    # check for 404
-    data.type_is_valid(policy_type_id)
-
-    # validate the PUT against the schema, or if there is no shema, make sure the pUT is empty
-    schema = _get_policy_schema(policy_type_id)
-    if schema:
-        utils.validate_json(instance, schema)
-    elif instance != {}:
-        return "BODY SUPPLIED BUT POLICY HAS NO EXPECTED BODY", 400
-
-    # store the instance
-    data.store_policy_instance(policy_type_id, policy_instance_id, instance)
-
-    body = {
-        "operation": "CREATE",
-        "policy_type_id": policy_type_id,
-        "policy_instance_id": policy_instance_id,
-        "payload": instance,
-    }
-
-    # send rmr (best effort)
-    a1rmr.send(json.dumps(body), message_type=policy_type_id)
-
-    return "", 201
-
-
-def _get_status_handler(policy_type_id, policy_instance_id):
-    """
-    Pop trough A1s mailbox, insert the latest status updates into the database, and then return the status vector
-
-    NOTE: this is done lazily. Meaning, when someone performs a GET on this API, we pop through a1s mailbox.
-    THis may not work in the future if there are "thousands" of policy acknowledgements that hit a1 before this is called,
-    because the rmr mailbox may fill. However, in the near term, we do not expect this to happen.
-    """
-    # check validity to 404 first:
-    data.type_is_valid(policy_type_id)
-    data.instance_is_valid(policy_type_id, policy_instance_id)
-
-    # pop a1s mailbox, looking for policy notifications
-    new_messages = a1rmr.dequeue_all_waiting_messages(21024)
-
-    # try to parse the messages as responses. Drop those that are malformed
-    for msg in new_messages:
-        # note, we don't use the parameters "policy_type_id, policy_instance" from above here,
-        # because we are popping the whole mailbox, which might include other statuses
-        pay = json.loads(msg["payload"])
-        if "policy_type_id" in pay and "policy_instance_id" in pay and "handler_id" in pay and "status" in pay:
-            data.set_policy_instance_status(
-                pay["policy_type_id"], pay["policy_instance_id"], pay["handler_id"], pay["status"]
-            )
-        else:
-            logger.debug("Dropping message")
-            logger.debug(pay)
-
-    # return the status vector
-    return data.get_policy_instance_statuses(policy_type_id, policy_instance_id)
-
-
 # Healthcheck
 
 
 def get_healthcheck():
     """
     Handles healthcheck GET
-    Currently, this basically checks the server is alive.a1rmr
+    Currently, this checks:
+    1. whether the a1 webserver is up (if it isn't, this won't even be called, so even entering this function confirms it is)
+    2. checks whether the rmr thread is running and has completed a loop recently
+    TODO: make "seconds" to pass in a configurable parameter?
     """
-    return "", 200
+    if a1rmr.healthcheck_rmr_thread():
+        return "", 200
+    return "rmr thread is unhealthy", 500
 
 
 # Policy types
@@ -152,28 +67,39 @@ def get_all_policy_types():
     """
     Handles GET /a1-p/policytypes
     """
-    return "", 501
+    return _try_func_return(data.get_type_list)
 
 
 def create_policy_type(policy_type_id):
     """
     Handles PUT /a1-p/policytypes/policy_type_id
     """
-    return "", 501
+
+    def put_type_handler():
+        data.store_policy_type(policy_type_id, body)
+        return "", 201
+
+    body = connexion.request.json
+    return _try_func_return(put_type_handler)
 
 
 def get_policy_type(policy_type_id):
     """
     Handles GET /a1-p/policytypes/policy_type_id
     """
-    return "", 501
+    return _try_func_return(lambda: data.get_policy_type(policy_type_id))
 
 
 def delete_policy_type(policy_type_id):
     """
     Handles DELETE /a1-p/policytypes/policy_type_id
     """
-    return "", 501
+
+    def delete_policy_type_handler():
+        data.delete_policy_type(policy_type_id)
+        return "", 204
+
+    return _try_func_return(delete_policy_type_handler)
 
 
 # Policy instances
@@ -183,7 +109,7 @@ def get_all_instances_for_type(policy_type_id):
     """
     Handles GET /a1-p/policytypes/policy_type_id/policies
     """
-    return "", 501
+    return _try_func_return(lambda: data.get_instance_list(policy_type_id))
 
 
 def get_policy_instance(policy_type_id, policy_instance_id):
@@ -196,8 +122,14 @@ def get_policy_instance(policy_type_id, policy_instance_id):
 def get_policy_instance_status(policy_type_id, policy_instance_id):
     """
     Handles GET /a1-p/policytypes/polidyid/policies/policy_instance_id/status
+
+    Return the aggregated status. The order of rules is as follows:
+        1. If a1 has received at least one status, and *all* received statuses are "DELETED", we blow away the instance and return a 404
+        2. if a1 has received at least one status and at least one is OK, we return "IN EFFECT"
+        3. "NOT IN EFFECT" otherwise (no statuses, or none are OK but not all are deleted)
     """
-    return _try_func_return(lambda: _get_status_handler(policy_type_id, policy_instance_id))
+
+    return _try_func_return(lambda: data.get_policy_instance_status(policy_type_id, policy_instance_id))
 
 
 def create_or_replace_policy_instance(policy_type_id, policy_instance_id):
@@ -205,11 +137,43 @@ def create_or_replace_policy_instance(policy_type_id, policy_instance_id):
     Handles PUT /a1-p/policytypes/polidyid/policies/policy_instance_id
     """
     instance = connexion.request.json
-    return _try_func_return(lambda: _put_handler(policy_type_id, policy_instance_id, instance))
+
+    def put_instance_handler():
+        """
+        Handles policy instance put
+
+        For now, policy_type_id is used as the message type
+        """
+        #  validate the PUT against the schema
+        schema = data.get_policy_type(policy_type_id)["create_schema"]
+        validate(instance=instance, schema=schema)
+
+        # store the instance
+        data.store_policy_instance(policy_type_id, policy_instance_id, instance)
+
+        # queue rmr send (best effort)
+        a1rmr.queue_instance_send(("CREATE", policy_type_id, policy_instance_id, instance))
+
+        return "", 202
+
+    return _try_func_return(put_instance_handler)
 
 
 def delete_policy_instance(policy_type_id, policy_instance_id):
     """
     Handles DELETE /a1-p/policytypes/polidyid/policies/policy_instance_id
     """
-    return "", 501
+
+    def delete_instance_handler():
+        """
+        here we send out the DELETEs but we don't delete the instance until a GET is called where we check the statuses
+        We also set the status as deleted which would be reflected in a GET to ../status (before the DELETE completes)
+        """
+        data.delete_policy_instance(policy_type_id, policy_instance_id)
+
+        # queue rmr send (best effort)
+        a1rmr.queue_instance_send(("DELETE", policy_type_id, policy_instance_id, ""))
+
+        return "", 202
+
+    return _try_func_return(delete_instance_handler)