Threading pt3:
[ric-plt/a1.git] / a1 / data.py
index 51ba044..074f326 100644 (file)
@@ -23,30 +23,128 @@ Hopefully, the access functions are a good api so nothing else has to change whe
 For now, the database is in memory.
 We use dict data structures (KV) with the expectation of having to move this into Redis
 """
-from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound, PolicyTypeAlreadyExists
+import msgpack
+from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound, PolicyTypeAlreadyExists, CantDeleteNonEmptyType
 from a1 import get_module_logger
-from a1 import a1rmr
-import json
 
 logger = get_module_logger(__name__)
 
-# This is essentially mockouts for future KV
-# Note that the D subkey won't be needed when in redis, since you can store data at x anx x_y
-POLICY_DATA = {}
-I = "instances"
-H = "handlers"
-D = "data"
+
+class SDLWrapper:
+    """
+    This is a wrapper around the expected SDL Python interface.
+    The usage of POLICY_DATA will be replaced with  SDL when SDL for python is available.
+    The eventual SDL API is expected to be very close to what is here.
+
+    We use msgpack for binary (de)serialization: https://msgpack.org/index.html
+    """
+
+    def __init__(self):
+        self.POLICY_DATA = {}
+
+    def set(self, key, value):
+        """set a key"""
+        self.POLICY_DATA[key] = msgpack.packb(value, use_bin_type=True)
+
+    def get(self, key):
+        """get a key"""
+        if key in self.POLICY_DATA:
+            return msgpack.unpackb(self.POLICY_DATA[key], raw=False)
+        return None
+
+    def find_and_get(self, prefix):
+        """get all k v pairs that start with prefix"""
+        return {k: msgpack.unpackb(v, raw=False) for k, v in self.POLICY_DATA.items() if k.startswith(prefix)}
+
+    def delete(self, key):
+        """ delete a key"""
+        del self.POLICY_DATA[key]
+
+
+SDL = SDLWrapper()
+
+TYPE_PREFIX = "a1.policy_type."
+INSTANCE_PREFIX = "a1.policy_instance."
+HANDLER_PREFIX = "a1.policy_handler."
+
+
+# Internal helpers
+
+
+def _generate_type_key(policy_type_id):
+    """
+    generate a key for a policy type
+    """
+    return "{0}{1}".format(TYPE_PREFIX, policy_type_id)
+
+
+def _generate_instance_key(policy_type_id, policy_instance_id):
+    """
+    generate a key for a policy instance
+    """
+    return "{0}{1}.{2}".format(INSTANCE_PREFIX, policy_type_id, policy_instance_id)
+
+
+def _generate_handler_prefix(policy_type_id, policy_instance_id):
+    """
+    generate the prefix to a handler key
+    """
+    return "{0}{1}.{2}.".format(HANDLER_PREFIX, policy_type_id, policy_instance_id)
+
+
+def _generate_handler_key(policy_type_id, policy_instance_id, handler_id):
+    """
+    generate a key for a policy handler
+    """
+    return "{0}{1}".format(_generate_handler_prefix(policy_type_id, policy_instance_id), handler_id)
+
+
+def _get_statuses(policy_type_id, policy_instance_id):
+    """
+    shared helper to get statuses for an instance
+    """
+    instance_is_valid(policy_type_id, policy_instance_id)
+    prefixes_for_handler = "{0}{1}.{2}.".format(HANDLER_PREFIX, policy_type_id, policy_instance_id)
+    return list(SDL.find_and_get(prefixes_for_handler).values())
+
+
+def _get_instance_list(policy_type_id):
+    """
+    shared helper to get instance list for a type
+    """
+    type_is_valid(policy_type_id)
+    prefixes_for_type = "{0}{1}.".format(INSTANCE_PREFIX, policy_type_id)
+    instancekeys = SDL.find_and_get(prefixes_for_type).keys()
+    return [k.split(prefixes_for_type)[1] for k in instancekeys]
+
+
+def _clear_handlers(policy_type_id, policy_instance_id):
+    """
+    delete all the handlers for a policy instance
+    """
+    all_handlers_pref = _generate_handler_prefix(policy_type_id, policy_instance_id)
+    keys = SDL.find_and_get(all_handlers_pref)
+    for k in keys:
+        SDL.delete(k)
 
 
 # Types
 
 
+def get_type_list():
+    """
+    retrieve all type ids
+    """
+    typekeys = SDL.find_and_get(TYPE_PREFIX).keys()
+    # policy types are ints but they get butchered to strings in the KV
+    return [int(k.split(TYPE_PREFIX)[1]) for k in typekeys]
+
+
 def type_is_valid(policy_type_id):
     """
     check that a type is valid
     """
-    if policy_type_id not in POLICY_DATA:
-        logger.error("%s not found", policy_type_id)
+    if SDL.get(_generate_type_key(policy_type_id)) is None:
         raise PolicyTypeNotFound()
 
 
@@ -54,27 +152,29 @@ def store_policy_type(policy_type_id, body):
     """
     store a policy type if it doesn't already exist
     """
-    if policy_type_id in POLICY_DATA:
+    key = _generate_type_key(policy_type_id)
+    if SDL.get(key) is not None:
         raise PolicyTypeAlreadyExists()
-
-    POLICY_DATA[policy_type_id] = {}
-    POLICY_DATA[policy_type_id][D] = body
-    POLICY_DATA[policy_type_id][I] = {}
+    SDL.set(key, body)
 
 
-def get_policy_type(policy_type_id):
+def delete_policy_type(policy_type_id):
     """
