Towards Resiliency.
[ric-plt/a1.git] / a1 / data.py
1 # ==================================================================================
2 #       Copyright (c) 2019 Nokia
3 #       Copyright (c) 2018-2019 AT&T Intellectual Property.
4 #
5 #   Licensed under the Apache License, Version 2.0 (the "License");
6 #   you may not use this file except in compliance with the License.
7 #   You may obtain a copy of the License at
8 #
9 #          http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #   Unless required by applicable law or agreed to in writing, software
12 #   distributed under the License is distributed on an "AS IS" BASIS,
13 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #   See the License for the specific language governing permissions and
15 #   limitations under the License.
16 # ==================================================================================
17
18 """
19 Represents A1s database and database access functions.
20 In the future, this may change to use a different backend, possibly dramatically.
21 Hopefully, the access functions are a good api so nothing else has to change when this happens
22
23 For now, the database is in memory.
24 We use dict data structures (KV) with the expectation of having to move this into Redis
25 """
26 import json
27 import msgpack
28 from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound, PolicyTypeAlreadyExists, CantDeleteNonEmptyType
29 from a1 import get_module_logger
30 from a1 import a1rmr
31
32 logger = get_module_logger(__name__)
33
34
35 class SDLWrapper:
36     """
37     This is a wrapper around the expected SDL Python interface.
38     The usage of POLICY_DATA will be replaced with  SDL when SDL for python is available.
39     The eventual SDL API is expected to be very close to what is here.
40
41     We use msgpack for binary (de)serialization: https://msgpack.org/index.html
42     """
43
44     def __init__(self):
45         self.POLICY_DATA = {}
46
47     def set(self, key, value):
48         """set a key"""
49         self.POLICY_DATA[key] = msgpack.packb(value, use_bin_type=True)
50
51     def get(self, key):
52         """get a key"""
53         if key in self.POLICY_DATA:
54             return msgpack.unpackb(self.POLICY_DATA[key], raw=False)
55         return None
56
57     def find_and_get(self, prefix):
58         """get all k v pairs that start with prefix"""
59         return {k: msgpack.unpackb(v, raw=False) for k, v in self.POLICY_DATA.items() if k.startswith(prefix)}
60
61     def delete(self, key):
62         """ delete a key"""
63         del self.POLICY_DATA[key]
64
65
66 SDL = SDLWrapper()
67
68 TYPE_PREFIX = "a1.policy_type."
69 INSTANCE_PREFIX = "a1.policy_instance."
70 HANDLER_PREFIX = "a1.policy_handler."
71
72
73 # Internal helpers
74
75
76 def _generate_type_key(policy_type_id):
77     """
78     generate a key for a policy type
79     """
80     return "{0}{1}".format(TYPE_PREFIX, policy_type_id)
81
82
83 def _generate_instance_key(policy_type_id, policy_instance_id):
84     """
85     generate a key for a policy instance
86     """
87     return "{0}{1}.{2}".format(INSTANCE_PREFIX, policy_type_id, policy_instance_id)
88
89
90 def _generate_handler_prefix(policy_type_id, policy_instance_id):
91     """
92     generate the prefix to a handler key
93     """
94     return "{0}{1}.{2}.".format(HANDLER_PREFIX, policy_type_id, policy_instance_id)
95
96
97 def _generate_handler_key(policy_type_id, policy_instance_id, handler_id):
98     """
99     generate a key for a policy handler
100     """
101     return "{0}{1}".format(_generate_handler_prefix(policy_type_id, policy_instance_id), handler_id)
102
103
104 def _get_statuses(policy_type_id, policy_instance_id):
105     """
106     shared helper to get statuses for an instance
107     """
108     instance_is_valid(policy_type_id, policy_instance_id)
109     prefixes_for_handler = "{0}{1}.{2}.".format(HANDLER_PREFIX, policy_type_id, policy_instance_id)
110     return list(SDL.find_and_get(prefixes_for_handler).values())
111
112
113 def _get_instance_list(policy_type_id):
114     """
115     shared helper to get instance list for a type
116     """
117     type_is_valid(policy_type_id)
118     prefixes_for_type = "{0}{1}.".format(INSTANCE_PREFIX, policy_type_id)
119     instancekeys = SDL.find_and_get(prefixes_for_type).keys()
120     return [k.split(prefixes_for_type)[1] for k in instancekeys]
121
122
123 def _clear_handlers(policy_type_id, policy_instance_id):
124     """
125     delete all the handlers for a policy instance
126     """
127     all_handlers_pref = _generate_handler_prefix(policy_type_id, policy_instance_id)
128     keys = SDL.find_and_get(all_handlers_pref)
129     for k in keys:
130         SDL.delete(k)
131
132
133 def _clean_up_type(policy_type_id):
134     """
135     pop through a1s mailbox, updating a1s db of all policy statuses
136     for all instances of type, see if it can be deleted
137     """
138     type_is_valid(policy_type_id)
139     for msg in a1rmr.dequeue_all_waiting_messages([21024]):
140         # try to parse the messages as responses. Drop those that are malformed
141         pay = json.loads(msg["payload"])
142         if "policy_type_id" in pay and "policy_instance_id" in pay and "handler_id" in pay and "status" in pay:
143             # We don't use the parameters "policy_type_id, policy_instance" from above here,
144             # because we are popping the whole mailbox, which might include other statuses
145             pti = pay["policy_type_id"]
146             pii = pay["policy_instance_id"]
147
148             try:
149                 """
150                 can't raise an exception here e.g.:
151                 because this is called on many functions; just drop bad status messages.
152                 We def don't want bad messages that happen to hit a1s mailbox to blow up anything
153
154                 """
155                 type_is_valid(pti)
156                 instance_is_valid(pti, pii)
157                 SDL.set(_generate_handler_key(pti, pii, pay["handler_id"]), pay["status"])
158             except (PolicyTypeNotFound, PolicyInstanceNotFound):
159                 pass
160
161         else:
162             logger.debug("Dropping message")
163             logger.debug(pay)
164
165     for policy_instance_id in _get_instance_list(policy_type_id):
166         # see if we can delete
167         vector = _get_statuses(policy_type_id, policy_instance_id)
168
169         """
170         TODO: not being able to delete if the list is [] is prolematic.
171         There are cases, such as a bad routing file, where this type will never be able to be deleted because it never went to any xapps
172         However, A1 cannot distinguish between the case where [] was never going to work, and the case where it hasn't worked *yet*
173
174         However, removing this constraint also leads to problems.
175         Deleting the instance when the vector is empty, for example doing so “shortly after” the PUT, can lead to a worse race condition where the xapps get the policy after that, implement it, but because the DELETE triggered “too soon”, you can never get the status or do the delete on it again, so the xapps are all implementing the instance roguely.
176
177         This requires some thought to address.
178         For now we stick with the "less bad problem".
179         """
180         if vector != []:
181             all_deleted = True
182             for i in vector:
183                 if i != "DELETED":
184                     all_deleted = False
185                     break  # have at least one not DELETED, do nothing
186
187             # blow away from a1 db
188             if all_deleted:
189                 _clear_handlers(policy_type_id, policy_instance_id)  # delete all the handlers
190                 SDL.delete(_generate_instance_key(policy_type_id, policy_instance_id))  # delete instance
191
192
193 # Types
194
195
196 def get_type_list():
197     """
198     retrieve all type ids
199     """
200     typekeys = SDL.find_and_get(TYPE_PREFIX).keys()
201     # policy types are ints but they get butchered to strings in the KV
202     return [int(k.split(TYPE_PREFIX)[1]) for k in typekeys]
203
204
205 def type_is_valid(policy_type_id):
206     """
207     check that a type is valid
208     """
209     if SDL.get(_generate_type_key(policy_type_id)) is None:
210         raise PolicyTypeNotFound()
211
212
213 def store_policy_type(policy_type_id, body):
214     """
215     store a policy type if it doesn't already exist
216     """
217     key = _generate_type_key(policy_type_id)
218     if SDL.get(key) is not None:
219         raise PolicyTypeAlreadyExists()
220     SDL.set(key, body)
221
222
223 def delete_policy_type(policy_type_id):
224     """
225     delete a policy type; can only be done if there are no instances (business logic)
226     """
227     pil = get_instance_list(policy_type_id)
228     if pil == []:  # empty, can delete
229         SDL.delete(_generate_type_key(policy_type_id))
230     else:
231         raise CantDeleteNonEmptyType()
232
233
234 def get_policy_type(policy_type_id):
235     """
236     retrieve a type
237     """
238     type_is_valid(policy_type_id)
239     return SDL.get(_generate_type_key(policy_type_id))
240
241
242 # Instances
243
244
245 def instance_is_valid(policy_type_id, policy_instance_id):
246     """
247     check that an instance is valid
248     """
249     type_is_valid(policy_type_id)
250     if SDL.get(_generate_instance_key(policy_type_id, policy_instance_id)) is None:
251         raise PolicyInstanceNotFound
252
253
254 def store_policy_instance(policy_type_id, policy_instance_id, instance):
255     """
256     Store a policy instance
257     """
258     type_is_valid(policy_type_id)
259     key = _generate_instance_key(policy_type_id, policy_instance_id)
260     if SDL.get(key) is not None:
261         # Reset the statuses because this is a new policy instance, even if it was overwritten
262         _clear_handlers(policy_type_id, policy_instance_id)  # delete all the handlers
263     SDL.set(key, instance)
264
265
266 def get_policy_instance(policy_type_id, policy_instance_id):
267     """
268     Retrieve a policy instance
269     """
270     _clean_up_type(policy_type_id)
271     instance_is_valid(policy_type_id, policy_instance_id)
272     return SDL.get(_generate_instance_key(policy_type_id, policy_instance_id))
273
274
275 def get_policy_instance_statuses(policy_type_id, policy_instance_id):
276     """
277     Retrieve the status vector for a policy instance
278     """
279     _clean_up_type(policy_type_id)
280     return _get_statuses(policy_type_id, policy_instance_id)
281
282
283 def get_instance_list(policy_type_id):
284     """
285     retrieve all instance ids for a type
286     """
287     _clean_up_type(policy_type_id)
288     return _get_instance_list(policy_type_id)