From: Zhang Rong(Jon) Date: Wed, 9 Nov 2022 12:39:02 +0000 (+0800) Subject: Fix INF-346 and INF-347 subscription filter X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=8bf75e2f14a0b44fb5d9b771f3976e3ad80d7ae2;p=pti%2Fo2.git Fix INF-346 and INF-347 subscription filter Issue-ID: INF-346 Issue-ID: INF-347 Signed-off-by: Zhang Rong(Jon) Change-Id: I0577d72b725884edf0ba69ccef1e8ba1174a3949 --- diff --git a/o2common/domain/filter.py b/o2common/domain/filter.py new file mode 100644 index 0000000..80fe322 --- /dev/null +++ b/o2common/domain/filter.py @@ -0,0 +1,102 @@ +# Copyright (C) 2021-2022 Wind River Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from sqlalchemy.sql.elements import ColumnElement +from sqlalchemy import or_ + +from o2common.helper import o2logging +logger = o2logging.get_logger(__name__) + + +def gen_orm_filter(obj: ColumnElement, filter_str: str): + if not filter_str: + return [] + filter_without_space = filter_str.replace(" ", "") + items = filter_without_space.split(';') + + filter_list = list() + for i in items: + if '(' in i: + i = i.replace("(", "") + if ')' in i: + i = i.replace(")", "") + filter_expr = i.split(',') + if len(filter_expr) < 3: + continue + filter_op = filter_expr[0] + filter_key = filter_expr[1] + filter_vals = filter_expr[2:] + filter_list.extend(toFilterArgs( + filter_op, obj, filter_key, filter_vals)) + logger.info('Filter list length: %d' % len(filter_list)) + return filter_list + + +def toFilterArgs(operation: str, obj: ColumnElement, key: str, values: list): + if not hasattr(obj, key): + logger.warning('Filter attrName %s not in Object %s.' % + (key, str(obj))) + raise KeyError( + 'Filter attrName {} not in the Object'.format(key)) + + if operation in ['eq', 'neq', 'gt', 'lt', 'gte', 'lte']: + if len(values) != 1: + raise KeyError( + 'Filter operation one {} is only support one value.'. + format(operation)) + elif operation in ['in', 'nin', 'cont', 'ncont']: + if len(values) == 0: + raise KeyError('Filter operation {} value is needed.'. + format(operation)) + else: + raise KeyError('Filter operation {} not support.'.format(operation)) + + ll = list() + if operation == 'eq': + val = values[0] + if val.lower() == 'null': + val = None + ll.append(getattr(obj, key) == val) + elif operation == 'neq': + val = values[0] + if val.lower() == 'null': + val = None + ll.append(getattr(obj, key) != val) + elif operation == 'gt': + val = values[0] + ll.append(getattr(obj, key) > val) + elif operation == 'lt': + val = values[0] + ll.append(getattr(obj, key) < val) + elif operation == 'gte': + val = values[0] + ll.append(getattr(obj, key) >= val) + elif operation == 'lte': + val = values[0] + ll.append(getattr(obj, key) <= val) + elif operation == 'in': + ll.append(getattr(obj, key).in_(values)) + elif operation == 'nin': + ll.append(~getattr(obj, key).in_(values)) + elif operation == 'cont': + val_list = list() + for val in values: + val_list.append(getattr(obj, key).contains(val)) + ll.append(or_(*val_list)) + elif operation == 'ncont': + val_list = list() + for val in values: + val_list.append(getattr(obj, key).contains(val)) + ll.append(~or_(*val_list)) + return ll diff --git a/o2common/views/view.py b/o2common/views/view.py index 6d55b71..2a01005 100644 --- a/o2common/views/view.py +++ b/o2common/views/view.py @@ -12,22 +12,52 @@ # See the License for the specific language governing permissions and # limitations under the License. +import re from sqlalchemy.sql.elements import ColumnElement -from sqlalchemy import or_ from o2common.views.route_exception import BadRequestException +from o2common.domain.filter import gen_orm_filter from o2common.helper import o2logging logger = o2logging.get_logger(__name__) def gen_filter(obj: ColumnElement, filter_str: str): - if filter_str == '': - return [] + check_filter(obj, filter_str) + try: + filter_list = gen_orm_filter(obj, filter_str) + except KeyError as e: + raise BadRequestException(e.args[0]) + return filter_list + + +# The regular expressions testing example put on here +# (neq,testkey,value-1) +# (neq,testkey,value-1,value-2) +# (gt,hello,1) +# (gte,world,2) +# (lt,testlt,notint) +# (ncont,key1,v1,v_2) +# (gt,hello,1);(ncont,world,val1,val-2) +# (eq,wrong,60cba7be-e2cd-3b8c-a7ff-16e0f10573f9) +# (eq,description,value key) +def check_filter(obj: ColumnElement, filter_str: str): + if not filter_str: + return + pattern = r'^(\((eq|neq|gt|lt|gte|lte){1},\w+,[\w -]+\)\;?|' +\ + r'\((in|nin|cont|ncont){1},\w*(,[\w -]*)*\)\;?)+' + result = re.match(pattern, filter_str) + logger.warning('filter: {} match result is {}'.format(filter_str, result)) + if not result: + raise BadRequestException( + 'filter value formater not correct.') + check_filter_attribute(obj, filter_str) + + +def check_filter_attribute(obj: ColumnElement, filter_str: str): filter_without_space = filter_str.replace(" ", "") items = filter_without_space.split(';') - filter_list = list() for i in items: if '(' in i: i = i.replace("(", "") @@ -35,67 +65,12 @@ def gen_filter(obj: ColumnElement, filter_str: str): i = i.replace(")", "") filter_expr = i.split(',') if len(filter_expr) < 3: + raise BadRequestException( + 'Filter {} formater not correct.'.format(i)) continue - filter_op = filter_expr[0] + # filter_op = filter_expr[0] filter_key = filter_expr[1] - filter_vals = filter_expr[2:] - filter_list.extend(toFilterArgs( - filter_op, obj, filter_key, filter_vals)) - logger.info('Filter list length: %d' % len(filter_list)) - return filter_list - - -def toFilterArgs(operation: str, obj: ColumnElement, key: str, values: list): - if not hasattr(obj, key): - logger.warning('Filter attrName %s not in Object %s.' % - (key, str(obj))) - raise BadRequestException( - 'Filter attrName {} not in the Object'.format(key)) - - if operation in ['eq', 'neq', 'gt', 'lt', 'gte', 'lte']: - if len(values) != 1: - raise KeyError('Filter operation one is only support one value.') - elif operation in ['in', 'nin', 'cont', 'ncont']: - if len(values) == 0: - raise KeyError('Filter operation value is needed.') - else: - raise KeyError('Filter operation value not support.') - - ll = list() - if operation == 'eq': - val = values[0] - if val.lower() == 'null': - val = None - ll.append(getattr(obj, key) == val) - elif operation == 'neq': - val = values[0] - if val.lower() == 'null': - val = None - ll.append(getattr(obj, key) != val) - elif operation == 'gt': - val = values[0] - ll.append(getattr(obj, key) > val) - elif operation == 'lt': - val = values[0] - ll.append(getattr(obj, key) < val) - elif operation == 'gte': - val = values[0] - ll.append(getattr(obj, key) >= val) - elif operation == 'lte': - val = values[0] - ll.append(getattr(obj, key) <= val) - elif operation == 'in': - ll.append(getattr(obj, key).in_(values)) - elif operation == 'nin': - ll.append(~getattr(obj, key).in_(values)) - elif operation == 'cont': - val_list = list() - for val in values: - val_list.append(getattr(obj, key).contains(val)) - ll.append(or_(*val_list)) - elif operation == 'ncont': - val_list = list() - for val in values: - val_list.append(getattr(obj, key).contains(val)) - ll.append(~or_(*val_list)) - return ll + # filter_vals = filter_expr[2:] + if not hasattr(obj, filter_key): + raise BadRequestException( + 'Filter attrName {} not in the Object'.format(filter_key)) diff --git a/o2ims/service/command/notify_alarm_handler.py b/o2ims/service/command/notify_alarm_handler.py index b4e22a1..8ee4dd8 100644 --- a/o2ims/service/command/notify_alarm_handler.py +++ b/o2ims/service/command/notify_alarm_handler.py @@ -12,20 +12,24 @@ # See the License for the specific language governing permissions and # limitations under the License. -import json # import redis # import requests +import ssl +import json from urllib.parse import urlparse # from o2common.config import config +from o2common.domain.filter import gen_orm_filter from o2common.service.unit_of_work import AbstractUnitOfWork -from o2ims.domain import commands -from o2ims.domain.alarm_obj import AlarmSubscription, AlarmEvent2SMO -import ssl from o2common.service.command.handler import get_https_conn_default from o2common.service.command.handler import get_http_conn from o2common.service.command.handler import get_https_conn_selfsigned from o2common.service.command.handler import post_data + +from o2ims.domain import commands +from o2ims.domain.alarm_obj import AlarmSubscription, AlarmEvent2SMO, \ + AlarmEventRecord + from o2common.helper import o2logging logger = o2logging.get_logger(__name__) @@ -34,7 +38,7 @@ def notify_alarm_to_smo( cmd: commands.PubAlarm2SMO, uow: AbstractUnitOfWork, ): - logger.info('In notify_alarm_to_smo') + logger.debug('In notify_alarm_to_smo') data = cmd.data with uow: subs = uow.alarm_subscriptions.list() @@ -43,6 +47,30 @@ def notify_alarm_to_smo( logger.debug('Alarm Subscription: {}'.format( sub_data['alarmSubscriptionId'])) + alarm = uow.alarm_event_records.get(data.id) + if alarm is None: + logger.debug('Alarm Event {} does not exists.'.format(data.id)) + continue + if sub_data.get('filter', None): + try: + args = gen_orm_filter(AlarmEventRecord, sub_data['filter']) + except KeyError: + logger.warning( + 'Alarm Subscription {} filter {} has wrong attribute ' + 'name or value. Ignore the filter'.format( + sub_data['alarmSubscriptionId'], + sub_data['filter'])) + callback_smo(sub, data) + continue + args.append(AlarmEventRecord.alarmEventRecordId == data.id) + ret = uow.alarm_event_records.list_with_count(*args) + if ret[0] != 0: + logger.debug( + 'Alarm Event {} skip for subscription {} because of ' + 'the filter.' + .format(data.id, sub_data['alarmSubscriptionId'])) + continue + callback_smo(sub, data) diff --git a/o2ims/service/command/notify_handler.py b/o2ims/service/command/notify_handler.py index 8f134af..e7b41e1 100644 --- a/o2ims/service/command/notify_handler.py +++ b/o2ims/service/command/notify_handler.py @@ -12,20 +12,23 @@ # See the License for the specific language governing permissions and # limitations under the License. -import json # import redis # import requests +import json +import ssl from urllib.parse import urlparse # from o2common.config import config +from o2common.domain.filter import gen_orm_filter from o2common.service.unit_of_work import AbstractUnitOfWork -from o2ims.domain import commands -from o2ims.domain.subscription_obj import Subscription, Message2SMO from o2common.service.command.handler import get_https_conn_default from o2common.service.command.handler import get_http_conn from o2common.service.command.handler import get_https_conn_selfsigned from o2common.service.command.handler import post_data -import ssl + +from o2ims.domain import commands, ocloud +from o2ims.domain.subscription_obj import Subscription, Message2SMO + from o2common.helper import o2logging logger = o2logging.get_logger(__name__) @@ -38,28 +41,37 @@ def notify_change_to_smo( cmd: commands.PubMessage2SMO, uow: AbstractUnitOfWork, ): - logger.info('In notify_change_to_smo') + logger.debug('In notify_change_to_smo') data = cmd.data with uow: subs = uow.subscriptions.list() for sub in subs: sub_data = sub.serialize() logger.debug('Subscription: {}'.format(sub_data['subscriptionId'])) - - try: - resource_filter = json.loads(sub_data['filter']) - if len(resource_filter) > 0: - resource = uow.resources.get(data.id) - logger.debug(type(resource)) - if resource: # TODO deal with resource is empty - res_type_id = resource.serialize()['resourceTypeId'] - resourcetype = uow.resource_types.get(res_type_id) - logger.debug(resourcetype.name) - if resourcetype.name not in resource_filter: - continue - except json.decoder.JSONDecodeError as err: - logger.warning( - 'subscription filter decode json failed: {}'.format(err)) + resource = uow.resources.get(data.id) + if resource is None: + logger.debug('Resource {} does not exists.'.format(data.id)) + continue + res_pool_id = resource.serialize()['resourcePoolId'] + logger.debug('res pool id is {}'.format(res_pool_id)) + if sub_data.get('filter', None): + try: + args = gen_orm_filter(ocloud.Resource, sub_data['filter']) + except KeyError: + logger.error( + 'Subscription {} filter {} has wrong attribute name ' + 'or value. Ignore the filter.'.format( + sub_data['subscriptionId'], sub_data['filter'])) + callback_smo(sub, data) + continue + args.append(ocloud.Resource.resourceId == data.id) + ret = uow.resources.list_with_count(res_pool_id, *args) + if ret[0] != 0: + logger.debug( + 'Resource {} skip for subscription {} because of the ' + 'filter.' + .format(data.id, sub_data['subscriptionId'])) + continue callback_smo(sub, data) diff --git a/o2ims/views/alarm_route.py b/o2ims/views/alarm_route.py index 11fad03..45fa4ef 100644 --- a/o2ims/views/alarm_route.py +++ b/o2ims/views/alarm_route.py @@ -17,7 +17,8 @@ from flask_restx import Resource, reqparse from o2common.service.messagebus import MessageBus from o2common.views.pagination_route import link_header, PAGE_PARAM -from o2common.views.route_exception import NotFoundException +from o2common.views.route_exception import NotFoundException, \ + BadRequestException from o2ims.views import alarm_view from o2ims.views.api_ns import api_ims_monitoring as api_monitoring_v1 from o2ims.views.alarm_dto import AlarmDTO, SubscriptionDTO @@ -192,6 +193,9 @@ class SubscriptionsListRouter(Resource): mask='{alarmSubscriptionId,callback,consumerSubscriptionId,filter}') def post(self): data = api_monitoring_v1.payload + callback = data.get('callback', None) + if not callback: + raise BadRequestException('The callback parameter is required') result = alarm_view.subscription_create(data, bus.uow) return result, 201 diff --git a/o2ims/views/alarm_view.py b/o2ims/views/alarm_view.py index b3a9e4a..bb238be 100644 --- a/o2ims/views/alarm_view.py +++ b/o2ims/views/alarm_view.py @@ -16,7 +16,7 @@ import uuid as uuid from o2common.service import unit_of_work from o2common.views.pagination_view import Pagination -from o2common.views.view import gen_filter +from o2common.views.view import gen_filter, check_filter from o2ims.views.alarm_dto import SubscriptionDTO from o2ims.domain.alarm_obj import AlarmSubscription, AlarmEventRecord @@ -61,12 +61,15 @@ def subscription_one(subscriptionId: str, def subscription_create(subscriptionDto: SubscriptionDTO.subscription_create, uow: unit_of_work.AbstractUnitOfWork): + filter = subscriptionDto.get('filter', '') + consumer_subs_id = subscriptionDto.get('consumerSubscriptionId', '') + + check_filter(AlarmEventRecord, filter) sub_uuid = str(uuid.uuid4()) subscription = AlarmSubscription( sub_uuid, subscriptionDto['callback'], - subscriptionDto['consumerSubscriptionId'], - subscriptionDto['filter']) + consumer_subs_id, filter) with uow: uow.alarm_subscriptions.add(subscription) uow.commit() diff --git a/o2ims/views/ocloud_route.py b/o2ims/views/ocloud_route.py index af5e8c6..434a431 100644 --- a/o2ims/views/ocloud_route.py +++ b/o2ims/views/ocloud_route.py @@ -17,7 +17,8 @@ from flask_restx import Resource, reqparse from o2common.service.messagebus import MessageBus from o2common.views.pagination_route import link_header, PAGE_PARAM -from o2common.views.route_exception import NotFoundException +from o2common.views.route_exception import NotFoundException, \ + BadRequestException from o2ims.domain import ocloud from o2ims.views import ocloud_view from o2ims.views.api_ns import api_ims_inventory as api_ims_inventory_v1 @@ -517,6 +518,9 @@ class SubscriptionsListRouter(Resource): mask='{subscriptionId,callback,consumerSubscriptionId,filter}') def post(self): data = api_ims_inventory_v1.payload + callback = data.get('callback', None) + if not callback: + raise BadRequestException('The callback parameter is required') result = ocloud_view.subscription_create(data, bus.uow) return result, 201 diff --git a/o2ims/views/ocloud_view.py b/o2ims/views/ocloud_view.py index 953adc9..215c917 100644 --- a/o2ims/views/ocloud_view.py +++ b/o2ims/views/ocloud_view.py @@ -22,7 +22,7 @@ import shutil from o2common.service import unit_of_work from o2common.config import config from o2common.views.pagination_view import Pagination -from o2common.views.view import gen_filter +from o2common.views.view import gen_filter, check_filter from o2ims.domain import ocloud from o2ims.views.ocloud_dto import SubscriptionDTO from o2ims.domain.subscription_obj import Subscription @@ -220,12 +220,15 @@ def subscription_one(subscriptionId: str, def subscription_create(subscriptionDto: SubscriptionDTO.subscription_create, uow: unit_of_work.AbstractUnitOfWork): + filter = subscriptionDto.get('filter', '') + consumer_subs_id = subscriptionDto.get('consumerSubscriptionId', '') + + check_filter(ocloud.Resource, filter) sub_uuid = str(uuid.uuid4()) subscription = Subscription( sub_uuid, subscriptionDto['callback'], - subscriptionDto['consumerSubscriptionId'], - subscriptionDto['filter']) + consumer_subs_id, filter) with uow: uow.subscriptions.add(subscription) uow.commit() diff --git a/tests/unit/test_alarm.py b/tests/unit/test_alarm.py index 1cd48b8..e32ac8b 100644 --- a/tests/unit/test_alarm.py +++ b/tests/unit/test_alarm.py @@ -180,7 +180,7 @@ def test_flask_post(mock_flask_uow): resp = client.post(apibase+'/alarmSubscriptions', json={ 'callback': sub_callback, 'consumerSubscriptionId': 'consumerSubId1', - 'filter': 'empty' + 'filter': '' }) assert resp.status_code == 201 assert 'alarmSubscriptionId' in resp.get_json() diff --git a/tests/unit/test_ocloud.py b/tests/unit/test_ocloud.py index 3c0fde7..13a7d42 100644 --- a/tests/unit/test_ocloud.py +++ b/tests/unit/test_ocloud.py @@ -457,7 +457,7 @@ def test_flask_post(mock_flask_uow): resp = client.post(apibase+'/subscriptions', json={ 'callback': sub_callback, 'consumerSubscriptionId': 'consumerSubId1', - 'filter': 'empty' + 'filter': '' }) assert resp.status_code == 201 assert 'subscriptionId' in resp.get_json()