-    retrieve a type
+    delete a policy type; can only be done if there are no instances (business logic)
     """
-    type_is_valid(policy_type_id)
-    return POLICY_DATA[policy_type_id][D]
+    pil = get_instance_list(policy_type_id)
+    if pil == []:  # empty, can delete
+        SDL.delete(_generate_type_key(policy_type_id))
+    else:
+        raise CantDeleteNonEmptyType()
 
 
-def get_type_list():
+def get_policy_type(policy_type_id):
     """
-    retrieve all type ids
+    retrieve a type
     """
-    return list(POLICY_DATA.keys())
+    type_is_valid(policy_type_id)
+    return SDL.get(_generate_type_key(policy_type_id))
 
 
 # Instances
@@ -85,7 +185,7 @@ def instance_is_valid(policy_type_id, policy_instance_id):
     check that an instance is valid
     """
     type_is_valid(policy_type_id)
-    if policy_instance_id not in POLICY_DATA[policy_type_id][I]:
+    if SDL.get(_generate_instance_key(policy_type_id, policy_instance_id)) is None:
         raise PolicyInstanceNotFound
 
 
@@ -94,78 +194,77 @@ def store_policy_instance(policy_type_id, policy_instance_id, instance):
     Store a policy instance
     """
     type_is_valid(policy_type_id)
+    key = _generate_instance_key(policy_type_id, policy_instance_id)
+    if SDL.get(key) is not None:
+        # Reset the statuses because this is a new policy instance, even if it was overwritten
+        _clear_handlers(policy_type_id, policy_instance_id)  # delete all the handlers
+    SDL.set(key, instance)
 
-    # store the instance
-    # Reset the statuses because this is a new policy instance, even if it was overwritten
-    POLICY_DATA[policy_type_id][I][policy_instance_id] = {}
-    POLICY_DATA[policy_type_id][I][policy_instance_id][D] = instance
-    POLICY_DATA[policy_type_id][I][policy_instance_id][H] = {}
 
-
-def delete_policy_instance_if_applicable(policy_type_id, policy_instance_id):
+def get_policy_instance(policy_type_id, policy_instance_id):
     """
