# See the License for the specific language governing permissions and
# limitations under the License.
# ==================================================================================
+import json
from flask import Response
+from jsonschema import validate
import connexion
-import json
from jsonschema.exceptions import ValidationError
from a1 import get_module_logger
-from a1 import a1rmr, exceptions, utils, data
+from a1 import a1rmr, exceptions, data
logger = get_module_logger(__name__)
-def _get_policy_definition(policy_type_id):
- # Currently we read the manifest on each call, which would seem to allow updating A1 in place. Revisit this?
- manifest = utils.get_ric_manifest()
- for m in manifest["controls"]:
- if m["name"] == policy_type_id:
- return m
-
- raise exceptions.PolicyTypeNotFound()
-
-
-def _get_policy_schema(policy_type_id):
- """
- Get the needed info for a policy
- """
- m = _get_policy_definition(policy_type_id)
- return m["message_receives_payload_schema"] if "message_receives_payload_schema" in m else None
-
-
def _try_func_return(func):
"""
generic caller that returns the apporp http response if exceptions are raised
except ValidationError as exc:
logger.exception(exc)
return "", 400
+ except exceptions.PolicyTypeAlreadyExists as exc:
+ logger.exception(exc)
+ return "", 400
except exceptions.PolicyTypeNotFound as exc:
logger.exception(exc)
return "", 404
return Response(status=500)
-def _put_handler(policy_type_id, policy_instance_id, instance):
- """
- Handles policy put
-
- For now, policy_type_id is used as the message type
- """
- # check for 404
- data.type_is_valid(policy_type_id)
-
- # validate the PUT against the schema, or if there is no shema, make sure the pUT is empty
- schema = _get_policy_schema(policy_type_id)
- if schema:
- utils.validate_json(instance, schema)
- elif instance != {}:
- return "BODY SUPPLIED BUT POLICY HAS NO EXPECTED BODY", 400
-
- # store the instance
- data.store_policy_instance(policy_type_id, policy_instance_id, instance)
-
- body = {
- "operation": "CREATE",
- "policy_type_id": policy_type_id,
- "policy_instance_id": policy_instance_id,
- "payload": instance,
- }
-
- # send rmr (best effort)
- a1rmr.send(json.dumps(body), message_type=policy_type_id)
-
- return "", 201
-
-
-def _get_status_handler(policy_type_id, policy_instance_id):
- """
- Pop trough A1s mailbox, insert the latest status updates into the database, and then return the status vector
-
- NOTE: this is done lazily. Meaning, when someone performs a GET on this API, we pop through a1s mailbox.
- THis may not work in the future if there are "thousands" of policy acknowledgements that hit a1 before this is called,
- because the rmr mailbox may fill. However, in the near term, we do not expect this to happen.
- """
- # check validity to 404 first:
- data.type_is_valid(policy_type_id)
- data.instance_is_valid(policy_type_id, policy_instance_id)
-
- # pop a1s mailbox, looking for policy notifications
- new_messages = a1rmr.dequeue_all_waiting_messages(21024)
-
- # try to parse the messages as responses. Drop those that are malformed
- for msg in new_messages:
- # note, we don't use the parameters "policy_type_id, policy_instance" from above here,
- # because we are popping the whole mailbox, which might include other statuses
- pay = json.loads(msg["payload"])
- if "policy_type_id" in pay and "policy_instance_id" in pay and "handler_id" in pay and "status" in pay:
- data.set_policy_instance_status(
- pay["policy_type_id"], pay["policy_instance_id"], pay["handler_id"], pay["status"]
- )
- else:
- logger.debug("Dropping message")
- logger.debug(pay)
-
- # return the status vector
- return data.get_policy_instance_statuses(policy_type_id, policy_instance_id)
-
-
# Healthcheck
"""
Handles PUT /a1-p/policytypes/policy_type_id
"""
- return "", 501
+
+ def _put_type_handler(policy_type_id, body):
+ data.store_policy_type(policy_type_id, body)
+ return "", 201
+
+ body = connexion.request.json
+ return _try_func_return(lambda: _put_type_handler(policy_type_id, body))
def get_policy_type(policy_type_id):
"""
Handles GET /a1-p/policytypes/policy_type_id
"""
- return "", 501
+ return _try_func_return(lambda: data.get_policy_type(policy_type_id))
def delete_policy_type(policy_type_id):
"""
Handles GET /a1-p/policytypes/polidyid/policies/policy_instance_id
"""
+ # 200 is automatic here
return _try_func_return(lambda: data.get_policy_instance(policy_type_id, policy_instance_id))
"""
Handles GET /a1-p/policytypes/polidyid/policies/policy_instance_id/status
"""
+
+ def _get_status_handler(policy_type_id, policy_instance_id):
+ """
+ Pop trough A1s mailbox, insert the latest status updates into the database, and then return the status vector
+
+ NOTE: this is done lazily. Meaning, when someone performs a GET on this API, we pop through a1s mailbox.
+ THis may not work in the future if there are "thousands" of policy acknowledgements that hit a1 before this is called,
+ because the rmr mailbox may fill. However, in the near term, we do not expect this to happen.
+ """
+ # check validity to 404 first:
+ data.type_is_valid(policy_type_id)
+ data.instance_is_valid(policy_type_id, policy_instance_id)
+
+ # pop a1s mailbox, looking for policy notifications
+ new_messages = a1rmr.dequeue_all_waiting_messages(21024)
+
+ # try to parse the messages as responses. Drop those that are malformed
+ for msg in new_messages:
+ # note, we don't use the parameters "policy_type_id, policy_instance" from above here,
+ # because we are popping the whole mailbox, which might include other statuses
+ pay = json.loads(msg["payload"])
+ if "policy_type_id" in pay and "policy_instance_id" in pay and "handler_id" in pay and "status" in pay:
+ data.set_policy_instance_status(
+ pay["policy_type_id"], pay["policy_instance_id"], pay["handler_id"], pay["status"]
+ )
+ else:
+ logger.debug("Dropping message")
+ logger.debug(pay)
+
+ # return the status vector
+ return data.get_policy_instance_statuses(policy_type_id, policy_instance_id)
+
return _try_func_return(lambda: _get_status_handler(policy_type_id, policy_instance_id))
"""
Handles PUT /a1-p/policytypes/polidyid/policies/policy_instance_id
"""
+
+ def _put_instance_handler(policy_type_id, policy_instance_id, instance):
+ """
+ Handles policy instance put
+
+ For now, policy_type_id is used as the message type
+ """
+ # validate the PUT against the schema
+ schema = data.get_policy_type(policy_type_id)["create_schema"]
+ validate(instance=instance, schema=schema)
+
+ # store the instance
+ data.store_policy_instance(policy_type_id, policy_instance_id, instance)
+
+ body = {
+ "operation": "CREATE",
+ "policy_type_id": policy_type_id,
+ "policy_instance_id": policy_instance_id,
+ "payload": instance,
+ }
+
+ # send rmr (best effort)
+ a1rmr.send(json.dumps(body), message_type=policy_type_id)
+
+ return "", 201
+
instance = connexion.request.json
- return _try_func_return(lambda: _put_handler(policy_type_id, policy_instance_id, instance))
+ return _try_func_return(lambda: _put_instance_handler(policy_type_id, policy_instance_id, instance))
def delete_policy_instance(policy_type_id, policy_instance_id):