class Pagination:
def __init__(self, **kwargs) -> None:
# filter key should be the same with database name
- self.filter_kwargs = {}
+ self.pagination_kwargs = {}
self.limit = int(kwargs['per_page']) if 'per_page' in kwargs else 30
self.page = int(kwargs['page']) if 'page' in kwargs else 1
if self.page < 1:
self.page = 1
self.start = (self.page - 1) * self.limit
- self.filter_kwargs['limit'] = self.limit
- self.filter_kwargs['start'] = self.start
+ self.pagination_kwargs['limit'] = self.limit
+ self.pagination_kwargs['start'] = self.start
- def get_filter(self):
- return self.filter_kwargs
+ def get_pagination(self):
+ return self.pagination_kwargs
def get_result(self, ret: Tuple[int, List[Serializer]]):
count = ret[0]
resp = f(*args, **kwargs)
req_args = request.args
- mask = self._gen_mask_from_filter(**req_args)
+ mask = self._gen_mask_from_selector(**req_args)
# mask = self.mask
return wrapper
- def _gen_mask_from_filter(self, **kwargs) -> str:
+ def _gen_mask_from_selector(self, **kwargs) -> str:
mask_val = ''
if 'all_fields' in kwargs:
all_fields_without_space = kwargs['all_fields'].replace(" ", "")
# continue
# mask_val_list.append(f)
# mask_val = '{%s}' % ','.join(mask_val_list)
- default_fields = {}
+ selector = {}
- self.__update_filter_value(
- default_fields, fields_without_space, True)
+ self.__update_selector_value(selector, fields_without_space, True)
- mask_val = self.__gen_mask_from_filter_tree(default_fields)
+ mask_val = self.__gen_mask_from_selector(selector)
elif 'exclude_fields' in kwargs and kwargs['exclude_fields'] != '':
exclude_fields_without_space = kwargs['exclude_fields'].replace(
" ", "")
- default_fields = self.__gen_filter_tree_from_model_with_value(
+ selector = self.__gen_selector_from_model_with_value(
self.fields)
- self.__update_filter_value(
- default_fields, exclude_fields_without_space, False)
+ self.__update_selector_value(
+ selector, exclude_fields_without_space, False)
- mask_val = self.__gen_mask_from_filter_tree(default_fields)
+ mask_val = self.__gen_mask_from_selector(selector)
elif 'exclude_default' in kwargs and kwargs['exclude_default'] != '':
exclude_default_without_space = kwargs['exclude_default'].replace(
" ", "")
else:
return '{%s}' % f[0]
- def __gen_filter_tree_from_model_with_value(
+ def __gen_selector_from_model_with_value(
self, model: Model, default_val: bool = True) -> dict:
- filter = dict()
+ selector = dict()
for i in model:
if type(model[i]) is List:
if type(model[i].container) is String:
- filter[i] = default_val
+ selector[i] = default_val
continue
- filter[i] = self.__gen_filter_tree_from_model_with_value(
+ selector[i] = self.__gen_selector_from_model_with_value(
model[i].container.model, default_val)
continue
elif type(model[i]) is Nested:
- filter[i] = self.__gen_filter_tree_from_model_with_value(
+ selector[i] = self.__gen_selector_from_model_with_value(
model[i].model, default_val)
- filter[i] = default_val
- return filter
+ selector[i] = default_val
+ return selector
- def __update_filter_value(self, default_fields: dict, filter: str,
- val: bool):
+ def __update_selector_value(self, default_selector: dict, filter: str,
+ val: bool):
fields = filter.split(',')
for f in fields:
if '/' in f:
- self.__update_filter_tree_value(default_fields, f, val)
+ self.__update_selector_tree_value(default_selector, f, val)
continue
- default_fields[f] = val
+ default_selector[f] = val
- def __update_filter_tree_value(self, m: dict, filter: str, val: bool):
+ def __update_selector_tree_value(self, m: dict, filter: str, val: bool):
filter_list = filter.split('/', 1)
if filter_list[0] not in m:
m[filter_list[0]] = dict()
if len(filter_list) > 1:
- self.__update_filter_tree_value(
+ self.__update_selector_tree_value(
m[filter_list[0]], filter_list[1], val)
return
m[filter_list[0]] = val
- def __gen_mask_from_filter_tree(self, fields: dict) -> str:
+ def __gen_mask_from_selector(self, fields: dict) -> str:
mask_li = list()
for k, v in fields.items():
if type(v) is dict:
- s = self.__gen_mask_from_filter_tree(v)
+ s = self.__gen_mask_from_selector(v)
mask_li.append('%s%s' % (k, s))
continue
if v:
# limitations under the License.
from sqlalchemy.sql.elements import ColumnElement
+from sqlalchemy import or_
from o2common.helper import o2logging
logger = o2logging.get_logger(__name__)
(key, str(obj)))
return []
+ if operation in ['eq', 'neq', 'gt', 'lt', 'gte', 'lte']:
+ if len(values) != 1:
+ raise KeyError('Filter operation one is only support one value.')
+ elif operation in ['in', 'nin', 'cont', 'ncont']:
+ if len(values) == 0:
+ raise KeyError('Filter operation value is needed.')
+ else:
+ raise KeyError('Filter operation value not support.')
+
ll = list()
if operation == 'eq':
- for val in values:
- if val.lower() == 'null':
- val = None
- ll.append(getattr(obj, key) == val)
+ val = values[0]
+ if val.lower() == 'null':
+ val = None
+ ll.append(getattr(obj, key) == val)
elif operation == 'neq':
- for val in values:
- if val.lower() == 'null':
- val = None
- ll.append(getattr(obj, key) != val)
+ val = values[0]
+ if val.lower() == 'null':
+ val = None
+ ll.append(getattr(obj, key) != val)
elif operation == 'gt':
- for val in values:
- ll.append(getattr(obj, key) > val)
+ val = values[0]
+ ll.append(getattr(obj, key) > val)
elif operation == 'lt':
- for val in values:
- ll.append(getattr(obj, key) < val)
+ val = values[0]
+ ll.append(getattr(obj, key) < val)
elif operation == 'gte':
- for val in values:
- ll.append(getattr(obj, key) >= val)
+ val = values[0]
+ ll.append(getattr(obj, key) >= val)
elif operation == 'lte':
- for val in values:
- ll.append(getattr(obj, key) <= val)
+ val = values[0]
+ ll.append(getattr(obj, key) <= val)
elif operation == 'in':
- pass
+ ll.append(getattr(obj, key).in_(values))
elif operation == 'nin':
- pass
- elif operation == 'count':
- pass
- elif operation == 'ncount':
- pass
- else:
- raise KeyError('Filter operation value not support.')
-
+ ll.append(~getattr(obj, key).in_(values))
+ elif operation == 'cont':
+ val_list = list()
+ for val in values:
+ val_list.append(getattr(obj, key).contains(val))
+ ll.append(or_(*val_list))
+ elif operation == 'ncont':
+ val_list = list()
+ for val in values:
+ val_list.append(getattr(obj, key).contains(val))
+ ll.append(~or_(*val_list))
return ll
'Exclude showing all default fields, Set "true" to enable.',
_in='query')
@api_monitoring_v1.param(
- 'exclude_default',
- 'Exclude showing all default fields, Set "true" to enable.',
+ 'filter',
+ 'Filter of the query.',
_in='query')
class AlarmListRouter(Resource):
'Exclude showing all default fields, Set "true" to enable.',
_in='query')
@api_monitoring_v1.param(
- 'exclude_default',
- 'Exclude showing all default fields, Set "true" to enable.',
+ 'filter',
+ 'Filter of the query.',
_in='query')
def get(self):
parser = reqparse.RequestParser()
def alarm_event_records(uow: unit_of_work.AbstractUnitOfWork, **kwargs):
pagination = Pagination(**kwargs)
- filter_kwargs = pagination.get_filter()
+ query_kwargs = pagination.get_pagination()
args = gen_filter(AlarmEventRecord,
kwargs['filter']) if 'filter' in kwargs else []
with uow:
- li = uow.alarm_event_records.list_with_count(*args, **filter_kwargs)
+ li = uow.alarm_event_records.list_with_count(*args, **query_kwargs)
return pagination.get_result(li)
def subscriptions(uow: unit_of_work.AbstractUnitOfWork, **kwargs):
pagination = Pagination(**kwargs)
- filter_kwargs = pagination.get_filter()
+ query_kwargs = pagination.get_pagination()
args = gen_filter(AlarmSubscription,
kwargs['filter']) if 'filter' in kwargs else []
with uow:
- li = uow.alarm_subscriptions.list_with_count(*args, **filter_kwargs)
+ li = uow.alarm_subscriptions.list_with_count(*args, **query_kwargs)
return pagination.get_result(li)
def resource_types(uow: unit_of_work.AbstractUnitOfWork, **kwargs):
pagination = Pagination(**kwargs)
- filter_kwargs = pagination.get_filter()
+ query_kwargs = pagination.get_pagination()
args = gen_filter(ocloud.ResourceType,
kwargs['filter']) if 'filter' in kwargs else []
with uow:
- li = uow.resource_types.list_with_count(*args, **filter_kwargs)
+ li = uow.resource_types.list_with_count(*args, **query_kwargs)
return pagination.get_result(li)
def resource_pools(uow: unit_of_work.AbstractUnitOfWork, **kwargs):
pagination = Pagination(**kwargs)
- filter_kwargs = pagination.get_filter()
+ query_kwargs = pagination.get_pagination()
args = gen_filter(ocloud.ResourcePool,
kwargs['filter']) if 'filter' in kwargs else []
with uow:
- li = uow.resource_pools.list_with_count(*args, **filter_kwargs)
+ li = uow.resource_pools.list_with_count(*args, **query_kwargs)
return pagination.get_result(li)
**kwargs):
pagination = Pagination(**kwargs)
# filter key should be the same with database name
- filter_kwargs = pagination.get_filter()
+ query_kwargs = pagination.get_pagination()
if 'resourceTypeName' in kwargs:
resource_type_name = kwargs['resourceTypeName']
with uow:
# restype_id = '' if len(restype_ids) == 0 else restype_ids[0]
res_type = uow.resource_types.get_by_name(resource_type_name)
restype_id = '' if res_type is None else res_type.resourceTypeId
- filter_kwargs['resourceTypeId'] = restype_id
+ query_kwargs['resourceTypeId'] = restype_id
args = gen_filter(
ocloud.Resource, kwargs['filter']) if 'filter' in kwargs else []
args.append(ocloud.Resource.resourcePoolId == resourcePoolId)
# args.append(ocloud.Resource.parentId == None)
if 'parentId' in kwargs:
- filter_kwargs['parentId'] = kwargs['parentId']
+ query_kwargs['parentId'] = kwargs['parentId']
if 'sort' in kwargs:
- filter_kwargs['sort'] = kwargs['sort']
+ query_kwargs['sort'] = kwargs['sort']
with uow:
ret = uow.resources.list_with_count(
- resourcePoolId, *args, **filter_kwargs)
+ resourcePoolId, *args, **query_kwargs)
return pagination.get_result(ret)
def deployment_managers(uow: unit_of_work.AbstractUnitOfWork, **kwargs):
pagination = Pagination(**kwargs)
- filter_kwargs = pagination.get_filter()
+ query_kwargs = pagination.get_pagination()
args = gen_filter(ocloud.DeploymentManager,
kwargs['filter']) if 'filter' in kwargs else []
with uow:
- li = uow.deployment_managers.list_with_count(*args, **filter_kwargs)
+ li = uow.deployment_managers.list_with_count(*args, **query_kwargs)
return pagination.get_result(li)
def subscriptions(uow: unit_of_work.AbstractUnitOfWork, **kwargs):
pagination = Pagination(**kwargs)
- filter_kwargs = pagination.get_filter()
- args = gen_filter(ocloud.DeploymentManager,
+ query_kwargs = pagination.get_pagination()
+ args = gen_filter(Subscription,
kwargs['filter']) if 'filter' in kwargs else []
with uow:
- li = uow.subscriptions.list_with_count(*args, **filter_kwargs)
+ li = uow.subscriptions.list_with_count(*args, **query_kwargs)
return pagination.get_result(li)