-    delete a policy instance if all known statuses are DELETED
-
-    pops a1s waiting mailbox
+    Retrieve a policy instance
     """
-    # pop through a1s mailbox, updating a1s db of all policy statuses
-    for msg in a1rmr.dequeue_all_waiting_messages(21024):
-        # try to parse the messages as responses. Drop those that are malformed
-        # NOTE: we don't use the parameters "policy_type_id, policy_instance" from above here,
-        # because we are popping the whole mailbox, which might include other statuses
-        pay = json.loads(msg["payload"])
-        if "policy_type_id" in pay and "policy_instance_id" in pay and "handler_id" in pay and "status" in pay:
-            set_policy_instance_status(pay["policy_type_id"], pay["policy_instance_id"], pay["handler_id"], pay["status"])
-        else:
-            logger.debug("Dropping message")
-            logger.debug(pay)
-
-    # raise if not valid
     instance_is_valid(policy_type_id, policy_instance_id)
+    return SDL.get(_generate_instance_key(policy_type_id, policy_instance_id))
 
-    # see if we can delete
-    vector = get_policy_instance_statuses(policy_type_id, policy_instance_id)
-    if vector != []:
-        all_deleted = True
-        for i in vector:
-            if i != "DELETED":
-                all_deleted = False
-                break  # have at least one not DELETED, do nothing
 
-        # blow away from a1 db
-        if all_deleted:
-            del POLICY_DATA[policy_type_id][I][policy_instance_id]
+def get_policy_instance_statuses(policy_type_id, policy_instance_id):
+    """
+    Retrieve the status vector for a policy instance
+    """
+    return _get_statuses(policy_type_id, policy_instance_id)
 
 
-def get_policy_instance(policy_type_id, policy_instance_id):
+def get_instance_list(policy_type_id):
     """
-    Retrieve a policy instance
+    retrieve all instance ids for a type
     """
-    instance_is_valid(policy_type_id, policy_instance_id)
-    return POLICY_DATA[policy_type_id][I][policy_instance_id][D]
+    return _get_instance_list(policy_type_id)
 
 
-def get_policy_instance_statuses(policy_type_id, policy_instance_id):
+# Statuses
+
+
+def set_status(policy_type_id, policy_instance_id, handler_id, status):
     """
-    Retrieve the status vector for a policy instance
+    update the database status for a handler
+    called from a1's rmr thread
     """
+    type_is_valid(policy_type_id)
     instance_is_valid(policy_type_id, policy_instance_id)
-
-    return [v for _, v in POLICY_DATA[policy_type_id][I][policy_instance_id][H].items()]
+    SDL.set(_generate_handler_key(policy_type_id, policy_instance_id, handler_id), status)
 
 
-def set_policy_instance_status(policy_type_id, policy_instance_id, handler_id, status):
+def clean_up_instance(policy_type_id, policy_instance_id):
     """
-    Update the status of a handler id of a policy instance
+    see if we can delete an instance based on it's status
     """
+    type_is_valid(policy_type_id)
     instance_is_valid(policy_type_id, policy_instance_id)
 
-    POLICY_DATA[policy_type_id][I][policy_instance_id][H][handler_id] = status
+    """
+    TODO: not being able to delete if the list is [] is prolematic.
+    There are cases, such as a bad routing file, where this type will never be able to be deleted because it never went to any xapps
+    However, A1 cannot distinguish between the case where [] was never going to work, and the case where it hasn't worked *yet*
 
+    However, removing this constraint also leads to problems.
+    Deleting the instance when the vector is empty, for example doing so “shortly after” the PUT, can lead to a worse race condition where the xapps get the policy after that, implement it, but because the DELETE triggered “too soon”, you can never get the status or do the delete on it again, so the xapps are all implementing the instance roguely.
 
-def get_instance_list(policy_type_id):
+    This requires some thought to address.
+    For now we stick with the "less bad problem".
     """
-    retrieve all instance ids for a type
-    """
-    type_is_valid(policy_type_id)
-    return list(POLICY_DATA[policy_type_id][I].keys())
+
+    vector = _get_statuses(policy_type_id, policy_instance_id)
+    if vector != []:
+        all_deleted = True
+        for i in vector:
+            if i != "DELETED":
+                all_deleted = False
+                break  # have at least one not DELETED, do nothing
+
+        # blow away from a1 db
+        if all_deleted:
+            _clear_handlers(policy_type_id, policy_instance_id)  # delete all the handlers
+            SDL.delete(_generate_instance_key(policy_type_id, policy_instance_id))  # delete instance
+            logger.debug("type %s instance %s deleted", policy_type_id, policy_instance_id)