Towards a1 1.0.0: rmr improvements
[ric-plt/a1.git] / a1 / controller.py
index 9fd36d5..7db18a0 100644 (file)
@@ -1,3 +1,6 @@
+"""
+Main a1 controller
+"""
 # ==================================================================================
 #       Copyright (c) 2019 Nokia
 #       Copyright (c) 2018-2019 AT&T Intellectual Property.
@@ -17,8 +20,8 @@
 import json
 from flask import Response
 from jsonschema import validate
-import connexion
 from jsonschema.exceptions import ValidationError
+import connexion
 from a1 import get_module_logger
 from a1 import a1rmr, exceptions, data
 
@@ -32,30 +35,30 @@ def _try_func_return(func):
     """
     try:
         return func()
-    except ValidationError as exc:
+    except (ValidationError, exceptions.PolicyTypeAlreadyExists) as exc:
         logger.exception(exc)
         return "", 400
-    except exceptions.PolicyTypeAlreadyExists as exc:
-        logger.exception(exc)
-        return "", 400
-    except exceptions.PolicyTypeNotFound as exc:
-        logger.exception(exc)
-        return "", 404
-    except exceptions.PolicyInstanceNotFound as exc:
+    except (exceptions.PolicyTypeNotFound, exceptions.PolicyInstanceNotFound) as exc:
         logger.exception(exc)
         return "", 404
-    except exceptions.MissingManifest as exc:
-        logger.exception(exc)
-        return "A1 was unable to find the required RIC manifest. report this!", 500
-    except exceptions.MissingRmrString as exc:
-        logger.exception(exc)
-        return "A1 does not have a mapping for the desired rmr string. report this!", 500
     except BaseException as exc:
         # catch all, should never happen...
         logger.exception(exc)
         return Response(status=500)
 
 
+def _gen_body_to_handler(operation, policy_type_id, policy_instance_id, payload=None):
+    """
+    used to create the payloads that get sent to downstream policy handlers
+    """
+    return {
+        "operation": operation,
+        "policy_type_id": policy_type_id,
+        "policy_instance_id": policy_instance_id,
+        "payload": payload,
+    }
+
+
 # Healthcheck
 
 
@@ -74,7 +77,7 @@ def get_all_policy_types():
     """
     Handles GET /a1-p/policytypes
     """
-    return "", 501
+    return _try_func_return(data.get_type_list)
 
 
 def create_policy_type(policy_type_id):
@@ -82,12 +85,12 @@ def create_policy_type(policy_type_id):
     Handles PUT /a1-p/policytypes/policy_type_id
     """
 
-    def _put_type_handler(policy_type_id, body):
+    def put_type_handler():
         data.store_policy_type(policy_type_id, body)
         return "", 201
 
     body = connexion.request.json
-    return _try_func_return(lambda: _put_type_handler(policy_type_id, body))
+    return _try_func_return(put_type_handler)
 
 
 def get_policy_type(policy_type_id):
@@ -101,6 +104,7 @@ def delete_policy_type(policy_type_id):
     """
     Handles DELETE /a1-p/policytypes/policy_type_id
     """
+    logger.error(policy_type_id)
     return "", 501
 
 
@@ -111,62 +115,63 @@ def get_all_instances_for_type(policy_type_id):
     """
     Handles GET /a1-p/policytypes/policy_type_id/policies
     """
-    return "", 501
+
+    def get_all_instance_handler():
+        # try to clean up instances for this type
+        for policy_instance_id in data.get_instance_list(policy_type_id):
+            data.delete_policy_instance_if_applicable(policy_type_id, policy_instance_id)
+
+        # re-fetch this list as it may have changed
+        return data.get_instance_list(policy_type_id), 200
+
+    return _try_func_return(get_all_instance_handler)
 
 
 def get_policy_instance(policy_type_id, policy_instance_id):
     """
     Handles GET /a1-p/policytypes/polidyid/policies/policy_instance_id
     """
