from threading import Thread
from mdclogpy import Logger
from ricxappframe.xapp_sdl import SDLWrapper
-from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound, PolicyTypeAlreadyExists, CantDeleteNonEmptyType
+from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound, PolicyTypeAlreadyExists, PolicyTypeIdMismatch, CantDeleteNonEmptyType
# constants
INSTANCE_DELETE_NO_RESP_TTL = int(os.environ.get("INSTANCE_DELETE_NO_RESP_TTL", 5))
check that a type is valid
"""
if SDL.get(A1NS, _generate_type_key(policy_type_id)) is None:
- raise PolicyTypeNotFound()
+ raise PolicyTypeNotFound(policy_type_id)
def _instance_is_valid(policy_type_id, policy_instance_id):
"""
_type_is_valid(policy_type_id)
if SDL.get(A1NS, _generate_instance_key(policy_type_id, policy_instance_id)) is None:
- raise PolicyInstanceNotFound
+ raise PolicyInstanceNotFound(policy_type_id)
def _get_statuses(policy_type_id, policy_instance_id):
"""
store a policy type if it doesn't already exist
"""
+ if policy_type_id != body['policy_type_id']:
+ raise PolicyTypeIdMismatch("{0} vs. {1}".format(policy_type_id, body['policy_type_id']))
key = _generate_type_key(policy_type_id)
if SDL.get(A1NS, key) is not None:
- raise PolicyTypeAlreadyExists()
- # overwrite ID in body to enforce consistency
- body['policy_type_id'] = policy_type_id
+ raise PolicyTypeAlreadyExists(policy_type_id)
SDL.set(A1NS, key, body)
if pil == []: # empty, can delete
SDL.delete(A1NS, _generate_type_key(policy_type_id))
else:
- raise CantDeleteNonEmptyType()
+ raise CantDeleteNonEmptyType(policy_type_id)
def get_policy_type(policy_type_id):