Towards release 1.0.0 (for O-RAN A)
[ric-plt/a1.git] / a1 / controller.py
index ca9618b..92ff278 100644 (file)
 from flask import Response
 import connexion
 import json
+from jsonschema.exceptions import ValidationError
 from a1 import get_module_logger
-from a1 import a1rmr, exceptions, utils
+from a1 import a1rmr, exceptions, utils, data
 
 
 logger = get_module_logger(__name__)
 
 
-def _get_needed_policy_info(policyname):
-    """
-    Get the needed info for a policy
-    """
+def _get_policy_definition(policy_type_id):
     # Currently we read the manifest on each call, which would seem to allow updating A1 in place. Revisit this?
     manifest = utils.get_ric_manifest()
     for m in manifest["controls"]:
-        if m["name"] == policyname:
-            schema = m["message_receives_payload_schema"] if "message_receives_payload_schema" in m else None
-            return (
-                utils.rmr_string_to_int(m["message_receives_rmr_type"]),
-                schema,
-                utils.rmr_string_to_int(m["message_sends_rmr_type"]),
-            )
-    raise exceptions.PolicyNotFound()
+        if m["name"] == policy_type_id:
+            return m
+
+    raise exceptions.PolicyTypeNotFound()
+
+
+def _get_policy_schema(policy_type_id):
+    """
+    Get the needed info for a policy
+    """
+    m = _get_policy_definition(policy_type_id)
+    return m["message_receives_payload_schema"] if "message_receives_payload_schema" in m else None
 
 
 def _try_func_return(func):
@@ -47,7 +49,13 @@ def _try_func_return(func):
     """
     try:
         return func()
-    except exceptions.PolicyNotFound as exc:
+    except ValidationError as exc:
+        logger.exception(exc)
+        return "", 400
+    except exceptions.PolicyTypeNotFound as exc:
+        logger.exception(exc)
+        return "", 404
+    except exceptions.PolicyInstanceNotFound as exc:
         logger.exception(exc)
         return "", 404
     except exceptions.MissingManifest as exc:
@@ -56,58 +64,152 @@ def _try_func_return(func):
     except exceptions.MissingRmrString as exc:
         logger.exception(exc)
         return "A1 does not have a mapping for the desired rmr string. report this!", 500
-    except exceptions.MessageSendFailure as exc:
-        logger.exception(exc)
-        return "A1 was unable to send a needed message to a downstream subscriber", 504
-    except exceptions.ExpectedAckNotReceived as exc:
-        logger.exception(exc)
-        return "A1 was expecting an ACK back but it didn't receive one or didn't recieve the expected ACK", 504
     except BaseException as exc:
         # catch all, should never happen...
         logger.exception(exc)
         return Response(status=500)
 
 
-def _put_handler(policyname, data):
+def _put_handler(policy_type_id, policy_instance_id, instance):
     """
     Handles policy put
-    """
 
-    mtype_send, schema, mtype_return = _get_needed_policy_info(policyname)
+    For now, policy_type_id is used as the message type
+    """
+    # check for 404
+    data.type_is_valid(policy_type_id)
 
     # validate the PUT against the schema, or if there is no shema, make sure the pUT is empty
+    schema = _get_policy_schema(policy_type_id)
     if schema:
-        utils.validate_json(data, schema)
-    elif data != {}:
+        utils.validate_json(instance, schema)
+    elif instance != {}:
         return "BODY SUPPLIED BUT POLICY HAS NO EXPECTED BODY", 400
 
