"""Represents A1's database and database access functions."""
4 # ==================================================================================
5 # Copyright (c) 2019-2020 Nokia
6 # Copyright (c) 2018-2020 AT&T Intellectual Property.
8 # Licensed under the Apache License, Version 2.0 (the "License");
9 # you may not use this file except in compliance with the License.
10 # You may obtain a copy of the License at
12 # http://www.apache.org/licenses/LICENSE-2.0
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS,
16 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 # See the License for the specific language governing permissions and
18 # limitations under the License.
19 # ==================================================================================
import os
import time
from threading import Thread

from mdclogpy import Logger
from ricsdl.syncstorage import SyncStorage

from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound, PolicyTypeAlreadyExists, CantDeleteNonEmptyType
# module-wide logger, named after this module
mdc_logger = Logger(name=__name__)

# TTLs (in seconds) waited before a deleted policy instance is purged from the
# database; each one can be overridden by the environment variable of the same name
INSTANCE_DELETE_NO_RESP_TTL = int(os.getenv("INSTANCE_DELETE_NO_RESP_TTL", 5))
INSTANCE_DELETE_RESP_TTL = int(os.getenv("INSTANCE_DELETE_RESP_TTL", 5))
39 This is a wrapper around the expected SDL Python interface.
40 The usage of POLICY_DATA will be replaced with SDL when SDL for python is available.
41 The eventual SDL API is expected to be very close to what is here.
43 We use msgpack for binary (de)serialization: https://msgpack.org/index.html
47 self.sdl = SyncStorage()
def set(self, key, value):
    """Pack value with msgpack and store it under key in the A1NS namespace."""
    packed = msgpack.packb(value, use_bin_type=True)
    self.sdl.set(A1NS, {key: packed})
55 ret_dict = self.sdl.get(A1NS, {key})
57 return msgpack.unpackb(ret_dict[key], raw=False)
def find_and_get(self, prefix):
    """Return all key/value pairs whose key starts with prefix, with values unpacked from msgpack."""
    # note: SDL's "*" wildcard is not real python regex syntax, where it would be ".*"
    pattern = "{0}*".format(prefix)
    unpacked = {}
    for found_key, packed in self.sdl.find_and_get(A1NS, pattern).items():
        unpacked[found_key] = msgpack.unpackb(packed, raw=False)
    return unpacked
def delete(self, key):
    """Remove key from the A1NS namespace."""
    doomed_keys = {key}
    self.sdl.remove(A1NS, doomed_keys)
def healthcheck(self):
    """Report whether the underlying SDL connection is active."""
    connection_is_up = self.sdl.is_active()
    return connection_is_up
# Key prefixes used to partition the database; the _generate_*_key helpers
# append the policy type id (and instance / handler ids) to these.
TYPE_PREFIX = "a1.policy_type."  # policy type definitions
INSTANCE_PREFIX = "a1.policy_instance."  # policy instance bodies
METADATA_PREFIX = "a1.policy_inst_metadata."  # per-instance metadata
HANDLER_PREFIX = "a1.policy_handler."  # per-handler instance statuses
def _generate_type_key(policy_type_id):
    """
    Build the database key of a policy type: TYPE_PREFIX followed by the type id.
    """
    key_parts = (TYPE_PREFIX, policy_type_id)
    return "{0}{1}".format(*key_parts)
def _generate_instance_key(policy_type_id, policy_instance_id):
    """
    Build the database key of a policy instance:
    INSTANCE_PREFIX, the type id, a dot, then the instance id.
    """
    key_parts = (INSTANCE_PREFIX, policy_type_id, policy_instance_id)
    return "{0}{1}.{2}".format(*key_parts)
def _generate_instance_metadata_key(policy_type_id, policy_instance_id):
    """
    Build the database key of a policy instance's metadata:
    METADATA_PREFIX, the type id, a dot, then the instance id.
    """
    key_parts = (METADATA_PREFIX, policy_type_id, policy_instance_id)
    return "{0}{1}.{2}".format(*key_parts)
def _generate_handler_prefix(policy_type_id, policy_instance_id):
    """
    Build the prefix shared by all handler keys of one policy instance:
    HANDLER_PREFIX, the type id, a dot, the instance id and a trailing dot.
    """
    prefix_parts = (HANDLER_PREFIX, policy_type_id, policy_instance_id)
    return "{0}{1}.{2}.".format(*prefix_parts)
