1 # ==================================================================================
2 # Copyright (c) 2019 Nokia
3 # Copyright (c) 2018-2019 AT&T Intellectual Property.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 # ==================================================================================
17 from flask import Response
20 from jsonschema.exceptions import ValidationError
21 from a1 import get_module_logger
22 from a1 import a1rmr, exceptions, utils, data
25 logger = get_module_logger(__name__)
28 def _get_policy_definition(policy_type_id):
29 # Currently we read the manifest on each call, which would seem to allow updating A1 in place. Revisit this?
30 manifest = utils.get_ric_manifest()
31 for m in manifest["controls"]:
32 if m["name"] == policy_type_id:
35 raise exceptions.PolicyTypeNotFound()
38 def _get_policy_schema(policy_type_id):
40 Get the needed info for a policy
42 m = _get_policy_definition(policy_type_id)
43 return m["message_receives_payload_schema"] if "message_receives_payload_schema" in m else None
46 def _try_func_return(func):
48 generic caller that returns the apporp http response if exceptions are raised
52 except ValidationError as exc:
55 except exceptions.PolicyTypeNotFound as exc:
58 except exceptions.PolicyInstanceNotFound as exc:
61 except exceptions.MissingManifest as exc:
63 return "A1 was unable to find the required RIC manifest. report this!", 500
64 except exceptions.MissingRmrString as exc:
66 return "A1 does not have a mapping for the desired rmr string. report this!", 500
67 except BaseException as exc:
68 # catch all, should never happen...
70 return Response(status=500)
73 def _put_handler(policy_type_id, policy_instance_id, instance):
77 For now, policy_type_id is used as the message type
80 data.type_is_valid(policy_type_id)
82 # validate the PUT against the schema, or if there is no shema, make sure the pUT is empty
83 schema = _get_policy_schema(policy_type_id)
85 utils.validate_json(instance, schema)
87 return "BODY SUPPLIED BUT POLICY HAS NO EXPECTED BODY", 400
90 data.store_policy_instance(policy_type_id, policy_instance_id, instance)
93 "operation": "CREATE",
94 "policy_type_id": policy_type_id,
95 "policy_instance_id": policy_instance_id,
99 # send rmr (best effort)
100 a1rmr.send(json.dumps(body), message_type=policy_type_id)
105 def _get_status_handler(policy_type_id, policy_instance_id):
107 Pop trough A1s mailbox, insert the latest status updates into the database, and then return the status vector
109 NOTE: this is done lazily. Meaning, when someone performs a GET on this API, we pop through a1s mailbox.
110 THis may not work in the future if there are "thousands" of policy acknowledgements that hit a1 before this is called,
111 because the rmr mailbox may fill. However, in the near term, we do not expect this to happen.
113 # check validity to 404 first:
114 data.type_is_valid(policy_type_id)
115 data.instance_is_valid(policy_type_id, policy_instance_id)
117 # pop a1s mailbox, looking for policy notifications
118 new_messages = a1rmr.dequeue_all_waiting_messages(21024)
120 # try to parse the messages as responses. Drop those that are malformed
121 for msg in new_messages:
122 # note, we don't use the parameters "policy_type_id, policy_instance" from above here,
123 # because we are popping the whole mailbox, which might include other statuses
124 pay = json.loads(msg["payload"])
125 if "policy_type_id" in pay and "policy_instance_id" in pay and "handler_id" in pay and "status" in pay:
126 data.set_policy_instance_status(
127 pay["policy_type_id"], pay["policy_instance_id"], pay["handler_id"], pay["status"]
130 logger.debug("Dropping message")
133 # return the status vector
134 return data.get_policy_instance_statuses(policy_type_id, policy_instance_id)
140 def get_healthcheck():
142 Handles healthcheck GET
143 Currently, this basically checks the server is alive.a1rmr
151 def get_all_policy_types():
153 Handles GET /a1-p/policytypes
158 def create_policy_type(policy_type_id):
160 Handles PUT /a1-p/policytypes/policy_type_id
165 def get_policy_type(policy_type_id):
167 Handles GET /a1-p/policytypes/policy_type_id
172 def delete_policy_type(policy_type_id):
174 Handles DELETE /a1-p/policytypes/policy_type_id
182 def get_all_instances_for_type(policy_type_id):
184 Handles GET /a1-p/policytypes/policy_type_id/policies
189 def get_policy_instance(policy_type_id, policy_instance_id):
191 Handles GET /a1-p/policytypes/polidyid/policies/policy_instance_id
193 return _try_func_return(lambda: data.get_policy_instance(policy_type_id, policy_instance_id))
196 def get_policy_instance_status(policy_type_id, policy_instance_id):
198 Handles GET /a1-p/policytypes/polidyid/policies/policy_instance_id/status
200 return _try_func_return(lambda: _get_status_handler(policy_type_id, policy_instance_id))
203 def create_or_replace_policy_instance(policy_type_id, policy_instance_id):
205 Handles PUT /a1-p/policytypes/polidyid/policies/policy_instance_id
207 instance = connexion.request.json
208 return _try_func_return(lambda: _put_handler(policy_type_id, policy_instance_id, instance))
211 def delete_policy_instance(policy_type_id, policy_instance_id):
213 Handles DELETE /a1-p/policytypes/polidyid/policies/policy_instance_id