-    # send rmr, wait for ACK
-    return_payload = a1rmr.send_ack_retry(json.dumps(data), message_type=mtype_send, expected_ack_message_type=mtype_return)
+    # store the instance
+    data.store_policy_instance(policy_type_id, policy_instance_id, instance)
+
+    body = {
+        "operation": "CREATE",
+        "policy_type_id": policy_type_id,
+        "policy_instance_id": policy_instance_id,
+        "payload": instance,
+    }
+
+    # send rmr (best effort)
+    a1rmr.send(json.dumps(body), message_type=policy_type_id)
+
+    return "", 201
+
+
+def _get_status_handler(policy_type_id, policy_instance_id):
+    """
+    Pop trough A1s mailbox, insert the latest status updates into the database, and then return the status vector
+
+    NOTE: this is done lazily. Meaning, when someone performs a GET on this API, we pop through a1s mailbox.
+    THis may not work in the future if there are "thousands" of policy acknowledgements that hit a1 before this is called,
+    because the rmr mailbox may fill. However, in the near term, we do not expect this to happen.
+    """
+    # check validity to 404 first:
+    data.type_is_valid(policy_type_id)
+    data.instance_is_valid(policy_type_id, policy_instance_id)
+
+    # pop a1s mailbox, looking for policy notifications
+    new_messages = a1rmr.dequeue_all_waiting_messages(21024)
+
+    # try to parse the messages as responses. Drop those that are malformed
+    for msg in new_messages:
+        # note, we don't use the parameters "policy_type_id, policy_instance" from above here,
+        # because we are popping the whole mailbox, which might include other statuses
+        pay = json.loads(msg["payload"])
+        if "policy_type_id" in pay and "policy_instance_id" in pay and "handler_id" in pay and "status" in pay:
+            data.set_policy_instance_status(
+                pay["policy_type_id"], pay["policy_instance_id"], pay["handler_id"], pay["status"]
+            )
+        else:
+            logger.debug("Dropping message")
+            logger.debug(pay)
+
+    # return the status vector
+    return data.get_policy_instance_statuses(policy_type_id, policy_instance_id)
+
+
+# Healthcheck
+
+
+def get_healthcheck():
+    """
+    Handles healthcheck GET
+    Currently, this basically checks the server is alive.a1rmr
+    """
+    return "", 200
 
-    # right now it is assumed that xapps respond with JSON payloads
-    # it is further assumed that they include a field "status" and that the value "SUCCESS" indicates a good policy change
-    try:
-        rpj = json.loads(return_payload)
-        return (rpj, 200) if rpj["status"] == "SUCCESS" else ({"reason": "BAD STATUS", "return_payload": rpj}, 502)
-    except json.decoder.JSONDecodeError:
-        return {"reason": "NOT JSON", "return_payload": return_payload}, 502
-    except KeyError:
-        return {"reason": "NO STATUS", "return_payload": rpj}, 502
 
+# Policy types
 
-# Public
+
+def get_all_policy_types():
+    """
+    Handles GET /a1-p/policytypes
+    """
+    return "", 501
+
+
+def create_policy_type(policy_type_id):
+    """
+    Handles PUT /a1-p/policytypes/policy_type_id
+    """
+    return "", 501
+
+
+def get_policy_type(policy_type_id):
+    """
+    Handles GET /a1-p/policytypes/policy_type_id
+    """
+    return "", 501
+
+
+def delete_policy_type(policy_type_id):
+    """
+    Handles DELETE /a1-p/policytypes/policy_type_id
+    """
+    return "", 501
+
+
+# Policy instances
+
+
+def get_all_instances_for_type(policy_type_id):
+    """
+    Handles GET /a1-p/policytypes/policy_type_id/policies
+    """
+    return "", 501
+
+
+def get_policy_instance(policy_type_id, policy_instance_id):
+    """
+    Handles GET /a1-p/policytypes/polidyid/policies/policy_instance_id
+    """
+    return _try_func_return(lambda: data.get_policy_instance(policy_type_id, policy_instance_id))
+
+
+def get_policy_instance_status(policy_type_id, policy_instance_id):
+    """
+    Handles GET /a1-p/policytypes/polidyid/policies/policy_instance_id/status
+    """
+    return _try_func_return(lambda: _get_status_handler(policy_type_id, policy_instance_id))
 
 
-def put_handler(policyname):
+def create_or_replace_policy_instance(policy_type_id, policy_instance_id):
     """
-    Handles policy replacement
+    Handles PUT /a1-p/policytypes/polidyid/policies/policy_instance_id
     """
-    data = connexion.request.json
-    return _try_func_return(lambda: _put_handler(policyname, data))
+    instance = connexion.request.json
+    return _try_func_return(lambda: _put_handler(policy_type_id, policy_instance_id, instance))
 
 
-def get_handler(policyname):
+def delete_policy_instance(policy_type_id, policy_instance_id):
     """
-    Handles policy GET
+    Handles DELETE /a1-p/policytypes/polidyid/policies/policy_instance_id
     """
     return "", 501