X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=o2common%2Fviews%2Froute.py;h=832e38c8ac10ef6dcf09ebd9b49af94a914daf80;hb=refs%2Fchanges%2F43%2F9443%2F1;hp=b2d537d09f47ba56694943847505cd796407b33a;hpb=8f7352951c11d939bae11422c00c87dc1f1d2a85;p=pti%2Fo2.git diff --git a/o2common/views/route.py b/o2common/views/route.py index b2d537d..832e38c 100644 --- a/o2common/views/route.py +++ b/o2common/views/route.py @@ -29,6 +29,7 @@ from flask_restx.mask import Mask # , apply as apply_mask from flask_restx.model import Model from flask_restx.fields import List, Nested, String from flask_restx.utils import unpack +from o2common.domain.base import Serializer from o2common.helper import o2logging logger = o2logging.get_logger(__name__) @@ -95,7 +96,7 @@ class o2_marshal_with(marshal_with): resp = f(*args, **kwargs) req_args = request.args - mask = self._gen_mask_from_filter(**req_args) + mask = self._gen_mask_from_selector(**req_args) # mask = self.mask @@ -124,7 +125,7 @@ class o2_marshal_with(marshal_with): return wrapper - def _gen_mask_from_filter(self, **kwargs) -> str: + def _gen_mask_from_selector(self, **kwargs) -> str: mask_val = '' if 'all_fields' in kwargs: all_fields_without_space = kwargs['all_fields'].replace(" ", "") @@ -145,24 +146,23 @@ class o2_marshal_with(marshal_with): # continue # mask_val_list.append(f) # mask_val = '{%s}' % ','.join(mask_val_list) - default_fields = {} + selector = {} - self.__update_filter_value( - default_fields, fields_without_space, True) + self.__update_selector_value(selector, fields_without_space, True) - mask_val = self.__gen_mask_from_filter_tree(default_fields) + mask_val = self.__gen_mask_from_selector(selector) elif 'exclude_fields' in kwargs and kwargs['exclude_fields'] != '': exclude_fields_without_space = kwargs['exclude_fields'].replace( " ", "") - default_fields = self.__gen_filter_tree_from_model_with_value( + selector = self.__gen_selector_from_model_with_value( self.fields) - self.__update_filter_value( - default_fields, exclude_fields_without_space, False) + self.__update_selector_value( + selector, exclude_fields_without_space, False) - mask_val = self.__gen_mask_from_filter_tree(default_fields) + mask_val = self.__gen_mask_from_selector(selector) elif 'exclude_default' in kwargs and kwargs['exclude_default'] != '': exclude_default_without_space = kwargs['exclude_default'].replace( " ", "") @@ -184,50 +184,78 @@ class o2_marshal_with(marshal_with): else: return '{%s}' % f[0] - def __gen_filter_tree_from_model_with_value( + def __gen_selector_from_model_with_value( self, model: Model, default_val: bool = True) -> dict: - filter = dict() + selector = dict() for i in model: if type(model[i]) is List: if type(model[i].container) is String: - filter[i] = default_val + selector[i] = default_val continue - filter[i] = self.__gen_filter_tree_from_model_with_value( + selector[i] = self.__gen_selector_from_model_with_value( model[i].container.model, default_val) continue elif type(model[i]) is Nested: - filter[i] = self.__gen_filter_tree_from_model_with_value( + selector[i] = self.__gen_selector_from_model_with_value( model[i].model, default_val) - filter[i] = default_val - return filter + selector[i] = default_val + return selector - def __update_filter_value(self, default_fields: dict, filter: str, - val: bool): + def __update_selector_value(self, default_selector: dict, filter: str, + val: bool): fields = filter.split(',') for f in fields: if '/' in f: - self.__update_filter_tree_value(default_fields, f, val) + self.__update_selector_tree_value(default_selector, f, val) continue - default_fields[f] = val + default_selector[f] = val - def __update_filter_tree_value(self, m: dict, filter: str, val: bool): + def __update_selector_tree_value(self, m: dict, filter: str, val: bool): filter_list = filter.split('/', 1) if filter_list[0] not in m: m[filter_list[0]] = dict() if len(filter_list) > 1: - self.__update_filter_tree_value( + self.__update_selector_tree_value( m[filter_list[0]], filter_list[1], val) return m[filter_list[0]] = val - def __gen_mask_from_filter_tree(self, fields: dict) -> str: + def __gen_mask_from_selector(self, fields: dict) -> str: mask_li = list() for k, v in fields.items(): if type(v) is dict: - s = self.__gen_mask_from_filter_tree(v) + s = self.__gen_mask_from_selector(v) mask_li.append('%s%s' % (k, s)) continue if v: mask_li.append(k) return '{%s}' % ','.join(mask_li) + + +class ProblemDetails(Serializer): + def __init__(self, namespace: O2Namespace, code: int, detail: str, + title=None, instance=None + ) -> None: + self.ns = namespace + self.status = code + self.detail = detail + self.type = request.path + self.title = title if title is not None else self.getTitle(code) + self.instance = instance if instance is not None else [] + + def getTitle(self, code): + return HTTPStatus(code).phrase + + def abort(self): + self.ns.abort(self.status, self.detail, **self.serialize()) + + def serialize(self): + details = {} + for key in dir(self): + if key == 'ns' or key.startswith('__') or\ + callable(getattr(self, key)): + continue + else: + details[key] = getattr(self, key) + return details