-    # 200 is automatic here
-    return _try_func_return(lambda: data.get_policy_instance(policy_type_id, policy_instance_id))
+
+    def get_instance_handler():
+        # delete if applicable (will raise if not applicable to begin with)
+        data.delete_policy_instance_if_applicable(policy_type_id, policy_instance_id)
+
+        # raise 404 now that we may have deleted, or get the instance otherwise
+        return data.get_policy_instance(policy_type_id, policy_instance_id), 200
+
+    return _try_func_return(get_instance_handler)
 
 
 def get_policy_instance_status(policy_type_id, policy_instance_id):
     """
     Handles GET /a1-p/policytypes/polidyid/policies/policy_instance_id/status
-    """
-
-    def _get_status_handler(policy_type_id, policy_instance_id):
-        """
-        Pop trough A1s mailbox, insert the latest status updates into the database, and then return the status vector
-
-        NOTE: this is done lazily. Meaning, when someone performs a GET on this API, we pop through a1s mailbox.
-        THis may not work in the future if there are "thousands" of policy acknowledgements that hit a1 before this is called,
-        because the rmr mailbox may fill. However, in the near term, we do not expect this to happen.
-        """
-        # check validity to 404 first:
-        data.type_is_valid(policy_type_id)
-        data.instance_is_valid(policy_type_id, policy_instance_id)
 
-        # pop a1s mailbox, looking for policy notifications
-        new_messages = a1rmr.dequeue_all_waiting_messages(21024)
+    Return the aggregated status. The order of rules is as follows:
+        1. If a1 has received at least one status, and *all* received statuses are "DELETED", we blow away the instance and return a 404
+        2. if a1 has received at least one status and at least one is OK, we return "IN EFFECT"
+        3. "NOT IN EFFECT" otherwise (no statuses, or none are OK but not all are deleted)
+    """
 
-        # try to parse the messages as responses. Drop those that are malformed
-        for msg in new_messages:
-            # note, we don't use the parameters "policy_type_id, policy_instance" from above here,
-            # because we are popping the whole mailbox, which might include other statuses
-            pay = json.loads(msg["payload"])
-            if "policy_type_id" in pay and "policy_instance_id" in pay and "handler_id" in pay and "status" in pay:
-                data.set_policy_instance_status(
-                    pay["policy_type_id"], pay["policy_instance_id"], pay["handler_id"], pay["status"]
-                )
-            else:
-                logger.debug("Dropping message")
-                logger.debug(pay)
+    def get_status_handler():
+        # delete if applicable (will raise if not applicable to begin with)
+        data.delete_policy_instance_if_applicable(policy_type_id, policy_instance_id)
 
-        # return the status vector
-        return data.get_policy_instance_statuses(policy_type_id, policy_instance_id)
+        vector = data.get_policy_instance_statuses(policy_type_id, policy_instance_id)
+        for i in vector:
+            if i == "OK":
+                return "IN EFFECT", 200
+        return "NOT IN EFFECT", 200
 
-    return _try_func_return(lambda: _get_status_handler(policy_type_id, policy_instance_id))
+    return _try_func_return(get_status_handler)
 
 
 def create_or_replace_policy_instance(policy_type_id, policy_instance_id):
     """
     Handles PUT /a1-p/policytypes/polidyid/policies/policy_instance_id
     """
+    instance = connexion.request.json
 
-    def _put_instance_handler(policy_type_id, policy_instance_id, instance):
+    def put_instance_handler():
         """
         Handles policy instance put
 
@@ -179,24 +184,28 @@ def create_or_replace_policy_instance(policy_type_id, policy_instance_id):
         # store the instance
         data.store_policy_instance(policy_type_id, policy_instance_id, instance)
 
-        body = {
-            "operation": "CREATE",
-            "policy_type_id": policy_type_id,
-            "policy_instance_id": policy_instance_id,
-            "payload": instance,
-        }
-
         # send rmr (best effort)
+        body = _gen_body_to_handler("CREATE", policy_type_id, policy_instance_id, payload=instance)
         a1rmr.send(json.dumps(body), message_type=policy_type_id)
 
-        return "", 201
+        return "", 202
 
-    instance = connexion.request.json
-    return _try_func_return(lambda: _put_instance_handler(policy_type_id, policy_instance_id, instance))
+    return _try_func_return(put_instance_handler)
 
 
 def delete_policy_instance(policy_type_id, policy_instance_id):
     """
     Handles DELETE /a1-p/policytypes/polidyid/policies/policy_instance_id
     """
-    return "", 501
+
+    def delete_instance_handler():
+        """
+        here we send out the DELETEs but we don't delete the instance until a GET is called where we check the statuses
+        """
+        # send rmr (best effort)
+        body = _gen_body_to_handler("DELETE", policy_type_id, policy_instance_id)
+        a1rmr.send(json.dumps(body), message_type=policy_type_id)
+
+        return "", 202
+
+    return _try_func_return(delete_instance_handler)