from sqlalchemy.sql.elements import ColumnElement
from sqlalchemy import or_
+from o2ims.domain.alarm_obj import AlarmEventRecord
+from o2ims.domain.ocloud import Ocloud
+
from o2common.helper import o2logging
logger = o2logging.get_logger(__name__)
def gen_orm_filter(obj: ColumnElement, filter_str: str):
+ logger.debug(filter_str)
if not filter_str:
return []
- # filter_without_space = filter_str.replace(" ", "")
- filter_without_space = filter_str.strip(' ()')
+ filter_without_space = filter_str.strip()
items = filter_without_space.split(';')
filter_list = list()
for i in items:
- # if '(' in i:
- # i = i.replace("(", "")
- # if ')' in i:
- # i = i.replace(")", "")
- filter_expr = i.split(',')
+ item = i.strip(' ()')
+ filter_expr = item.split(',')
if len(filter_expr) < 3:
continue
filter_op = filter_expr[0].strip()
filter_key = filter_expr[1].strip()
+ if filter_key == 'objectType':
+ continue
filter_vals = filter_expr[2:]
filter_list.extend(toFilterArgs(
filter_op, obj, filter_key, filter_vals))
# format(operation))
# else:
# raise KeyError('Filter operation {} not support'.format(operation))
+ key = transfer_filter_attr_name_in_special(obj, key)
ll = list()
if operation == 'eq':
val_list.append(getattr(obj, key).contains(val))
ll.append(~or_(*val_list))
return ll
+
+
+def transfer_filter_attr_name_in_special(obj: ColumnElement, filter_key: str):
+ if obj == AlarmEventRecord:
+ if filter_key == 'resourceTypeID':
+ filter_key = 'resourceTypeId'
+ elif filter_key == 'resourceID':
+ filter_key = 'resourceId'
+ elif filter_key == 'alarmDefinitionID':
+ filter_key = 'alarmDefinitionId'
+ elif filter_key == 'probableCauseID':
+ filter_key = 'probableCauseId'
+ elif obj == Ocloud:
+ if filter_key == 'globalcloudId':
+ filter_key = 'globalCloudId'
+ return filter_key