2 Represents A1s database and database access functions.
3 In the future, this may change to use a different backend, possibly dramatically.
4 Hopefully, the access functions are a good api so nothing else has to change when this happens
6 For now, the database is in memory.
7 We use dict data structures (KV) with the expectation of having to move this into Redis
9 # ==================================================================================
10 # Copyright (c) 2019-2020 Nokia
11 # Copyright (c) 2018-2020 AT&T Intellectual Property.
13 # Licensed under the Apache License, Version 2.0 (the "License");
14 # you may not use this file except in compliance with the License.
15 # You may obtain a copy of the License at
17 # http://www.apache.org/licenses/LICENSE-2.0
19 # Unless required by applicable law or agreed to in writing, software
20 # distributed under the License is distributed on an "AS IS" BASIS,
21 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
22 # See the License for the specific language governing permissions and
23 # limitations under the License.
24 # ==================================================================================
27 from threading import Thread
29 from mdclogpy import Logger
31 from ricsdl.syncstorage import SyncStorage
33 from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound, PolicyTypeAlreadyExists, CantDeleteNonEmptyType
35 mdc_logger = Logger(name=__name__)
38 INSTANCE_DELETE_NO_RESP_TTL = int(os.environ.get("INSTANCE_DELETE_NO_RESP_TTL", 5))
39 INSTANCE_DELETE_RESP_TTL = int(os.environ.get("INSTANCE_DELETE_RESP_TTL", 5))
46 This is a wrapper around the expected SDL Python interface.
47 The usage of POLICY_DATA will be replaced with SDL when SDL for python is available.
48 The eventual SDL API is expected to be very close to what is here.
50 We use msgpack for binary (de)serialization: https://msgpack.org/index.html
54 self.sdl = SyncStorage()
56 def set(self, key, value):
58 self.sdl.set(A1NS, {key: msgpack.packb(value, use_bin_type=True)})
62 ret_dict = self.sdl.get(A1NS, {key})
64 return msgpack.unpackb(ret_dict[key], raw=False)
68 def find_and_get(self, prefix):
69 """get all k v pairs that start with prefix"""
70 ret_dict = self.sdl.find_and_get(A1NS, "{0}".format(prefix), atomic=True)
71 found = {k: msgpack.unpackb(v, raw=False) for k, v in ret_dict.items()}
72 # TODO: upgrade to sdl 2.0.0 which does the sorting for us
73 return {k: found[k] for k in sorted(found)}
75 def delete(self, key):
77 self.sdl.remove(A1NS, {key})
82 TYPE_PREFIX = "a1.policy_type."
83 INSTANCE_PREFIX = "a1.policy_instance."
84 METADATA_PREFIX = "a1.policy_inst_metadata."
85 HANDLER_PREFIX = "a1.policy_handler."
91 def _generate_type_key(policy_type_id):
93 generate a key for a policy type
95 return "{0}{1}".format(TYPE_PREFIX, policy_type_id)
98 def _generate_instance_key(policy_type_id, policy_instance_id):
100 generate a key for a policy instance
102 return "{0}{1}.{2}".format(INSTANCE_PREFIX, policy_type_id, policy_instance_id)
105 def _generate_instance_metadata_key(policy_type_id, policy_instance_id):
107 generate a key for a policy instance metadata
109 return "{0}{1}.{2}".format(METADATA_PREFIX, policy_type_id, policy_instance_id)
112 def _generate_handler_prefix(policy_type_id, policy_instance_id):
114 generate the prefix to a handler key
116 return "{0}{1}.{2}.".format(HANDLER_PREFIX, policy_type_id, policy_instance_id)
119 def _generate_handler_key(policy_type_id, policy_instance_id, handler_id):
121 generate a key for a policy handler
123 return "{0}{1}".format(_generate_handler_prefix(policy_type_id, policy_instance_id), handler_id)
126 def _type_is_valid(policy_type_id):
128 check that a type is valid
130 if SDL.get(_generate_type_key(policy_type_id)) is None:
131 raise PolicyTypeNotFound()
134 def _instance_is_valid(policy_type_id, policy_instance_id):
136 check that an instance is valid
138 _type_is_valid(policy_type_id)
139 if SDL.get(_generate_instance_key(policy_type_id, policy_instance_id)) is None:
140 raise PolicyInstanceNotFound
143 def _get_statuses(policy_type_id, policy_instance_id):
145 shared helper to get statuses for an instance
147 _instance_is_valid(policy_type_id, policy_instance_id)
148 prefixes_for_handler = "{0}{1}.{2}.".format(HANDLER_PREFIX, policy_type_id, policy_instance_id)
149 return list(SDL.find_and_get(prefixes_for_handler).values())
152 def _get_instance_list(policy_type_id):
154 shared helper to get instance list for a type
156 _type_is_valid(policy_type_id)
157 prefixes_for_type = "{0}{1}.".format(INSTANCE_PREFIX, policy_type_id)
158 instancekeys = SDL.find_and_get(prefixes_for_type).keys()
159 return [k.split(prefixes_for_type)[1] for k in instancekeys]
162 def _clear_handlers(policy_type_id, policy_instance_id):
164 delete all the handlers for a policy instance
166 all_handlers_pref = _generate_handler_prefix(policy_type_id, policy_instance_id)
167 keys = SDL.find_and_get(all_handlers_pref)
172 def _get_metadata(policy_type_id, policy_instance_id):
174 get instance metadata
176 _instance_is_valid(policy_type_id, policy_instance_id)
177 metadata_key = _generate_instance_metadata_key(policy_type_id, policy_instance_id)
178 return SDL.get(metadata_key)
181 def _delete_after(policy_type_id, policy_instance_id, ttl):
183 this is a blocking function, must call this in a thread to not block!
184 waits ttl seconds, then deletes the instance
186 _instance_is_valid(policy_type_id, policy_instance_id)
191 _clear_handlers(policy_type_id, policy_instance_id) # delete all the handlers
192 SDL.delete(_generate_instance_key(policy_type_id, policy_instance_id)) # delete instance
193 SDL.delete(_generate_instance_metadata_key(policy_type_id, policy_instance_id)) # delete instance metadata
194 mdc_logger.debug("type {0} instance {1} deleted".format(policy_type_id, policy_instance_id))
202 retrieve all type ids
204 typekeys = SDL.find_and_get(TYPE_PREFIX).keys()
205 # policy types are ints but they get butchered to strings in the KV
206 return [int(k.split(TYPE_PREFIX)[1]) for k in typekeys]
209 def store_policy_type(policy_type_id, body):
211 store a policy type if it doesn't already exist
213 key = _generate_type_key(policy_type_id)
214 if SDL.get(key) is not None:
215 raise PolicyTypeAlreadyExists()
219 def delete_policy_type(policy_type_id):
221 delete a policy type; can only be done if there are no instances (business logic)
223 pil = get_instance_list(policy_type_id)
224 if pil == []: # empty, can delete
225 SDL.delete(_generate_type_key(policy_type_id))
227 raise CantDeleteNonEmptyType()
230 def get_policy_type(policy_type_id):
234 _type_is_valid(policy_type_id)
235 return SDL.get(_generate_type_key(policy_type_id))
241 def store_policy_instance(policy_type_id, policy_instance_id, instance):
243 Store a policy instance
245 _type_is_valid(policy_type_id)
246 creation_timestamp = time.time()
249 key = _generate_instance_key(policy_type_id, policy_instance_id)
250 if SDL.get(key) is not None:
251 # Reset the statuses because this is a new policy instance, even if it was overwritten
252 _clear_handlers(policy_type_id, policy_instance_id) # delete all the handlers
253 SDL.set(key, instance)
255 metadata_key = _generate_instance_metadata_key(policy_type_id, policy_instance_id)
256 SDL.set(metadata_key, {"created_at": creation_timestamp, "has_been_deleted": False})
259 def get_policy_instance(policy_type_id, policy_instance_id):
261 Retrieve a policy instance
263 _instance_is_valid(policy_type_id, policy_instance_id)
264 return SDL.get(_generate_instance_key(policy_type_id, policy_instance_id))
267 def get_instance_list(policy_type_id):
269 retrieve all instance ids for a type
271 return _get_instance_list(policy_type_id)
274 def delete_policy_instance(policy_type_id, policy_instance_id):
276 initially sets has_been_deleted
277 then launches a thread that waits until the relevent timer expires, and finally deletes the instance
279 _instance_is_valid(policy_type_id, policy_instance_id)
281 # set the metadata first
282 deleted_timestamp = time.time()
283 metadata_key = _generate_instance_metadata_key(policy_type_id, policy_instance_id)
284 existing_metadata = _get_metadata(policy_type_id, policy_instance_id)
287 {"created_at": existing_metadata["created_at"], "has_been_deleted": True, "deleted_at": deleted_timestamp},
291 vector = _get_statuses(policy_type_id, policy_instance_id)
293 # handler is empty; we wait for t1 to expire then goodnight
294 clos = lambda: _delete_after(policy_type_id, policy_instance_id, INSTANCE_DELETE_NO_RESP_TTL)
296 # handler is not empty, we wait max t1,t2 to expire then goodnight
297 clos = lambda: _delete_after(
298 policy_type_id, policy_instance_id, max(INSTANCE_DELETE_RESP_TTL, INSTANCE_DELETE_NO_RESP_TTL)
300 Thread(target=clos).start()
306 def set_policy_instance_status(policy_type_id, policy_instance_id, handler_id, status):
308 update the database status for a handler
309 called from a1's rmr thread
311 _type_is_valid(policy_type_id)
312 _instance_is_valid(policy_type_id, policy_instance_id)
313 SDL.set(_generate_handler_key(policy_type_id, policy_instance_id, handler_id), status)
316 def get_policy_instance_status(policy_type_id, policy_instance_id):
318 Gets the status of an instance
320 _instance_is_valid(policy_type_id, policy_instance_id)
321 metadata = _get_metadata(policy_type_id, policy_instance_id)
322 metadata["instance_status"] = "NOT IN EFFECT"
323 for i in _get_statuses(policy_type_id, policy_instance_id):
325 metadata["instance_status"] = "IN EFFECT"