Implement Delete timeouts
[ric-plt/a1.git] / a1 / data.py
1 # ==================================================================================
2 #       Copyright (c) 2019 Nokia
3 #       Copyright (c) 2018-2019 AT&T Intellectual Property.
4 #
5 #   Licensed under the Apache License, Version 2.0 (the "License");
6 #   you may not use this file except in compliance with the License.
7 #   You may obtain a copy of the License at
8 #
9 #          http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #   Unless required by applicable law or agreed to in writing, software
12 #   distributed under the License is distributed on an "AS IS" BASIS,
13 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #   See the License for the specific language governing permissions and
15 #   limitations under the License.
16 # ==================================================================================
17
18 """
19 Represents A1s database and database access functions.
20 In the future, this may change to use a different backend, possibly dramatically.
21 Hopefully, the access functions are a good api so nothing else has to change when this happens
22
23 For now, the database is in memory.
24 We use dict data structures (KV) with the expectation of having to move this into Redis
25 """
26 import os
27 import time
28 from threading import Thread
29 import msgpack
30 from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound, PolicyTypeAlreadyExists, CantDeleteNonEmptyType
31 from a1 import get_module_logger
32
33 logger = get_module_logger(__name__)
34
35
36 INSTANCE_DELETE_NO_RESP_TTL = int(os.environ.get("INSTANCE_DELETE_NO_RESP_TTL", 5))
37 INSTANCE_DELETE_RESP_TTL = int(os.environ.get("INSTANCE_DELETE_RESP_TTL", 5))
38
39
40 class SDLWrapper:
41     """
42     This is a wrapper around the expected SDL Python interface.
43     The usage of POLICY_DATA will be replaced with  SDL when SDL for python is available.
44     The eventual SDL API is expected to be very close to what is here.
45
46     We use msgpack for binary (de)serialization: https://msgpack.org/index.html
47     """
48
49     def __init__(self):
50         self.POLICY_DATA = {}
51
52     def set(self, key, value):
53         """set a key"""
54         self.POLICY_DATA[key] = msgpack.packb(value, use_bin_type=True)
55
56     def get(self, key):
57         """get a key"""
58         if key in self.POLICY_DATA:
59             return msgpack.unpackb(self.POLICY_DATA[key], raw=False)
60         return None
61
62     def find_and_get(self, prefix):
63         """get all k v pairs that start with prefix"""
64         return {k: msgpack.unpackb(v, raw=False) for k, v in self.POLICY_DATA.items() if k.startswith(prefix)}
65
66     def delete(self, key):
67         """ delete a key"""
68         del self.POLICY_DATA[key]
69
70
71 SDL = SDLWrapper()
72
73 TYPE_PREFIX = "a1.policy_type."
74 INSTANCE_PREFIX = "a1.policy_instance."
75 METADATA_PREFIX = "a1.policy_inst_metadata."
76 HANDLER_PREFIX = "a1.policy_handler."
77
78
79 # Internal helpers
80
81
82 def _generate_type_key(policy_type_id):
83     """
84     generate a key for a policy type
85     """
86     return "{0}{1}".format(TYPE_PREFIX, policy_type_id)
87
88
89 def _generate_instance_key(policy_type_id, policy_instance_id):
90     """
91     generate a key for a policy instance
92     """
93     return "{0}{1}.{2}".format(INSTANCE_PREFIX, policy_type_id, policy_instance_id)
94
95
96 def _generate_instance_metadata_key(policy_type_id, policy_instance_id):
97     """
98     generate a key for a policy instance metadata
99     """
100     return "{0}{1}.{2}".format(METADATA_PREFIX, policy_type_id, policy_instance_id)
101
102
103 def _generate_handler_prefix(policy_type_id, policy_instance_id):
104     """
105     generate the prefix to a handler key
106     """
107     return "{0}{1}.{2}.".format(HANDLER_PREFIX, policy_type_id, policy_instance_id)
108
109
110 def _generate_handler_key(policy_type_id, policy_instance_id, handler_id):
111     """
112     generate a key for a policy handler
113     """
114     return "{0}{1}".format(_generate_handler_prefix(policy_type_id, policy_instance_id), handler_id)
115
116
117 def _type_is_valid(policy_type_id):
118     """
119     check that a type is valid
120     """
121     if SDL.get(_generate_type_key(policy_type_id)) is None:
122         raise PolicyTypeNotFound()
123
124
125 def _instance_is_valid(policy_type_id, policy_instance_id):
126     """
127     check that an instance is valid
128     """
129     _type_is_valid(policy_type_id)
130     if SDL.get(_generate_instance_key(policy_type_id, policy_instance_id)) is None:
131         raise PolicyInstanceNotFound
132
133
134 def _get_statuses(policy_type_id, policy_instance_id):
135     """
136     shared helper to get statuses for an instance
137     """
138     _instance_is_valid(policy_type_id, policy_instance_id)
139     prefixes_for_handler = "{0}{1}.{2}.".format(HANDLER_PREFIX, policy_type_id, policy_instance_id)
140     return list(SDL.find_and_get(prefixes_for_handler).values())
141
142
143 def _get_instance_list(policy_type_id):
144     """
145     shared helper to get instance list for a type
146     """
147     _type_is_valid(policy_type_id)
148     prefixes_for_type = "{0}{1}.".format(INSTANCE_PREFIX, policy_type_id)
149     instancekeys = SDL.find_and_get(prefixes_for_type).keys()
150     return [k.split(prefixes_for_type)[1] for k in instancekeys]
151
152
153 def _clear_handlers(policy_type_id, policy_instance_id):
154     """
155     delete all the handlers for a policy instance
156     """
157     all_handlers_pref = _generate_handler_prefix(policy_type_id, policy_instance_id)
158     keys = SDL.find_and_get(all_handlers_pref)
159     for k in keys:
160         SDL.delete(k)
161
162
163 def _get_metadata(policy_type_id, policy_instance_id):
164     """
165     get instance metadata
166     """
167     _instance_is_valid(policy_type_id, policy_instance_id)
168     metadata_key = _generate_instance_metadata_key(policy_type_id, policy_instance_id)
169     return SDL.get(metadata_key)
170
171
172 def _delete_after(policy_type_id, policy_instance_id, ttl):
173     """
174     this is a blocking function, must call this in a thread to not block!
175     waits ttl seconds, then deletes the instance
176     """
177     _instance_is_valid(policy_type_id, policy_instance_id)
178
179     time.sleep(ttl)
180
181     # ready to delete
182     _clear_handlers(policy_type_id, policy_instance_id)  # delete all the handlers
183     SDL.delete(_generate_instance_key(policy_type_id, policy_instance_id))  # delete instance
184     SDL.delete(_generate_instance_metadata_key(policy_type_id, policy_instance_id))  # delete instance metadata
185     logger.debug("type %s instance %s deleted", policy_type_id, policy_instance_id)
186     raise PolicyInstanceNotFound()
187
188
189 # Types
190
191
192 def get_type_list():
193     """
194     retrieve all type ids
195     """
196     typekeys = SDL.find_and_get(TYPE_PREFIX).keys()
197     # policy types are ints but they get butchered to strings in the KV
198     return [int(k.split(TYPE_PREFIX)[1]) for k in typekeys]
199
200
201 def store_policy_type(policy_type_id, body):
202     """
203     store a policy type if it doesn't already exist
204     """
205     key = _generate_type_key(policy_type_id)
206     if SDL.get(key) is not None:
207         raise PolicyTypeAlreadyExists()
208     SDL.set(key, body)
209
210
211 def delete_policy_type(policy_type_id):
212     """
213     delete a policy type; can only be done if there are no instances (business logic)
214     """
215     pil = get_instance_list(policy_type_id)
216     if pil == []:  # empty, can delete
217         SDL.delete(_generate_type_key(policy_type_id))
218     else:
219         raise CantDeleteNonEmptyType()
220
221
222 def get_policy_type(policy_type_id):
223     """
224     retrieve a type
225     """
226     _type_is_valid(policy_type_id)
227     return SDL.get(_generate_type_key(policy_type_id))
228
229
230 # Instances
231
232
233 def store_policy_instance(policy_type_id, policy_instance_id, instance):
234     """
235     Store a policy instance
236     """
237     _type_is_valid(policy_type_id)
238     creation_timestamp = time.time()
239
240     # store the instance
241     key = _generate_instance_key(policy_type_id, policy_instance_id)
242     if SDL.get(key) is not None:
243         # Reset the statuses because this is a new policy instance, even if it was overwritten
244         _clear_handlers(policy_type_id, policy_instance_id)  # delete all the handlers
245     SDL.set(key, instance)
246
247     metadata_key = _generate_instance_metadata_key(policy_type_id, policy_instance_id)
248     SDL.set(metadata_key, {"created_at": creation_timestamp, "has_been_deleted": False})
249
250
251 def get_policy_instance(policy_type_id, policy_instance_id):
252     """
253     Retrieve a policy instance
254     """
255     _instance_is_valid(policy_type_id, policy_instance_id)
256     return SDL.get(_generate_instance_key(policy_type_id, policy_instance_id))
257
258
259 def get_instance_list(policy_type_id):
260     """
261     retrieve all instance ids for a type
262     """
263     return _get_instance_list(policy_type_id)
264
265
266 def delete_policy_instance(policy_type_id, policy_instance_id):
267     """
268     initially sets has_been_deleted
269     then launches a thread that waits until the relevent timer expires, and finally deletes the instance
270     """
271     _instance_is_valid(policy_type_id, policy_instance_id)
272
273     # set the metadata first
274     deleted_timestamp = time.time()
275     metadata_key = _generate_instance_metadata_key(policy_type_id, policy_instance_id)
276     existing_metadata = _get_metadata(policy_type_id, policy_instance_id)
277     SDL.set(
278         metadata_key,
279         {"created_at": existing_metadata["created_at"], "has_been_deleted": True, "deleted_at": deleted_timestamp},
280     )
281
282     # wait, then delete
283     vector = _get_statuses(policy_type_id, policy_instance_id)
284     if vector == []:
285         # handler is empty; we wait for t1 to expire then goodnight
286         clos = lambda: _delete_after(policy_type_id, policy_instance_id, INSTANCE_DELETE_NO_RESP_TTL)
287     else:
288         # handler is not empty, we wait max t1,t2 to expire then goodnight
289         clos = lambda: _delete_after(
290             policy_type_id, policy_instance_id, max(INSTANCE_DELETE_RESP_TTL, INSTANCE_DELETE_NO_RESP_TTL)
291         )
292     Thread(target=clos).start()
293
294
295 # Statuses
296
297
298 def set_policy_instance_status(policy_type_id, policy_instance_id, handler_id, status):
299     """
300     update the database status for a handler
301     called from a1's rmr thread
302     """
303     _type_is_valid(policy_type_id)
304     _instance_is_valid(policy_type_id, policy_instance_id)
305     SDL.set(_generate_handler_key(policy_type_id, policy_instance_id, handler_id), status)
306
307
308 def get_policy_instance_status(policy_type_id, policy_instance_id):
309     """
310     Gets the status of an instance
311     """
312     _instance_is_valid(policy_type_id, policy_instance_id)
313     metadata = _get_metadata(policy_type_id, policy_instance_id)
314     metadata["instance_status"] = "NOT IN EFFECT"
315     for i in _get_statuses(policy_type_id, policy_instance_id):
316         if i == "OK":
317             metadata["instance_status"] = "IN EFFECT"
318             break
319     return metadata