X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;ds=sidebyside;f=o2common%2Fdomain%2Ffilter.py;fp=o2common%2Fdomain%2Ffilter.py;h=80fe322c46a6159f35bda8c276133590eab430fa;hb=0e743715cabed900807aa0ba9ade459006472274;hp=0000000000000000000000000000000000000000;hpb=738f7c66e2532a1d5c690860fb2144782edfb820;p=pti%2Fo2.git diff --git a/o2common/domain/filter.py b/o2common/domain/filter.py new file mode 100644 index 0000000..80fe322 --- /dev/null +++ b/o2common/domain/filter.py @@ -0,0 +1,102 @@ +# Copyright (C) 2021-2022 Wind River Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from sqlalchemy.sql.elements import ColumnElement +from sqlalchemy import or_ + +from o2common.helper import o2logging +logger = o2logging.get_logger(__name__) + + +def gen_orm_filter(obj: ColumnElement, filter_str: str): + if not filter_str: + return [] + filter_without_space = filter_str.replace(" ", "") + items = filter_without_space.split(';') + + filter_list = list() + for i in items: + if '(' in i: + i = i.replace("(", "") + if ')' in i: + i = i.replace(")", "") + filter_expr = i.split(',') + if len(filter_expr) < 3: + continue + filter_op = filter_expr[0] + filter_key = filter_expr[1] + filter_vals = filter_expr[2:] + filter_list.extend(toFilterArgs( + filter_op, obj, filter_key, filter_vals)) + logger.info('Filter list length: %d' % len(filter_list)) + return filter_list + + +def toFilterArgs(operation: str, obj: ColumnElement, key: str, values: list): + if not hasattr(obj, key): + logger.warning('Filter attrName %s not in Object %s.' % + (key, str(obj))) + raise KeyError( + 'Filter attrName {} not in the Object'.format(key)) + + if operation in ['eq', 'neq', 'gt', 'lt', 'gte', 'lte']: + if len(values) != 1: + raise KeyError( + 'Filter operation one {} is only support one value.'. + format(operation)) + elif operation in ['in', 'nin', 'cont', 'ncont']: + if len(values) == 0: + raise KeyError('Filter operation {} value is needed.'. + format(operation)) + else: + raise KeyError('Filter operation {} not support.'.format(operation)) + + ll = list() + if operation == 'eq': + val = values[0] + if val.lower() == 'null': + val = None + ll.append(getattr(obj, key) == val) + elif operation == 'neq': + val = values[0] + if val.lower() == 'null': + val = None + ll.append(getattr(obj, key) != val) + elif operation == 'gt': + val = values[0] + ll.append(getattr(obj, key) > val) + elif operation == 'lt': + val = values[0] + ll.append(getattr(obj, key) < val) + elif operation == 'gte': + val = values[0] + ll.append(getattr(obj, key) >= val) + elif operation == 'lte': + val = values[0] + ll.append(getattr(obj, key) <= val) + elif operation == 'in': + ll.append(getattr(obj, key).in_(values)) + elif operation == 'nin': + ll.append(~getattr(obj, key).in_(values)) + elif operation == 'cont': + val_list = list() + for val in values: + val_list.append(getattr(obj, key).contains(val)) + ll.append(or_(*val_list)) + elif operation == 'ncont': + val_list = list() + for val in values: + val_list.append(getattr(obj, key).contains(val)) + ll.append(~or_(*val_list)) + return ll