Changing log severity level in case of liveness/readiness issues
[ric-plt/a1.git] / a1 / data.py
index d320046..8ec30cc 100644 (file)
@@ -1,6 +1,6 @@
 # ==================================================================================
-#       Copyright (c) 2019 Nokia
-#       Copyright (c) 2018-2019 AT&T Intellectual Property.
+#       Copyright (c) 2019-2020 Nokia
+#       Copyright (c) 2018-2020 AT&T Intellectual Property.
 #
 #   Licensed under the Apache License, Version 2.0 (the "License");
 #   you may not use this file except in compliance with the License.
 #   See the License for the specific language governing permissions and
 #   limitations under the License.
 # ==================================================================================
-
 """
 Represents A1s database and database access functions.
-In the future, this may change to use a different backend, possibly dramatically.
-Hopefully, the access functions are a good api so nothing else has to change when this happens
-
-For now, the database is in memory.
-We use dict data structures (KV) with the expectation of having to move this into Redis
 """
-import json
-from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound, PolicyTypeAlreadyExists, CantDeleteNonEmptyType
-from a1 import get_module_logger
-from a1 import a1rmr
+import distutils.util
+import os
+import time
+from threading import Thread
+from mdclogpy import Logger
+from ricxappframe.xapp_sdl import SDLWrapper
+from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound, PolicyTypeAlreadyExists, PolicyTypeIdMismatch, CantDeleteNonEmptyType
+
+# constants
+INSTANCE_DELETE_NO_RESP_TTL = int(os.environ.get("INSTANCE_DELETE_NO_RESP_TTL", 5))
+INSTANCE_DELETE_RESP_TTL = int(os.environ.get("INSTANCE_DELETE_RESP_TTL", 5))
+USE_FAKE_SDL = bool(distutils.util.strtobool(os.environ.get("USE_FAKE_SDL", "False")))
+A1NS = "A1m_ns"
+TYPE_PREFIX = "a1.policy_type."
+INSTANCE_PREFIX = "a1.policy_instance."
+METADATA_PREFIX = "a1.policy_inst_metadata."
+HANDLER_PREFIX = "a1.policy_handler."
+
+
+mdc_logger = Logger(name=__name__)
+mdc_logger.mdclog_format_init(configmap_monitor=True)
+if USE_FAKE_SDL:
+    mdc_logger.debug("Using fake SDL")
+SDL = SDLWrapper(use_fake_sdl=USE_FAKE_SDL)
 
-logger = get_module_logger(__name__)
+# Internal helpers
 
-# This is essentially mockouts for future KV
-# Note that the D subkey won't be needed when in redis, since you can store data at x anx x_y
-POLICY_DATA = {}
-I = "instances"
-H = "handlers"
-D = "data"
+
+def _generate_type_key(policy_type_id):
+    """
+    generate a key for a policy type
+    """
+    return "{0}{1}".format(TYPE_PREFIX, policy_type_id)
 
 
-# Internal helpers
+def _generate_instance_key(policy_type_id, policy_instance_id):
+    """
+    generate a key for a policy instance
+    """
+    return "{0}{1}.{2}".format(INSTANCE_PREFIX, policy_type_id, policy_instance_id)
+
+
+def _generate_instance_metadata_key(policy_type_id, policy_instance_id):
+    """
+    generate a key for a policy instance metadata
+    """
+    return "{0}{1}.{2}".format(METADATA_PREFIX, policy_type_id, policy_instance_id)
+
+
+def _generate_handler_prefix(policy_type_id, policy_instance_id):
+    """
+    generate the prefix to a handler key
+    """
+    return "{0}{1}.{2}.".format(HANDLER_PREFIX, policy_type_id, policy_instance_id)
+
+
+def _generate_handler_key(policy_type_id, policy_instance_id, handler_id):
+    """
+    generate a key for a policy handler
+    """
+    return "{0}{1}".format(_generate_handler_prefix(policy_type_id, policy_instance_id), handler_id)
+
+
+def _type_is_valid(policy_type_id):
+    """
+    check that a type is valid
+    """
+    if SDL.get(A1NS, _generate_type_key(policy_type_id)) is None:
+        raise PolicyTypeNotFound(policy_type_id)
+
+
+def _instance_is_valid(policy_type_id, policy_instance_id):
+    """
+    check that an instance is valid
+    """
+    _type_is_valid(policy_type_id)
+    if SDL.get(A1NS, _generate_instance_key(policy_type_id, policy_instance_id)) is None:
+        raise PolicyInstanceNotFound(policy_type_id)
 
 
 def _get_statuses(policy_type_id, policy_instance_id):
     """
     shared helper to get statuses for an instance
     """
