92ff278ae9cf2d97c08f12a67c58a376cac0875e
[ric-plt/a1.git] / a1 / controller.py
1 # ==================================================================================
2 #       Copyright (c) 2019 Nokia
3 #       Copyright (c) 2018-2019 AT&T Intellectual Property.
4 #
5 #   Licensed under the Apache License, Version 2.0 (the "License");
6 #   you may not use this file except in compliance with the License.
7 #   You may obtain a copy of the License at
8 #
9 #          http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #   Unless required by applicable law or agreed to in writing, software
12 #   distributed under the License is distributed on an "AS IS" BASIS,
13 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #   See the License for the specific language governing permissions and
15 #   limitations under the License.
16 # ==================================================================================
17 from flask import Response
18 import connexion
19 import json
20 from jsonschema.exceptions import ValidationError
21 from a1 import get_module_logger
22 from a1 import a1rmr, exceptions, utils, data
23
24
25 logger = get_module_logger(__name__)
26
27
28 def _get_policy_definition(policy_type_id):
29     # Currently we read the manifest on each call, which would seem to allow updating A1 in place. Revisit this?
30     manifest = utils.get_ric_manifest()
31     for m in manifest["controls"]:
32         if m["name"] == policy_type_id:
33             return m
34
35     raise exceptions.PolicyTypeNotFound()
36
37
38 def _get_policy_schema(policy_type_id):
39     """
40     Get the needed info for a policy
41     """
42     m = _get_policy_definition(policy_type_id)
43     return m["message_receives_payload_schema"] if "message_receives_payload_schema" in m else None
44
45
46 def _try_func_return(func):
47     """
48     generic caller that returns the apporp http response if exceptions are raised
49     """
50     try:
51         return func()
52     except ValidationError as exc:
53         logger.exception(exc)
54         return "", 400
55     except exceptions.PolicyTypeNotFound as exc:
56         logger.exception(exc)
57         return "", 404
58     except exceptions.PolicyInstanceNotFound as exc:
59         logger.exception(exc)
60         return "", 404
61     except exceptions.MissingManifest as exc:
62         logger.exception(exc)
63         return "A1 was unable to find the required RIC manifest. report this!", 500
64     except exceptions.MissingRmrString as exc:
65         logger.exception(exc)
66         return "A1 does not have a mapping for the desired rmr string. report this!", 500
67     except BaseException as exc:
68         # catch all, should never happen...
69         logger.exception(exc)
70         return Response(status=500)
71
72
73 def _put_handler(policy_type_id, policy_instance_id, instance):
74     """
75     Handles policy put
76
77     For now, policy_type_id is used as the message type
78     """
79     # check for 404
80     data.type_is_valid(policy_type_id)
81
82     # validate the PUT against the schema, or if there is no shema, make sure the pUT is empty
83     schema = _get_policy_schema(policy_type_id)
84     if schema:
85         utils.validate_json(instance, schema)
86     elif instance != {}:
87         return "BODY SUPPLIED BUT POLICY HAS NO EXPECTED BODY", 400
88
89     # store the instance
90     data.store_policy_instance(policy_type_id, policy_instance_id, instance)
91
92     body = {
93         "operation": "CREATE",
94         "policy_type_id": policy_type_id,
95         "policy_instance_id": policy_instance_id,
96         "payload": instance,
97     }
98
99     # send rmr (best effort)
100     a1rmr.send(json.dumps(body), message_type=policy_type_id)
101
102     return "", 201
103
104
105 def _get_status_handler(policy_type_id, policy_instance_id):
106     """
107     Pop trough A1s mailbox, insert the latest status updates into the database, and then return the status vector
108
109     NOTE: this is done lazily. Meaning, when someone performs a GET on this API, we pop through a1s mailbox.
110     THis may not work in the future if there are "thousands" of policy acknowledgements that hit a1 before this is called,
111     because the rmr mailbox may fill. However, in the near term, we do not expect this to happen.
112     """
113     # check validity to 404 first:
114     data.type_is_valid(policy_type_id)
115     data.instance_is_valid(policy_type_id, policy_instance_id)
116
117     # pop a1s mailbox, looking for policy notifications
118     new_messages = a1rmr.dequeue_all_waiting_messages(21024)
119
120     # try to parse the messages as responses. Drop those that are malformed
121     for msg in new_messages:
122         # note, we don't use the parameters "policy_type_id, policy_instance" from above here,
123         # because we are popping the whole mailbox, which might include other statuses
124         pay = json.loads(msg["payload"])
125         if "policy_type_id" in pay and "policy_instance_id" in pay and "handler_id" in pay and "status" in pay:
126             data.set_policy_instance_status(
127                 pay["policy_type_id"], pay["policy_instance_id"], pay["handler_id"], pay["status"]
128             )
129         else:
130             logger.debug("Dropping message")
131             logger.debug(pay)
132
133     # return the status vector
134     return data.get_policy_instance_statuses(policy_type_id, policy_instance_id)
135
136
137 # Healthcheck
138
139
140 def get_healthcheck():
141     """
142     Handles healthcheck GET
143     Currently, this basically checks the server is alive.a1rmr
144     """
145     return "", 200
146
147
148 # Policy types
149
150
151 def get_all_policy_types():
152     """
153     Handles GET /a1-p/policytypes
154     """
155     return "", 501
156
157
158 def create_policy_type(policy_type_id):
159     """
160     Handles PUT /a1-p/policytypes/policy_type_id
161     """
162     return "", 501
163
164
165 def get_policy_type(policy_type_id):
166     """
167     Handles GET /a1-p/policytypes/policy_type_id
168     """
169     return "", 501
170
171
172 def delete_policy_type(policy_type_id):
173     """
174     Handles DELETE /a1-p/policytypes/policy_type_id
175     """
176     return "", 501
177
178
179 # Policy instances
180
181
182 def get_all_instances_for_type(policy_type_id):
183     """
184     Handles GET /a1-p/policytypes/policy_type_id/policies
185     """
186     return "", 501
187
188
189 def get_policy_instance(policy_type_id, policy_instance_id):
190     """
191     Handles GET /a1-p/policytypes/polidyid/policies/policy_instance_id
192     """
193     return _try_func_return(lambda: data.get_policy_instance(policy_type_id, policy_instance_id))
194
195
196 def get_policy_instance_status(policy_type_id, policy_instance_id):
197     """
198     Handles GET /a1-p/policytypes/polidyid/policies/policy_instance_id/status
199     """
200     return _try_func_return(lambda: _get_status_handler(policy_type_id, policy_instance_id))
201
202
203 def create_or_replace_policy_instance(policy_type_id, policy_instance_id):
204     """
205     Handles PUT /a1-p/policytypes/polidyid/policies/policy_instance_id
206     """
207     instance = connexion.request.json
208     return _try_func_return(lambda: _put_handler(policy_type_id, policy_instance_id, instance))
209
210
211 def delete_policy_instance(policy_type_id, policy_instance_id):
212     """
213     Handles DELETE /a1-p/policytypes/polidyid/policies/policy_instance_id
214     """
215     return "", 501