def _generate_handler_key(policy_type_id, policy_instance_id, handler_id):
    """
    Build the database key of one handler's status for a policy instance:
    the instance's handler prefix followed by the handler id.
    """
    handler_prefix = _generate_handler_prefix(policy_type_id, policy_instance_id)
    return "{0}{1}".format(handler_prefix, handler_id)
def _type_is_valid(policy_type_id):
    """
    Ensure a policy type exists.

    Raises PolicyTypeNotFound when nothing is stored under the type's key.
    """
    stored_type = SDL.get(_generate_type_key(policy_type_id))
    if stored_type is not None:
        return
    raise PolicyTypeNotFound()
def _instance_is_valid(policy_type_id, policy_instance_id):
    """
    Ensure a policy instance exists.

    The type is checked first (PolicyTypeNotFound), then the instance itself;
    raises PolicyInstanceNotFound when nothing is stored under the instance key.
    """
    _type_is_valid(policy_type_id)
    stored_instance = SDL.get(_generate_instance_key(policy_type_id, policy_instance_id))
    if stored_instance is not None:
        return
    raise PolicyInstanceNotFound
def _get_statuses(policy_type_id, policy_instance_id):
    """
    shared helper to get statuses for an instance

    Validates the instance first (raising PolicyTypeNotFound or
    PolicyInstanceNotFound), then returns a list of the values stored
    under every handler key of that instance.
    """
    _instance_is_valid(policy_type_id, policy_instance_id)
    # reuse the key helper so that reading and writing handler keys cannot drift apart
    handler_prefix = _generate_handler_prefix(policy_type_id, policy_instance_id)
    return list(SDL.find_and_get(handler_prefix).values())
def _get_instance_list(policy_type_id):
    """
    shared helper to get instance list for a type

    Validates the type first (raising PolicyTypeNotFound), then returns the
    instance ids found under that type's instance-key prefix.
    """
    _type_is_valid(policy_type_id)
    prefix_for_type = "{0}{1}.".format(INSTANCE_PREFIX, policy_type_id)
    prefix_len = len(prefix_for_type)
    # The search is by prefix, so each key starts with it: strip it by length.
    # Splitting on the prefix would truncate an instance id that happens to
    # contain the prefix text itself.
    return [key[prefix_len:] for key in SDL.find_and_get(prefix_for_type)]
def _clear_handlers(policy_type_id, policy_instance_id):
    """
    delete all the handlers for a policy instance

    Looks up every key under the instance's handler prefix and removes
    each one from the database.
    """
    all_handlers_pref = _generate_handler_prefix(policy_type_id, policy_instance_id)
    # find_and_get returns a dict keyed by the full database key; only the keys are needed
    for handler_key in SDL.find_and_get(all_handlers_pref):
        SDL.delete(handler_key)
def _get_metadata(policy_type_id, policy_instance_id):
    """
    Fetch the metadata stored for a policy instance.

    The instance is validated first, so PolicyTypeNotFound or
    PolicyInstanceNotFound is raised when it does not exist.
    """
    _instance_is_valid(policy_type_id, policy_instance_id)
    return SDL.get(_generate_instance_metadata_key(policy_type_id, policy_instance_id))
def _delete_after(policy_type_id, policy_instance_id, ttl):
    """
    this is a blocking function, must call this in a thread to not block!
    waits ttl seconds, then deletes the instance

    Raises PolicyTypeNotFound or PolicyInstanceNotFound if the instance does
    not exist when the function is entered.
    """
    _instance_is_valid(policy_type_id, policy_instance_id)

    # honour the grace period before anything is removed
    time.sleep(ttl)

    # ready to delete
    _clear_handlers(policy_type_id, policy_instance_id)  # delete all the handlers
    SDL.delete(_generate_instance_key(policy_type_id, policy_instance_id))  # delete instance
    SDL.delete(_generate_instance_metadata_key(policy_type_id, policy_instance_id))  # delete instance metadata
    mdc_logger.debug("type {0} instance {1} deleted".format(policy_type_id, policy_instance_id))
198 retrieve all type ids
200 typekeys = SDL.find_and_get(TYPE_PREFIX).keys()
201 # policy types are ints but they get butchered to strings in the KV
202 return [int(k.split(TYPE_PREFIX)[1]) for k in typekeys]
def store_policy_type(policy_type_id, body):
    """
    store a policy type if it doesn't already exist

    Raises PolicyTypeAlreadyExists when something is already stored under the
    type's key; otherwise body is written under that key.
    """
    key = _generate_type_key(policy_type_id)
    if SDL.get(key) is not None:
        raise PolicyTypeAlreadyExists()
    SDL.set(key, body)