-    instance_is_valid(policy_type_id, policy_instance_id)
-    return [v for _, v in POLICY_DATA[policy_type_id][I][policy_instance_id][H].items()]
+    _instance_is_valid(policy_type_id, policy_instance_id)
+    prefixes_for_handler = "{0}{1}.{2}.".format(HANDLER_PREFIX, policy_type_id, policy_instance_id)
+    return list(SDL.find_and_get(A1NS, prefixes_for_handler).values())
 
 
 def _get_instance_list(policy_type_id):
     """
     shared helper to get instance list for a type
     """
-    type_is_valid(policy_type_id)
-    return list(POLICY_DATA[policy_type_id][I].keys())
-
-
-def _clean_up_type(policy_type_id):
-    """
-    pop through a1s mailbox, updating a1s db of all policy statuses
-    for all instances of type, see if it can be deleted
-    """
-    type_is_valid(policy_type_id)
-    for msg in a1rmr.dequeue_all_waiting_messages([21024]):
-        # try to parse the messages as responses. Drop those that are malformed
-        pay = json.loads(msg["payload"])
-        if "policy_type_id" in pay and "policy_instance_id" in pay and "handler_id" in pay and "status" in pay:
-            """
-            NOTE: can't raise an exception here e.g.:
-                instance_is_valid(pti, pii)
-            because this is called on many functions; just drop bad status messages.
-            We def don't want bad messages that happen to hit a1s mailbox to blow up anything
-
-            NOTE2: we don't use the parameters "policy_type_id, policy_instance" from above here,
-            # because we are popping the whole mailbox, which might include other statuses
-            """
-            pti = pay["policy_type_id"]
-            pii = pay["policy_instance_id"]
-            if pti in POLICY_DATA and pii in POLICY_DATA[pti][I]:  # manual check per comment above
-                POLICY_DATA[pti][I][pii][H][pay["handler_id"]] = pay["status"]
-        else:
-            logger.debug("Dropping message")
-            logger.debug(pay)
-
-    for policy_instance_id in _get_instance_list(policy_type_id):
-        # see if we can delete
-        vector = _get_statuses(policy_type_id, policy_instance_id)
-
-        """
-        TODO: not being able to delete if the list is [] is prolematic.
-        There are cases, such as a bad routing file, where this type will never be able to be deleted because it never went to any xapps
-        However, A1 cannot distinguish between the case where [] was never going to work, and the case where it hasn't worked *yet*
-
-        However, removing this constraint also leads to problems.
-        Deleting the instance when the vector is empty, for example doing so “shortly after” the PUT, can lead to a worse race condition where the xapps get the policy after that, implement it, but because the DELETE triggered “too soon”, you can never get the status or do the delete on it again, so the xapps are all implementing the instance roguely.
-
-        This requires some thought to address.
-        For now we stick with the "less bad problem".
-        """
-        if vector != []:
-            all_deleted = True
-            for i in vector:
-                if i != "DELETED":
-                    all_deleted = False
-                    break  # have at least one not DELETED, do nothing
-
-            # blow away from a1 db
-            if all_deleted:
-                del POLICY_DATA[policy_type_id][I][policy_instance_id]
+    _type_is_valid(policy_type_id)
+    prefixes_for_type = "{0}{1}.".format(INSTANCE_PREFIX, policy_type_id)
+    instancekeys = SDL.find_and_get(A1NS, prefixes_for_type).keys()
+    return [k.split(prefixes_for_type)[1] for k in instancekeys]
 
 
-# Types
+def _clear_handlers(policy_type_id, policy_instance_id):
+    """
+    delete all the handlers for a policy instance
+    """
+    all_handlers_pref = _generate_handler_prefix(policy_type_id, policy_instance_id)
+    keys = SDL.find_and_get(A1NS, all_handlers_pref)
+    for k in keys:
+        SDL.delete(A1NS, k)
 
 
-def get_type_list():
+def _get_metadata(policy_type_id, policy_instance_id):
     """
-    retrieve all type ids
+    get instance metadata
     """
