def gen_orm_filter(obj: ColumnElement, filter_str: str):
if not filter_str:
return []
- filter_without_space = filter_str.replace(" ", "")
+ # filter_without_space = filter_str.replace(" ", "")
+ filter_without_space = filter_str.strip(' ()')
items = filter_without_space.split(';')
filter_list = list()
for i in items:
- if '(' in i:
- i = i.replace("(", "")
- if ')' in i:
- i = i.replace(")", "")
+ # if '(' in i:
+ # i = i.replace("(", "")
+ # if ')' in i:
+ # i = i.replace(")", "")
filter_expr = i.split(',')
if len(filter_expr) < 3:
continue
- filter_op = filter_expr[0]
- filter_key = filter_expr[1]
+ filter_op = filter_expr[0].strip()
+ filter_key = filter_expr[1].strip()
filter_vals = filter_expr[2:]
filter_list.extend(toFilterArgs(
filter_op, obj, filter_key, filter_vals))
- logger.info('Filter list length: %d' % len(filter_list))
+ logger.debug('Filter list length: %d' % len(filter_list))
return filter_list
def toFilterArgs(operation: str, obj: ColumnElement, key: str, values: list):
- if not hasattr(obj, key):
- logger.warning('Filter attrName %s not in Object %s.' %
- (key, str(obj)))
- raise KeyError(
- 'Filter attrName {} not in the Object'.format(key))
+ # if not hasattr(obj, key):
+ # logger.warning('Filter attrName %s not in Object %s' %
+ # (key, str(obj)))
+ # raise KeyError(
+ # 'Filter attrName {} not in the Object'.format(key))
- if operation in ['eq', 'neq', 'gt', 'lt', 'gte', 'lte']:
- if len(values) != 1:
- raise KeyError(
- 'Filter operation one {} is only support one value.'.
- format(operation))
- elif operation in ['in', 'nin', 'cont', 'ncont']:
- if len(values) == 0:
- raise KeyError('Filter operation {} value is needed.'.
- format(operation))
- else:
- raise KeyError('Filter operation {} not support.'.format(operation))
+ # if operation in ['eq', 'neq', 'gt', 'lt', 'gte', 'lte']:
+ # if len(values) != 1:
+ # raise KeyError(
+ # 'Filter operation one {} is only support one value'.
+ # format(operation))
+ # elif operation in ['in', 'nin', 'cont', 'ncont']:
+ # if len(values) == 0:
+ # raise KeyError('Filter operation {} value is needed'.
+ # format(operation))
+ # else:
+ # raise KeyError('Filter operation {} not support'.format(operation))
ll = list()
if operation == 'eq':
# See the License for the specific language governing permissions and
# limitations under the License.
-import re
+# import re
from sqlalchemy.sql.elements import ColumnElement
from o2common.views.route_exception import BadRequestException
def check_filter(obj: ColumnElement, filter_str: str):
if not filter_str:
return
- pattern = r'^(\((eq|neq|gt|lt|gte|lte){1},\w+,[\w -\.]+\)\;?|' +\
- r'\((in|nin|cont|ncont){1},\w*(,[\w -\.]*)*\)\;?)+'
- result = re.match(pattern, filter_str)
- logger.warning('filter: {} match result is {}'.format(filter_str, result))
- if not result:
- raise BadRequestException(
- 'filter value formater not correct.')
+ # pattern = r'^(\((eq|neq|gt|lt|gte|lte){1},\w+,[\w -\.]+\)\;?|' +\
+ # r'\((in|nin|cont|ncont){1},\w*(,[\w -\.]*)*\)\;?)+'
+ # result = re.match(pattern, filter_str)
+ # logger.debug('filter: {} match result is {}'.format(filter_str, result))
+ # if not result:
+ # raise BadRequestException(
+ # 'filter value format is invalid')
check_filter_attribute(obj, filter_str)
def check_filter_attribute(obj: ColumnElement, filter_str: str):
- filter_without_space = filter_str.replace(" ", "")
+ # filter_without_space = filter_str.replace(" ", "")
+ filter_without_space = filter_str.strip(' ()')
+ logger.debug(
+ f"filter_str: {filter_str}, stripped: {filter_without_space}")
items = filter_without_space.split(';')
for i in items:
- if '(' in i:
- i = i.replace("(", "")
- if ')' in i:
- i = i.replace(")", "")
+ # if '(' in i:
+ # i = i.replace("(", "")
+ # if ')' in i:
+ # i = i.replace(")", "")
filter_expr = i.split(',')
if len(filter_expr) < 3:
raise BadRequestException(
- 'Filter {} formater not correct.'.format(i))
+ 'ignore invalid filter {}'.format(i))
continue
- # filter_op = filter_expr[0]
- filter_key = filter_expr[1]
- # filter_vals = filter_expr[2:]
+ filter_op = filter_expr[0].strip()
+ filter_key = filter_expr[1].strip()
+ filter_vals = filter_expr[2:]
+ if filter_op in ["eq", "neq", "gt", "lt", "gte", "lte"]:
+ if len(filter_vals) != 1:
+ raise BadRequestException(
+ "Found {} values: {} while only single value"
+ " is allowed for operation {}".format(
+ len(filter_vals), filter_vals, filter_op)
+ )
+ elif filter_op not in ["in", "nin", "cont", "ncont"]:
+ raise BadRequestException(
+ 'Filter operation {} is invalid'.format(filter_op)
+ )
+ else:
+ pass
if not hasattr(obj, filter_key):
raise BadRequestException(
- 'Filter attrName {} not in the Object'.format(filter_key))
+ 'Filter attrName {} is invalid'.format(filter_key))