+"""
+Main a1 controller
+"""
# ==================================================================================
# Copyright (c) 2019 Nokia
# Copyright (c) 2018-2019 AT&T Intellectual Property.
# limitations under the License.
# ==================================================================================
from flask import Response
+from jsonschema import validate
+from jsonschema.exceptions import ValidationError
import connexion
-import json
-from a1 import get_module_logger
-from a1 import a1rmr, exceptions, utils
+from mdclogpy import Logger
+from a1 import a1rmr, exceptions, data
-logger = get_module_logger(__name__)
-
-
-def _get_needed_policy_info(policyname):
- """
- Get the needed info for a policy
- """
- # Currently we read the manifest on each call, which would seem to allow updating A1 in place. Revisit this?
- manifest = utils.get_ric_manifest()
- for m in manifest["controls"]:
- if m["name"] == policyname:
- schema = m["message_receives_payload_schema"] if "message_receives_payload_schema" in m else None
- return (
- utils.rmr_string_to_int(m["message_receives_rmr_type"]),
- schema,
- utils.rmr_string_to_int(m["message_sends_rmr_type"]),
- )
- raise exceptions.PolicyNotFound()
+mdc_logger = Logger(name=__name__)
def _try_func_return(func):
"""
try:
return func()
- except exceptions.PolicyNotFound as exc:
- logger.exception(exc)
+ except (ValidationError, exceptions.PolicyTypeAlreadyExists, exceptions.CantDeleteNonEmptyType):
+ return "", 400
+ except (exceptions.PolicyTypeNotFound, exceptions.PolicyInstanceNotFound):
return "", 404
- except exceptions.MissingManifest as exc:
- logger.exception(exc)
- return "A1 was unable to find the required RIC manifest. report this!", 500
- except exceptions.MissingRmrString as exc:
- logger.exception(exc)
- return "A1 does not have a mapping for the desired rmr string. report this!", 500
- except exceptions.MessageSendFailure as exc:
- logger.exception(exc)
- return "A1 was unable to send a needed message to a downstream subscriber", 504
- except exceptions.ExpectedAckNotReceived as exc:
- logger.exception(exc)
- return "A1 was expecting an ACK back but it didn't receive one or didn't recieve the expected ACK", 504
except BaseException as exc:
# catch all, should never happen...
- logger.exception(exc)
+ mdc_logger.exception(exc)
return Response(status=500)
-def _put_handler(policyname, data):
+# Healthcheck
+
+
+def get_healthcheck():
"""
- Handles policy put
+ Handles healthcheck GET
+ Currently, this checks:
+ 1. whether the a1 webserver is up (if it isn't, this won't even be called, so even entering this function confirms it is)
+ 2. checks whether the rmr thread is running and has completed a loop recently
+ TODO: make "seconds" to pass in a configurable parameter?
"""
+ if a1rmr.healthcheck_rmr_thread():
+ return "", 200
+ return "rmr thread is unhealthy", 500
- mtype_send, schema, mtype_return = _get_needed_policy_info(policyname)
- # validate the PUT against the schema, or if there is no shema, make sure the pUT is empty
- if schema:
- utils.validate_json(data, schema)
- elif data != {}:
- return "BODY SUPPLIED BUT POLICY HAS NO EXPECTED BODY", 400
+# Policy types
- # send rmr, wait for ACK
- return_payload = a1rmr.send_ack_retry(json.dumps(data), message_type=mtype_send, expected_ack_message_type=mtype_return)
- # right now it is assumed that xapps respond with JSON payloads
- # it is further assumed that they include a field "status" and that the value "SUCCESS" indicates a good policy change
- try:
- rpj = json.loads(return_payload)
- return (rpj, 200) if rpj["status"] == "SUCCESS" else ({"reason": "BAD STATUS", "return_payload": rpj}, 502)
- except json.decoder.JSONDecodeError:
- return {"reason": "NOT JSON", "return_payload": return_payload}, 502
- except KeyError:
- return {"reason": "NO STATUS", "return_payload": rpj}, 502
+def get_all_policy_types():
+ """
+ Handles GET /a1-p/policytypes
+ """
+ return _try_func_return(data.get_type_list)
+
+
+def create_policy_type(policy_type_id):
+ """
+ Handles PUT /a1-p/policytypes/policy_type_id
+ """
+
+ def put_type_handler():
+ data.store_policy_type(policy_type_id, body)
+ return "", 201
+
+ body = connexion.request.json
+ return _try_func_return(put_type_handler)
+
+
+def get_policy_type(policy_type_id):
+ """
+ Handles GET /a1-p/policytypes/policy_type_id
+ """
+ return _try_func_return(lambda: data.get_policy_type(policy_type_id))
+
+
+def delete_policy_type(policy_type_id):
+ """
+ Handles DELETE /a1-p/policytypes/policy_type_id
+ """
+
+ def delete_policy_type_handler():
+ data.delete_policy_type(policy_type_id)
+ return "", 204
+
+ return _try_func_return(delete_policy_type_handler)
-# Public
+# Policy instances
-def put_handler(policyname):
+def get_all_instances_for_type(policy_type_id):
"""
- Handles policy replacement
+ Handles GET /a1-p/policytypes/policy_type_id/policies
"""
- data = connexion.request.json
- return _try_func_return(lambda: _put_handler(policyname, data))
+ return _try_func_return(lambda: data.get_instance_list(policy_type_id))
-def get_handler(policyname):
+def get_policy_instance(policy_type_id, policy_instance_id):
"""
- Handles policy GET
+ Handles GET /a1-p/policytypes/polidyid/policies/policy_instance_id
"""
- return "", 501
+ return _try_func_return(lambda: data.get_policy_instance(policy_type_id, policy_instance_id))
+
+
+def get_policy_instance_status(policy_type_id, policy_instance_id):
+ """
+ Handles GET /a1-p/policytypes/polidyid/policies/policy_instance_id/status
+
+ Return the aggregated status. The order of rules is as follows:
+ 1. If a1 has received at least one status, and *all* received statuses are "DELETED", we blow away the instance and return a 404
+ 2. if a1 has received at least one status and at least one is OK, we return "IN EFFECT"
+ 3. "NOT IN EFFECT" otherwise (no statuses, or none are OK but not all are deleted)
+ """
+
+ return _try_func_return(lambda: data.get_policy_instance_status(policy_type_id, policy_instance_id))
+
+
+def create_or_replace_policy_instance(policy_type_id, policy_instance_id):
+ """
+ Handles PUT /a1-p/policytypes/polidyid/policies/policy_instance_id
+ """
+ instance = connexion.request.json
+
+ def put_instance_handler():
+ """
+ Handles policy instance put
+
+ For now, policy_type_id is used as the message type
+ """
+ # validate the PUT against the schema
+ schema = data.get_policy_type(policy_type_id)["create_schema"]
+ validate(instance=instance, schema=schema)
+
+ # store the instance
+ data.store_policy_instance(policy_type_id, policy_instance_id, instance)
+
+ # queue rmr send (best effort)
+ a1rmr.queue_instance_send(("CREATE", policy_type_id, policy_instance_id, instance))
+
+ return "", 202
+
+ return _try_func_return(put_instance_handler)
+
+
+def delete_policy_instance(policy_type_id, policy_instance_id):
+ """
+ Handles DELETE /a1-p/policytypes/polidyid/policies/policy_instance_id
+ """
+
+ def delete_instance_handler():
+ """
+ here we send out the DELETEs but we don't delete the instance until a GET is called where we check the statuses
+ We also set the status as deleted which would be reflected in a GET to ../status (before the DELETE completes)
+ """
+ data.delete_policy_instance(policy_type_id, policy_instance_id)
+
+ # queue rmr send (best effort)
+ a1rmr.queue_instance_send(("DELETE", policy_type_id, policy_instance_id, ""))
+
+ return "", 202
+
+ return _try_func_return(delete_instance_handler)