X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=o2common%2Fviews%2Fview.py;h=af02e782c04c7b3ea2739848c9933da017ce680c;hb=46d810e671d23449e36f1c5d78e5e725ff06a7ac;hp=6d55b71623890fe78507afb9f419105003eb7e85;hpb=4e221e2f91b6a960e31befcf3aa24a1b4e065afc;p=pti%2Fo2.git diff --git a/o2common/views/view.py b/o2common/views/view.py index 6d55b71..af02e78 100644 --- a/o2common/views/view.py +++ b/o2common/views/view.py @@ -12,90 +12,84 @@ # See the License for the specific language governing permissions and # limitations under the License. +# import re from sqlalchemy.sql.elements import ColumnElement -from sqlalchemy import or_ from o2common.views.route_exception import BadRequestException +from o2common.domain.filter import gen_orm_filter, \ + transfer_filter_attr_name_in_special from o2common.helper import o2logging logger = o2logging.get_logger(__name__) def gen_filter(obj: ColumnElement, filter_str: str): - if filter_str == '': - return [] - filter_without_space = filter_str.replace(" ", "") + check_filter(obj, filter_str) + try: + filter_list = gen_orm_filter(obj, filter_str) + except KeyError as e: + raise BadRequestException(e.args[0]) + return filter_list + + +# The regular expressions testing example put on here +# (neq,testkey,value-1) +# (neq,testkey,value-1,value-2) +# (gt,hello,1) +# (gte,world,2) +# (lt,testlt,notint) +# (ncont,key1,v1,v_2) +# (gt,hello,1);(ncont,world,val1,val-2) +# (eq,wrong,60cba7be-e2cd-3b8c-a7ff-16e0f10573f9) +# (eq,description,value key) +def check_filter(obj: ColumnElement, filter_str: str): + if not filter_str: + return + # pattern = r'^(\((eq|neq|gt|lt|gte|lte){1},\w+,[\w -\.]+\)\;?|' +\ + # r'\((in|nin|cont|ncont){1},\w*(,[\w -\.]*)*\)\;?)+' + # result = re.match(pattern, filter_str) + # logger.debug('filter: {} match result is {}'.format(filter_str, result)) + # if not result: + # raise BadRequestException( + # 'filter value format is invalid') + check_filter_attribute(obj, filter_str) + + +def check_filter_attribute(obj: ColumnElement, filter_str: str): + # filter_without_space = filter_str.replace(" ", "") + filter_without_space = filter_str.strip(' ()') + logger.debug( + f"filter_str: {filter_str}, stripped: {filter_without_space}") items = filter_without_space.split(';') - filter_list = list() for i in items: - if '(' in i: - i = i.replace("(", "") - if ')' in i: - i = i.replace(")", "") + # if '(' in i: + # i = i.replace("(", "") + # if ')' in i: + # i = i.replace(")", "") filter_expr = i.split(',') if len(filter_expr) < 3: + raise BadRequestException( + 'ignore invalid filter {}'.format(i)) continue - filter_op = filter_expr[0] - filter_key = filter_expr[1] + filter_op = filter_expr[0].strip() + filter_key = filter_expr[1].strip() filter_vals = filter_expr[2:] - filter_list.extend(toFilterArgs( - filter_op, obj, filter_key, filter_vals)) - logger.info('Filter list length: %d' % len(filter_list)) - return filter_list - - -def toFilterArgs(operation: str, obj: ColumnElement, key: str, values: list): - if not hasattr(obj, key): - logger.warning('Filter attrName %s not in Object %s.' % - (key, str(obj))) - raise BadRequestException( - 'Filter attrName {} not in the Object'.format(key)) - - if operation in ['eq', 'neq', 'gt', 'lt', 'gte', 'lte']: - if len(values) != 1: - raise KeyError('Filter operation one is only support one value.') - elif operation in ['in', 'nin', 'cont', 'ncont']: - if len(values) == 0: - raise KeyError('Filter operation value is needed.') - else: - raise KeyError('Filter operation value not support.') - - ll = list() - if operation == 'eq': - val = values[0] - if val.lower() == 'null': - val = None - ll.append(getattr(obj, key) == val) - elif operation == 'neq': - val = values[0] - if val.lower() == 'null': - val = None - ll.append(getattr(obj, key) != val) - elif operation == 'gt': - val = values[0] - ll.append(getattr(obj, key) > val) - elif operation == 'lt': - val = values[0] - ll.append(getattr(obj, key) < val) - elif operation == 'gte': - val = values[0] - ll.append(getattr(obj, key) >= val) - elif operation == 'lte': - val = values[0] - ll.append(getattr(obj, key) <= val) - elif operation == 'in': - ll.append(getattr(obj, key).in_(values)) - elif operation == 'nin': - ll.append(~getattr(obj, key).in_(values)) - elif operation == 'cont': - val_list = list() - for val in values: - val_list.append(getattr(obj, key).contains(val)) - ll.append(or_(*val_list)) - elif operation == 'ncont': - val_list = list() - for val in values: - val_list.append(getattr(obj, key).contains(val)) - ll.append(~or_(*val_list)) - return ll + if filter_op in ["eq", "neq", "gt", "lt", "gte", "lte"]: + if len(filter_vals) != 1: + raise BadRequestException( + "Found {} values: {} while only single value" + " is allowed for operation {}".format( + len(filter_vals), filter_vals, filter_op) + ) + elif filter_op not in ["in", "nin", "cont", "ncont"]: + raise BadRequestException( + 'Filter operation {} is invalid'.format(filter_op) + ) + else: + pass + filter_key = transfer_filter_attr_name_in_special(obj, filter_key) + if not hasattr(obj, filter_key) or \ + filter_key in ['hash', 'updatetime', 'createtime', 'events']: + raise BadRequestException( + 'Filter attrName {} is invalid'.format(filter_key))