def delete_policy_type(policy_type_id):
    """
    delete a policy type; can only be done if there are no instances (business logic)

    Raises PolicyTypeNotFound when the type does not exist (via
    get_instance_list) and CantDeleteNonEmptyType when the type still has
    at least one instance.
    """
    if get_instance_list(policy_type_id):
        # still has instances, refuse to delete
        raise CantDeleteNonEmptyType()
    SDL.delete(_generate_type_key(policy_type_id))
def get_policy_type(policy_type_id):
    """
    Retrieve a policy type.

    Raises PolicyTypeNotFound when the type does not exist.
    """
    _type_is_valid(policy_type_id)
    type_key = _generate_type_key(policy_type_id)
    return SDL.get(type_key)
def store_policy_instance(policy_type_id, policy_instance_id, instance):
    """
    Store (or overwrite) a policy instance along with fresh metadata.

    Raises PolicyTypeNotFound when the policy type does not exist.
    """
    _type_is_valid(policy_type_id)
    creation_timestamp = time.time()

    instance_key = _generate_instance_key(policy_type_id, policy_instance_id)
    already_stored = SDL.get(instance_key) is not None
    if already_stored:
        # an overwrite counts as a new policy instance, so the old handler statuses are dropped
        _clear_handlers(policy_type_id, policy_instance_id)
    SDL.set(instance_key, instance)

    fresh_metadata = {"created_at": creation_timestamp, "has_been_deleted": False}
    SDL.set(_generate_instance_metadata_key(policy_type_id, policy_instance_id), fresh_metadata)
def get_policy_instance(policy_type_id, policy_instance_id):
    """
    Fetch the stored body of a policy instance.

    Raises PolicyTypeNotFound or PolicyInstanceNotFound when it does not exist.
    """
    _instance_is_valid(policy_type_id, policy_instance_id)
    instance_key = _generate_instance_key(policy_type_id, policy_instance_id)
    return SDL.get(instance_key)
def get_instance_list(policy_type_id):
    """
    List the ids of all instances of a policy type (public wrapper around _get_instance_list).
    """
    instance_ids = _get_instance_list(policy_type_id)
    return instance_ids
def delete_policy_instance(policy_type_id, policy_instance_id):
    """
    initially sets has_been_deleted in the status
    then launches a thread that waits until the relevant timer expires, and finally deletes the instance

    Raises PolicyTypeNotFound or PolicyInstanceNotFound when the instance does
    not exist. The actual removal happens later, in the background thread.
    """
    _instance_is_valid(policy_type_id, policy_instance_id)

    # set the metadata first
    deleted_timestamp = time.time()
    metadata_key = _generate_instance_metadata_key(policy_type_id, policy_instance_id)
    existing_metadata = _get_metadata(policy_type_id, policy_instance_id)
    SDL.set(
        metadata_key,
        {"created_at": existing_metadata["created_at"], "has_been_deleted": True, "deleted_at": deleted_timestamp},
    )

    # wait, then delete
    if _get_statuses(policy_type_id, policy_instance_id):
        # handler is not empty, we wait max t1,t2 to expire then goodnight
        ttl = max(INSTANCE_DELETE_RESP_TTL, INSTANCE_DELETE_NO_RESP_TTL)
    else:
        # handler is empty; we wait for t1 to expire then goodnight
        ttl = INSTANCE_DELETE_NO_RESP_TTL

    # _delete_after blocks for ttl seconds, so it runs in its own thread
    Thread(target=_delete_after, args=(policy_type_id, policy_instance_id, ttl)).start()
def set_policy_instance_status(policy_type_id, policy_instance_id, handler_id, status):
    """
    update the database status for a handler
    called from a1's rmr thread

    Raises PolicyTypeNotFound or PolicyInstanceNotFound when the instance does
    not exist; otherwise status is written under the handler's key.
    """
    # _instance_is_valid checks the type first, so a separate type check
    # would only repeat the same database read
    _instance_is_valid(policy_type_id, policy_instance_id)
    SDL.set(_generate_handler_key(policy_type_id, policy_instance_id, handler_id), status)
312 def get_policy_instance_status(policy_type_id, policy_instance_id):
314 Gets the status of an instance
316 _instance_is_valid(policy_type_id, policy_instance_id)
317 metadata = _get_metadata(policy_type_id, policy_instance_id)
318 metadata["instance_status"] = "NOT IN EFFECT"
319 for i in _get_statuses(policy_type_id, policy_instance_id):
321 metadata["instance_status"] = "IN EFFECT"