-    return list(POLICY_DATA.keys())
+    _instance_is_valid(policy_type_id, policy_instance_id)
+    metadata_key = _generate_instance_metadata_key(policy_type_id, policy_instance_id)
+    return SDL.get(A1NS, metadata_key)
 
 
-def type_is_valid(policy_type_id):
+def _delete_after(policy_type_id, policy_instance_id, ttl):
     """
-    check that a type is valid
+    this is a blocking function, must call this in a thread to not block!
+    waits ttl seconds, then deletes the instance
     """
-    if policy_type_id not in POLICY_DATA:
-        logger.error("%s not found", policy_type_id)
-        raise PolicyTypeNotFound()
+    _instance_is_valid(policy_type_id, policy_instance_id)
+
+    time.sleep(ttl)
+
+    # ready to delete
+    _clear_handlers(policy_type_id, policy_instance_id)  # delete all the handlers
+    SDL.delete(A1NS, _generate_instance_key(policy_type_id, policy_instance_id))  # delete instance
+    SDL.delete(A1NS, _generate_instance_metadata_key(policy_type_id, policy_instance_id))  # delete instance metadata
+    mdc_logger.debug("type {0} instance {1} deleted".format(policy_type_id, policy_instance_id))
+
+
+# Types
+
+
+def get_type_list():
+    """
+    retrieve all type ids
+    """
+    typekeys = SDL.find_and_get(A1NS, TYPE_PREFIX).keys()
+    # policy types are ints but they get butchered to strings in the KV
+    return [int(k.split(TYPE_PREFIX)[1]) for k in typekeys]
 
 
 def store_policy_type(policy_type_id, body):
     """
     store a policy type if it doesn't already exist
     """
-    if policy_type_id in POLICY_DATA:
-        raise PolicyTypeAlreadyExists()
-
-    POLICY_DATA[policy_type_id] = {}
-    POLICY_DATA[policy_type_id][D] = body
-    POLICY_DATA[policy_type_id][I] = {}
+    if policy_type_id != body['policy_type_id']:
+        raise PolicyTypeIdMismatch("{0} vs. {1}".format(policy_type_id, body['policy_type_id']))
+    key = _generate_type_key(policy_type_id)
+    if SDL.get(A1NS, key) is not None:
+        raise PolicyTypeAlreadyExists(policy_type_id)
+    SDL.set(A1NS, key, body)
 
 
 def delete_policy_type(policy_type_id):
