X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;ds=sidebyside;f=a1%2Fdata.py;h=a98be5c15ca691d86cdb0ab8359fb5acefaba528;hb=refs%2Fchanges%2F01%2F1901%2F1;hp=d701373da061d586ab2d9ab6be01ca0d850ba353;hpb=a0876efd819b43b870ba2254b34676b1a03ad326;p=ric-plt%2Fa1.git diff --git a/a1/data.py b/a1/data.py index d701373..a98be5c 100644 --- a/a1/data.py +++ b/a1/data.py @@ -1,3 +1,11 @@ +""" +Represents A1s database and database access functions. +In the future, this may change to use a different backend, possibly dramatically. +Hopefully, the access functions are a good api so nothing else has to change when this happens + +For now, the database is in memory. +We use dict data structures (KV) with the expectation of having to move this into Redis +""" # ================================================================================== # Copyright (c) 2019 Nokia # Copyright (c) 2018-2019 AT&T Intellectual Property. @@ -14,158 +22,296 @@ # See the License for the specific language governing permissions and # limitations under the License. # ================================================================================== +import os +import time +from threading import Thread +import msgpack +from mdclogpy import Logger +from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound, PolicyTypeAlreadyExists, CantDeleteNonEmptyType -""" -Represents A1s database and database access functions. -In the future, this may change to use a different backend, possibly dramatically. -Hopefully, the access functions are a good api so nothing else has to change when this happens +mdc_logger = Logger(name=__name__) -For now, the database is in memory. -We use dict data structures (KV) with the expectation of having to move this into Redis -""" -import json -from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound, PolicyTypeAlreadyExists -from a1 import get_module_logger -from a1 import a1rmr -logger = get_module_logger(__name__) +INSTANCE_DELETE_NO_RESP_TTL = int(os.environ.get("INSTANCE_DELETE_NO_RESP_TTL", 5)) +INSTANCE_DELETE_RESP_TTL = int(os.environ.get("INSTANCE_DELETE_RESP_TTL", 5)) -# This is essentially mockouts for future KV -# Note that the D subkey won't be needed when in redis, since you can store data at x anx x_y -POLICY_DATA = {} -I = "instances" -H = "handlers" -D = "data" +class SDLWrapper: + """ + This is a wrapper around the expected SDL Python interface. + The usage of POLICY_DATA will be replaced with SDL when SDL for python is available. + The eventual SDL API is expected to be very close to what is here. + + We use msgpack for binary (de)serialization: https://msgpack.org/index.html + """ + + def __init__(self): + self.POLICY_DATA = {} + + def set(self, key, value): + """set a key""" + self.POLICY_DATA[key] = msgpack.packb(value, use_bin_type=True) + + def get(self, key): + """get a key""" + if key in self.POLICY_DATA: + return msgpack.unpackb(self.POLICY_DATA[key], raw=False) + return None + + def find_and_get(self, prefix): + """get all k v pairs that start with prefix""" + return {k: msgpack.unpackb(v, raw=False) for k, v in self.POLICY_DATA.items() if k.startswith(prefix)} + + def delete(self, key): + """ delete a key""" + del self.POLICY_DATA[key] + + +SDL = SDLWrapper() + +TYPE_PREFIX = "a1.policy_type." +INSTANCE_PREFIX = "a1.policy_instance." +METADATA_PREFIX = "a1.policy_inst_metadata." +HANDLER_PREFIX = "a1.policy_handler." -# Types +# Internal helpers -def type_is_valid(policy_type_id): + +def _generate_type_key(policy_type_id): """ - check that a type is valid + generate a key for a policy type """ - if policy_type_id not in POLICY_DATA: - logger.error("%s not found", policy_type_id) - raise PolicyTypeNotFound() + return "{0}{1}".format(TYPE_PREFIX, policy_type_id) -def store_policy_type(policy_type_id, body): +def _generate_instance_key(policy_type_id, policy_instance_id): """ - store a policy type if it doesn't already exist + generate a key for a policy instance """ - if policy_type_id in POLICY_DATA: - raise PolicyTypeAlreadyExists() + return "{0}{1}.{2}".format(INSTANCE_PREFIX, policy_type_id, policy_instance_id) + - POLICY_DATA[policy_type_id] = {} - POLICY_DATA[policy_type_id][D] = body - POLICY_DATA[policy_type_id][I] = {} +def _generate_instance_metadata_key(policy_type_id, policy_instance_id): + """ + generate a key for a policy instance metadata + """ + return "{0}{1}.{2}".format(METADATA_PREFIX, policy_type_id, policy_instance_id) -def get_policy_type(policy_type_id): +def _generate_handler_prefix(policy_type_id, policy_instance_id): """ - retrieve a type + generate the prefix to a handler key """ - type_is_valid(policy_type_id) - return POLICY_DATA[policy_type_id][D] + return "{0}{1}.{2}.".format(HANDLER_PREFIX, policy_type_id, policy_instance_id) -def get_type_list(): +def _generate_handler_key(policy_type_id, policy_instance_id, handler_id): """ - retrieve all type ids + generate a key for a policy handler """ - return list(POLICY_DATA.keys()) + return "{0}{1}".format(_generate_handler_prefix(policy_type_id, policy_instance_id), handler_id) -# Instances +def _type_is_valid(policy_type_id): + """ + check that a type is valid + """ + if SDL.get(_generate_type_key(policy_type_id)) is None: + raise PolicyTypeNotFound() -def instance_is_valid(policy_type_id, policy_instance_id): +def _instance_is_valid(policy_type_id, policy_instance_id): """ check that an instance is valid """ - type_is_valid(policy_type_id) - if policy_instance_id not in POLICY_DATA[policy_type_id][I]: + _type_is_valid(policy_type_id) + if SDL.get(_generate_instance_key(policy_type_id, policy_instance_id)) is None: raise PolicyInstanceNotFound -def store_policy_instance(policy_type_id, policy_instance_id, instance): +def _get_statuses(policy_type_id, policy_instance_id): """ - Store a policy instance + shared helper to get statuses for an instance """ - type_is_valid(policy_type_id) + _instance_is_valid(policy_type_id, policy_instance_id) + prefixes_for_handler = "{0}{1}.{2}.".format(HANDLER_PREFIX, policy_type_id, policy_instance_id) + return list(SDL.find_and_get(prefixes_for_handler).values()) - # store the instance - # Reset the statuses because this is a new policy instance, even if it was overwritten - POLICY_DATA[policy_type_id][I][policy_instance_id] = {} - POLICY_DATA[policy_type_id][I][policy_instance_id][D] = instance - POLICY_DATA[policy_type_id][I][policy_instance_id][H] = {} + +def _get_instance_list(policy_type_id): + """ + shared helper to get instance list for a type + """ + _type_is_valid(policy_type_id) + prefixes_for_type = "{0}{1}.".format(INSTANCE_PREFIX, policy_type_id) + instancekeys = SDL.find_and_get(prefixes_for_type).keys() + return [k.split(prefixes_for_type)[1] for k in instancekeys] -def delete_policy_instance_if_applicable(policy_type_id, policy_instance_id): +def _clear_handlers(policy_type_id, policy_instance_id): """ - delete a policy instance if all known statuses are DELETED + delete all the handlers for a policy instance + """ + all_handlers_pref = _generate_handler_prefix(policy_type_id, policy_instance_id) + keys = SDL.find_and_get(all_handlers_pref) + for k in keys: + SDL.delete(k) + - pops a1s waiting mailbox +def _get_metadata(policy_type_id, policy_instance_id): + """ + get instance metadata """ - # pop through a1s mailbox, updating a1s db of all policy statuses - for msg in a1rmr.dequeue_all_waiting_messages([21024]): - # try to parse the messages as responses. Drop those that are malformed - # NOTE: we don't use the parameters "policy_type_id, policy_instance" from above here, - # because we are popping the whole mailbox, which might include other statuses - pay = json.loads(msg["payload"]) - if "policy_type_id" in pay and "policy_instance_id" in pay and "handler_id" in pay and "status" in pay: - set_policy_instance_status(pay["policy_type_id"], pay["policy_instance_id"], pay["handler_id"], pay["status"]) - else: - logger.debug("Dropping message") - logger.debug(pay) + _instance_is_valid(policy_type_id, policy_instance_id) + metadata_key = _generate_instance_metadata_key(policy_type_id, policy_instance_id) + return SDL.get(metadata_key) - # raise if not valid - instance_is_valid(policy_type_id, policy_instance_id) - # see if we can delete - vector = get_policy_instance_statuses(policy_type_id, policy_instance_id) - if vector != []: - all_deleted = True - for i in vector: - if i != "DELETED": - all_deleted = False - break # have at least one not DELETED, do nothing +def _delete_after(policy_type_id, policy_instance_id, ttl): + """ + this is a blocking function, must call this in a thread to not block! + waits ttl seconds, then deletes the instance + """ + _instance_is_valid(policy_type_id, policy_instance_id) - # blow away from a1 db - if all_deleted: - del POLICY_DATA[policy_type_id][I][policy_instance_id] + time.sleep(ttl) + # ready to delete + _clear_handlers(policy_type_id, policy_instance_id) # delete all the handlers + SDL.delete(_generate_instance_key(policy_type_id, policy_instance_id)) # delete instance + SDL.delete(_generate_instance_metadata_key(policy_type_id, policy_instance_id)) # delete instance metadata + mdc_logger.debug("type {0} instance {1} deleted".format(policy_type_id, policy_instance_id)) -def get_policy_instance(policy_type_id, policy_instance_id): + +# Types + + +def get_type_list(): """ - Retrieve a policy instance + retrieve all type ids """ - instance_is_valid(policy_type_id, policy_instance_id) - return POLICY_DATA[policy_type_id][I][policy_instance_id][D] + typekeys = SDL.find_and_get(TYPE_PREFIX).keys() + # policy types are ints but they get butchered to strings in the KV + return [int(k.split(TYPE_PREFIX)[1]) for k in typekeys] -def get_policy_instance_statuses(policy_type_id, policy_instance_id): +def store_policy_type(policy_type_id, body): """ - Retrieve the status vector for a policy instance + store a policy type if it doesn't already exist """ - instance_is_valid(policy_type_id, policy_instance_id) + key = _generate_type_key(policy_type_id) + if SDL.get(key) is not None: + raise PolicyTypeAlreadyExists() + SDL.set(key, body) - return [v for _, v in POLICY_DATA[policy_type_id][I][policy_instance_id][H].items()] +def delete_policy_type(policy_type_id): + """ + delete a policy type; can only be done if there are no instances (business logic) + """ + pil = get_instance_list(policy_type_id) + if pil == []: # empty, can delete + SDL.delete(_generate_type_key(policy_type_id)) + else: + raise CantDeleteNonEmptyType() -def set_policy_instance_status(policy_type_id, policy_instance_id, handler_id, status): + +def get_policy_type(policy_type_id): """ - Update the status of a handler id of a policy instance + retrieve a type """ - instance_is_valid(policy_type_id, policy_instance_id) + _type_is_valid(policy_type_id) + return SDL.get(_generate_type_key(policy_type_id)) + - POLICY_DATA[policy_type_id][I][policy_instance_id][H][handler_id] = status +# Instances + + +def store_policy_instance(policy_type_id, policy_instance_id, instance): + """ + Store a policy instance + """ + _type_is_valid(policy_type_id) + creation_timestamp = time.time() + + # store the instance + key = _generate_instance_key(policy_type_id, policy_instance_id) + if SDL.get(key) is not None: + # Reset the statuses because this is a new policy instance, even if it was overwritten + _clear_handlers(policy_type_id, policy_instance_id) # delete all the handlers + SDL.set(key, instance) + + metadata_key = _generate_instance_metadata_key(policy_type_id, policy_instance_id) + SDL.set(metadata_key, {"created_at": creation_timestamp, "has_been_deleted": False}) + + +def get_policy_instance(policy_type_id, policy_instance_id): + """ + Retrieve a policy instance + """ + _instance_is_valid(policy_type_id, policy_instance_id) + return SDL.get(_generate_instance_key(policy_type_id, policy_instance_id)) def get_instance_list(policy_type_id): """ retrieve all instance ids for a type """ - type_is_valid(policy_type_id) - return list(POLICY_DATA[policy_type_id][I].keys()) + return _get_instance_list(policy_type_id) + + +def delete_policy_instance(policy_type_id, policy_instance_id): + """ + initially sets has_been_deleted + then launches a thread that waits until the relevent timer expires, and finally deletes the instance + """ + _instance_is_valid(policy_type_id, policy_instance_id) + + # set the metadata first + deleted_timestamp = time.time() + metadata_key = _generate_instance_metadata_key(policy_type_id, policy_instance_id) + existing_metadata = _get_metadata(policy_type_id, policy_instance_id) + SDL.set( + metadata_key, + {"created_at": existing_metadata["created_at"], "has_been_deleted": True, "deleted_at": deleted_timestamp}, + ) + + # wait, then delete + vector = _get_statuses(policy_type_id, policy_instance_id) + if vector == []: + # handler is empty; we wait for t1 to expire then goodnight + clos = lambda: _delete_after(policy_type_id, policy_instance_id, INSTANCE_DELETE_NO_RESP_TTL) + else: + # handler is not empty, we wait max t1,t2 to expire then goodnight + clos = lambda: _delete_after( + policy_type_id, policy_instance_id, max(INSTANCE_DELETE_RESP_TTL, INSTANCE_DELETE_NO_RESP_TTL) + ) + Thread(target=clos).start() + + +# Statuses + + +def set_policy_instance_status(policy_type_id, policy_instance_id, handler_id, status): + """ + update the database status for a handler + called from a1's rmr thread + """ + _type_is_valid(policy_type_id) + _instance_is_valid(policy_type_id, policy_instance_id) + SDL.set(_generate_handler_key(policy_type_id, policy_instance_id, handler_id), status) + + +def get_policy_instance_status(policy_type_id, policy_instance_id): + """ + Gets the status of an instance + """ + _instance_is_valid(policy_type_id, policy_instance_id) + metadata = _get_metadata(policy_type_id, policy_instance_id) + metadata["instance_status"] = "NOT IN EFFECT" + for i in _get_statuses(policy_type_id, policy_instance_id): + if i == "OK": + metadata["instance_status"] = "IN EFFECT" + break + return metadata