+"""
+Main a1 controller
+"""
# ==================================================================================
# Copyright (c) 2019 Nokia
# Copyright (c) 2018-2019 AT&T Intellectual Property.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==================================================================================
-from flask import Response
-import connexion
import json
+from flask import Response
+from jsonschema import validate
from jsonschema.exceptions import ValidationError
+import connexion
from a1 import get_module_logger
-from a1 import a1rmr, exceptions, utils, data
+from a1 import a1rmr, exceptions, data
logger = get_module_logger(__name__)
-def _get_policy_definition(policy_type_id):
- # Currently we read the manifest on each call, which would seem to allow updating A1 in place. Revisit this?
- manifest = utils.get_ric_manifest()
- for m in manifest["controls"]:
- if m["name"] == policy_type_id:
- return m
-
- raise exceptions.PolicyTypeNotFound()
-
-
-def _get_policy_schema(policy_type_id):
- """
- Get the needed info for a policy
- """
- m = _get_policy_definition(policy_type_id)
- return m["message_receives_payload_schema"] if "message_receives_payload_schema" in m else None
-
-
def _try_func_return(func):
"""
generic caller that returns the apporp http response if exceptions are raised
"""
try:
return func()
- except ValidationError as exc:
- logger.exception(exc)
+ except (ValidationError, exceptions.PolicyTypeAlreadyExists, exceptions.CantDeleteNonEmptyType):
return "", 400
- except exceptions.PolicyTypeNotFound as exc:
- logger.exception(exc)
- return "", 404
- except exceptions.PolicyInstanceNotFound as exc:
- logger.exception(exc)
+ except (exceptions.PolicyTypeNotFound, exceptions.PolicyInstanceNotFound):
return "", 404
- except exceptions.MissingManifest as exc:
- logger.exception(exc)
- return "A1 was unable to find the required RIC manifest. report this!", 500
- except exceptions.MissingRmrString as exc:
- logger.exception(exc)
- return "A1 does not have a mapping for the desired rmr string. report this!", 500
except BaseException as exc:
# catch all, should never happen...
logger.exception(exc)
return Response(status=500)
-def _put_handler(policy_type_id, policy_instance_id, instance):
+def _gen_body_to_handler(operation, policy_type_id, policy_instance_id, payload=None):
"""
- Handles policy put
-
- For now, policy_type_id is used as the message type
+ used to create the payloads that get sent to downstream policy handlers
"""
- # check for 404
- data.type_is_valid(policy_type_id)
-
- # validate the PUT against the schema, or if there is no shema, make sure the pUT is empty
- schema = _get_policy_schema(policy_type_id)
- if schema:
- utils.validate_json(instance, schema)
- elif instance != {}:
- return "BODY SUPPLIED BUT POLICY HAS NO EXPECTED BODY", 400
-
- # store the instance
- data.store_policy_instance(policy_type_id, policy_instance_id, instance)
-
- body = {
- "operation": "CREATE",
+ return {
+ "operation": operation,
"policy_type_id": policy_type_id,
"policy_instance_id": policy_instance_id,
- "payload": instance,
+ "payload": payload,
}
- # send rmr (best effort)
- a1rmr.send(json.dumps(body), message_type=policy_type_id)
-
- return "", 201
-
-
-def _get_status_handler(policy_type_id, policy_instance_id):
- """
- Pop trough A1s mailbox, insert the latest status updates into the database, and then return the status vector
-
- NOTE: this is done lazily. Meaning, when someone performs a GET on this API, we pop through a1s mailbox.
- THis may not work in the future if there are "thousands" of policy acknowledgements that hit a1 before this is called,
- because the rmr mailbox may fill. However, in the near term, we do not expect this to happen.
- """
- # check validity to 404 first:
- data.type_is_valid(policy_type_id)
- data.instance_is_valid(policy_type_id, policy_instance_id)
-
- # pop a1s mailbox, looking for policy notifications
- new_messages = a1rmr.dequeue_all_waiting_messages(21024)
-
- # try to parse the messages as responses. Drop those that are malformed
- for msg in new_messages:
- # note, we don't use the parameters "policy_type_id, policy_instance" from above here,
- # because we are popping the whole mailbox, which might include other statuses
- pay = json.loads(msg["payload"])
- if "policy_type_id" in pay and "policy_instance_id" in pay and "handler_id" in pay and "status" in pay:
- data.set_policy_instance_status(
- pay["policy_type_id"], pay["policy_instance_id"], pay["handler_id"], pay["status"]
- )
- else:
- logger.debug("Dropping message")
- logger.debug(pay)
-
- # return the status vector
- return data.get_policy_instance_statuses(policy_type_id, policy_instance_id)
-
# Healthcheck
def get_healthcheck():
"""
Handles healthcheck GET
- Currently, this basically checks the server is alive.a1rmr
+ Currently, this checks:
+ 1. whether the a1 webserver is up (if it isn't, this won't even be called, so even entering this function confirms it is)
+ 2. checks whether the rmr thread is running and has completed a loop recently
+ TODO: make "seconds" to pass in a configurable parameter?
"""
- return "", 200
+ if a1rmr.healthcheck_rmr_thread():
+ return "", 200
+ return "rmr thread is unhealthy", 500
# Policy types
"""
Handles GET /a1-p/policytypes
"""
- return "", 501
+ return _try_func_return(data.get_type_list)
def create_policy_type(policy_type_id):
"""
Handles PUT /a1-p/policytypes/policy_type_id
"""
- return "", 501
+
+ def put_type_handler():
+ data.store_policy_type(policy_type_id, body)
+ return "", 201
+
+ body = connexion.request.json
+ return _try_func_return(put_type_handler)
def get_policy_type(policy_type_id):
"""
Handles GET /a1-p/policytypes/policy_type_id
"""
- return "", 501
+ return _try_func_return(lambda: data.get_policy_type(policy_type_id))
def delete_policy_type(policy_type_id):
"""
Handles DELETE /a1-p/policytypes/policy_type_id
"""
- return "", 501
+
+ def delete_policy_type_handler():
+ data.delete_policy_type(policy_type_id)
+ return "", 204
+
+ return _try_func_return(delete_policy_type_handler)
# Policy instances
"""
Handles GET /a1-p/policytypes/policy_type_id/policies
"""
- return "", 501
+ return _try_func_return(lambda: data.get_instance_list(policy_type_id))
def get_policy_instance(policy_type_id, policy_instance_id):
def get_policy_instance_status(policy_type_id, policy_instance_id):
"""
Handles GET /a1-p/policytypes/polidyid/policies/policy_instance_id/status
+
+ Return the aggregated status. The order of rules is as follows:
+ 1. If a1 has received at least one status, and *all* received statuses are "DELETED", we blow away the instance and return a 404
+ 2. if a1 has received at least one status and at least one is OK, we return "IN EFFECT"
+ 3. "NOT IN EFFECT" otherwise (no statuses, or none are OK but not all are deleted)
"""
- return _try_func_return(lambda: _get_status_handler(policy_type_id, policy_instance_id))
+
+ def get_status_handler():
+ vector = data.get_policy_instance_statuses(policy_type_id, policy_instance_id)
+ for i in vector:
+ if i == "OK":
+ return "IN EFFECT", 200
+ return "NOT IN EFFECT", 200
+
+ return _try_func_return(get_status_handler)
def create_or_replace_policy_instance(policy_type_id, policy_instance_id):
Handles PUT /a1-p/policytypes/polidyid/policies/policy_instance_id
"""
instance = connexion.request.json
- return _try_func_return(lambda: _put_handler(policy_type_id, policy_instance_id, instance))
+
+ def put_instance_handler():
+ """
+ Handles policy instance put
+
+ For now, policy_type_id is used as the message type
+ """
+ # validate the PUT against the schema
+ schema = data.get_policy_type(policy_type_id)["create_schema"]
+ validate(instance=instance, schema=schema)
+
+ # store the instance
+ data.store_policy_instance(policy_type_id, policy_instance_id, instance)
+
+ # send rmr (best effort)
+ body = _gen_body_to_handler("CREATE", policy_type_id, policy_instance_id, payload=instance)
+ a1rmr.queue_work({"payload": json.dumps(body), "msg type": policy_type_id})
+
+ return "", 202
+
+ return _try_func_return(put_instance_handler)
def delete_policy_instance(policy_type_id, policy_instance_id):
"""
Handles DELETE /a1-p/policytypes/polidyid/policies/policy_instance_id
"""
- return "", 501
+
+ def delete_instance_handler():
+ """
+ here we send out the DELETEs but we don't delete the instance until a GET is called where we check the statuses
+ """
+ data.instance_is_valid(policy_type_id, policy_instance_id)
+
+ # send rmr (best effort)
+ body = _gen_body_to_handler("DELETE", policy_type_id, policy_instance_id)
+ a1rmr.queue_work({"payload": json.dumps(body), "msg type": policy_type_id})
+
+ return "", 202
+
+ return _try_func_return(delete_instance_handler)