@@ -148,64 +181,111 @@ def delete_policy_type(policy_type_id):
     """
     pil = get_instance_list(policy_type_id)
     if pil == []:  # empty, can delete
-        del POLICY_DATA[policy_type_id]
+        SDL.delete(A1NS, _generate_type_key(policy_type_id))
     else:
-        raise CantDeleteNonEmptyType()
+        raise CantDeleteNonEmptyType(policy_type_id)
 
 
 def get_policy_type(policy_type_id):
     """
     retrieve a type
     """
-    type_is_valid(policy_type_id)
-    return POLICY_DATA[policy_type_id][D]
+    _type_is_valid(policy_type_id)
+    return SDL.get(A1NS, _generate_type_key(policy_type_id))
 
 
 # Instances
 
 
-def instance_is_valid(policy_type_id, policy_instance_id):
-    """
-    check that an instance is valid
-    """
-    type_is_valid(policy_type_id)
-    if policy_instance_id not in POLICY_DATA[policy_type_id][I]:
-        raise PolicyInstanceNotFound
-
-
 def store_policy_instance(policy_type_id, policy_instance_id, instance):
     """
     Store a policy instance
     """
-    type_is_valid(policy_type_id)
+    _type_is_valid(policy_type_id)
+    creation_timestamp = time.time()
 
     # store the instance
-    # Reset the statuses because this is a new policy instance, even if it was overwritten
-    POLICY_DATA[policy_type_id][I][policy_instance_id] = {}
-    POLICY_DATA[policy_type_id][I][policy_instance_id][D] = instance
-    POLICY_DATA[policy_type_id][I][policy_instance_id][H] = {}
+    operation = "CREATE"
+    key = _generate_instance_key(policy_type_id, policy_instance_id)
+    if SDL.get(A1NS, key) is not None:
+        operation = "UPDATE"
+        # Reset the statuses because this is a new policy instance, even if it was overwritten
+        _clear_handlers(policy_type_id, policy_instance_id)  # delete all the handlers
+    SDL.set(A1NS, key, instance)
+
+    metadata_key = _generate_instance_metadata_key(policy_type_id, policy_instance_id)
+    SDL.set(A1NS, metadata_key, {"created_at": creation_timestamp, "has_been_deleted": False})
+
+    return operation
 
 
 def get_policy_instance(policy_type_id, policy_instance_id):
     """
     Retrieve a policy instance
     """
-    _clean_up_type(policy_type_id)
-    instance_is_valid(policy_type_id, policy_instance_id)
-    return POLICY_DATA[policy_type_id][I][policy_instance_id][D]
+    _instance_is_valid(policy_type_id, policy_instance_id)
+    return SDL.get(A1NS, _generate_instance_key(policy_type_id, policy_instance_id))
 
 
-def get_policy_instance_statuses(policy_type_id, policy_instance_id):
+def get_instance_list(policy_type_id):
     """
-    Retrieve the status vector for a policy instance
+    retrieve all instance ids for a type
     """
-    _clean_up_type(policy_type_id)
-    return _get_statuses(policy_type_id, policy_instance_id)
+    return _get_instance_list(policy_type_id)
 
 
-def get_instance_list(policy_type_id):
+def delete_policy_instance(policy_type_id, policy_instance_id):
     """
-    retrieve all instance ids for a type
+    initially sets has_been_deleted in the status
+    then launches a thread that waits until the relevent timer expires, and finally deletes the instance
     """
-    _clean_up_type(policy_type_id)
-    return _get_instance_list(policy_type_id)
+    _instance_is_valid(policy_type_id, policy_instance_id)
+
+    # set the metadata first
+    deleted_timestamp = time.time()
+    metadata_key = _generate_instance_metadata_key(policy_type_id, policy_instance_id)
+    existing_metadata = _get_metadata(policy_type_id, policy_instance_id)
+    SDL.set(
+        A1NS,
+        metadata_key,
+        {"created_at": existing_metadata["created_at"], "has_been_deleted": True, "deleted_at": deleted_timestamp},
+    )
+
+    # wait, then delete
+    vector = _get_statuses(policy_type_id, policy_instance_id)
+    if vector == []:
+        # handler is empty; we wait for t1 to expire then goodnight
+        clos = lambda: _delete_after(policy_type_id, policy_instance_id, INSTANCE_DELETE_NO_RESP_TTL)
+    else:
+        # handler is not empty, we wait max t1,t2 to expire then goodnight
+        clos = lambda: _delete_after(
+            policy_type_id, policy_instance_id, max(INSTANCE_DELETE_RESP_TTL, INSTANCE_DELETE_NO_RESP_TTL)
+        )
+    Thread(target=clos).start()
+
+
+# Statuses
+
+
+def set_policy_instance_status(policy_type_id, policy_instance_id, handler_id, status):
+    """
+    update the database status for a handler
+    called from a1's rmr thread
+    """
+    _type_is_valid(policy_type_id)
+    _instance_is_valid(policy_type_id, policy_instance_id)
+    SDL.set(A1NS, _generate_handler_key(policy_type_id, policy_instance_id, handler_id), status)
+
+
+def get_policy_instance_status(policy_type_id, policy_instance_id):
+    """
+    Gets the status of an instance
+    """
+    _instance_is_valid(policy_type_id, policy_instance_id)
+    metadata = _get_metadata(policy_type_id, policy_instance_id)
+    metadata["instance_status"] = "NOT IN EFFECT"
+    for i in _get_statuses(policy_type_id, policy_instance_id):
+        if i == "OK":
+            metadata["instance_status"] = "IN EFFECT"
+            break
+    return metadata