Fix INF-346 and INF-347 subscription filter 57/9557/3
authorZhang Rong(Jon) <rong.zhang@windriver.com>
Wed, 9 Nov 2022 12:39:02 +0000 (20:39 +0800)
committerZhang Rong(Jon) <rong.zhang@windriver.com>
Thu, 10 Nov 2022 02:04:26 +0000 (10:04 +0800)
Issue-ID: INF-346
Issue-ID: INF-347
Signed-off-by: Zhang Rong(Jon) <rong.zhang@windriver.com>
Change-Id: I0577d72b725884edf0ba69ccef1e8ba1174a3949

o2common/domain/filter.py [new file with mode: 0644]
o2common/views/view.py
o2ims/service/command/notify_alarm_handler.py
o2ims/service/command/notify_handler.py
o2ims/views/alarm_route.py
o2ims/views/alarm_view.py
o2ims/views/ocloud_route.py
o2ims/views/ocloud_view.py
tests/unit/test_alarm.py
tests/unit/test_ocloud.py

diff --git a/o2common/domain/filter.py b/o2common/domain/filter.py
new file mode 100644 (file)
index 0000000..80fe322
--- /dev/null
@@ -0,0 +1,102 @@
+# Copyright (C) 2021-2022 Wind River Systems, Inc.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+from sqlalchemy.sql.elements import ColumnElement
+from sqlalchemy import or_
+
+from o2common.helper import o2logging
+logger = o2logging.get_logger(__name__)
+
+
+def gen_orm_filter(obj: ColumnElement, filter_str: str):
+    if not filter_str:
+        return []
+    filter_without_space = filter_str.replace(" ", "")
+    items = filter_without_space.split(';')
+
+    filter_list = list()
+    for i in items:
+        if '(' in i:
+            i = i.replace("(", "")
+        if ')' in i:
+            i = i.replace(")", "")
+        filter_expr = i.split(',')
+        if len(filter_expr) < 3:
+            continue
+        filter_op = filter_expr[0]
+        filter_key = filter_expr[1]
+        filter_vals = filter_expr[2:]
+        filter_list.extend(toFilterArgs(
+            filter_op, obj, filter_key, filter_vals))
+    logger.info('Filter list length: %d' % len(filter_list))
+    return filter_list
+
+
+def toFilterArgs(operation: str, obj: ColumnElement, key: str, values: list):
+    if not hasattr(obj, key):
+        logger.warning('Filter attrName %s not in Object %s.' %
+                       (key, str(obj)))
+        raise KeyError(
+            'Filter attrName {} not in the Object'.format(key))
+
+    if operation in ['eq', 'neq', 'gt', 'lt', 'gte', 'lte']:
+        if len(values) != 1:
+            raise KeyError(
+                'Filter operation one {} is only support one value.'.
+                format(operation))
+    elif operation in ['in', 'nin', 'cont', 'ncont']:
+        if len(values) == 0:
+            raise KeyError('Filter operation {} value is needed.'.
+                           format(operation))
+    else:
+        raise KeyError('Filter operation {} not support.'.format(operation))
+
+    ll = list()
+    if operation == 'eq':
+        val = values[0]
+        if val.lower() == 'null':
+            val = None
+        ll.append(getattr(obj, key) == val)
+    elif operation == 'neq':
+        val = values[0]
+        if val.lower() == 'null':
+            val = None
+        ll.append(getattr(obj, key) != val)
+    elif operation == 'gt':
+        val = values[0]
+        ll.append(getattr(obj, key) > val)
+    elif operation == 'lt':
+        val = values[0]
+        ll.append(getattr(obj, key) < val)
+    elif operation == 'gte':
+        val = values[0]
+        ll.append(getattr(obj, key) >= val)
+    elif operation == 'lte':
+        val = values[0]
+        ll.append(getattr(obj, key) <= val)
+    elif operation == 'in':
+        ll.append(getattr(obj, key).in_(values))
+    elif operation == 'nin':
+        ll.append(~getattr(obj, key).in_(values))
+    elif operation == 'cont':
+        val_list = list()
+        for val in values:
+            val_list.append(getattr(obj, key).contains(val))
+        ll.append(or_(*val_list))
+    elif operation == 'ncont':
+        val_list = list()
+        for val in values:
+            val_list.append(getattr(obj, key).contains(val))
+        ll.append(~or_(*val_list))
+    return ll
index 6d55b71..2a01005 100644 (file)
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 
+import re
 from sqlalchemy.sql.elements import ColumnElement
-from sqlalchemy import or_
 
 from o2common.views.route_exception import BadRequestException
+from o2common.domain.filter import gen_orm_filter
 
 from o2common.helper import o2logging
 logger = o2logging.get_logger(__name__)
 
 
 def gen_filter(obj: ColumnElement, filter_str: str):
-    if filter_str == '':
-        return []
+    check_filter(obj, filter_str)
+    try:
+        filter_list = gen_orm_filter(obj, filter_str)
+    except KeyError as e:
+        raise BadRequestException(e.args[0])
+    return filter_list
+
+
+# The regular expressions testing example put on here
+# (neq,testkey,value-1)
+# (neq,testkey,value-1,value-2)
+# (gt,hello,1)
+# (gte,world,2)
+# (lt,testlt,notint)
+# (ncont,key1,v1,v_2)
+# (gt,hello,1);(ncont,world,val1,val-2)
+# (eq,wrong,60cba7be-e2cd-3b8c-a7ff-16e0f10573f9)
+# (eq,description,value key)
+def check_filter(obj: ColumnElement, filter_str: str):
+    if not filter_str:
+        return
+    pattern = r'^(\((eq|neq|gt|lt|gte|lte){1},\w+,[\w -]+\)\;?|' +\
+        r'\((in|nin|cont|ncont){1},\w*(,[\w -]*)*\)\;?)+'
+    result = re.match(pattern, filter_str)
+    logger.warning('filter: {} match result is {}'.format(filter_str, result))
+    if not result:
+        raise BadRequestException(
+            'filter value formater not correct.')
+    check_filter_attribute(obj, filter_str)
+
+
+def check_filter_attribute(obj: ColumnElement, filter_str: str):
     filter_without_space = filter_str.replace(" ", "")
     items = filter_without_space.split(';')
 
-    filter_list = list()
     for i in items:
         if '(' in i:
             i = i.replace("(", "")
@@ -35,67 +65,12 @@ def gen_filter(obj: ColumnElement, filter_str: str):
             i = i.replace(")", "")
         filter_expr = i.split(',')
         if len(filter_expr) < 3:
+            raise BadRequestException(
+                'Filter {} formater not correct.'.format(i))
             continue
-        filter_op = filter_expr[0]
+        filter_op = filter_expr[0]
         filter_key = filter_expr[1]
-        filter_vals = filter_expr[2:]
-        filter_list.extend(toFilterArgs(
-            filter_op, obj, filter_key, filter_vals))
-    logger.info('Filter list length: %d' % len(filter_list))
-    return filter_list
-
-
-def toFilterArgs(operation: str, obj: ColumnElement, key: str, values: list):
-    if not hasattr(obj, key):
-        logger.warning('Filter attrName %s not in Object %s.' %
-                       (key, str(obj)))
-        raise BadRequestException(
-            'Filter attrName {} not in the Object'.format(key))
-
-    if operation in ['eq', 'neq', 'gt', 'lt', 'gte', 'lte']:
-        if len(values) != 1:
-            raise KeyError('Filter operation one is only support one value.')
-    elif operation in ['in', 'nin', 'cont', 'ncont']:
-        if len(values) == 0:
-            raise KeyError('Filter operation value is needed.')
-    else:
-        raise KeyError('Filter operation value not support.')
-
-    ll = list()
-    if operation == 'eq':
-        val = values[0]
-        if val.lower() == 'null':
-            val = None
-        ll.append(getattr(obj, key) == val)
-    elif operation == 'neq':
-        val = values[0]
-        if val.lower() == 'null':
-            val = None
-        ll.append(getattr(obj, key) != val)
-    elif operation == 'gt':
-        val = values[0]
-        ll.append(getattr(obj, key) > val)
-    elif operation == 'lt':
-        val = values[0]
-        ll.append(getattr(obj, key) < val)
-    elif operation == 'gte':
-        val = values[0]
-        ll.append(getattr(obj, key) >= val)
-    elif operation == 'lte':
-        val = values[0]
-        ll.append(getattr(obj, key) <= val)
-    elif operation == 'in':
-        ll.append(getattr(obj, key).in_(values))
-    elif operation == 'nin':
-        ll.append(~getattr(obj, key).in_(values))
-    elif operation == 'cont':
-        val_list = list()
-        for val in values:
-            val_list.append(getattr(obj, key).contains(val))
-        ll.append(or_(*val_list))
-    elif operation == 'ncont':
-        val_list = list()
-        for val in values:
-            val_list.append(getattr(obj, key).contains(val))
-        ll.append(~or_(*val_list))
-    return ll
+        # filter_vals = filter_expr[2:]
+        if not hasattr(obj, filter_key):
+            raise BadRequestException(
+                'Filter attrName {} not in the Object'.format(filter_key))
index b4e22a1..8ee4dd8 100644 (file)
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 
-import json
 # import redis
 # import requests
+import ssl
+import json
 from urllib.parse import urlparse
 
 # from o2common.config import config
+from o2common.domain.filter import gen_orm_filter
 from o2common.service.unit_of_work import AbstractUnitOfWork
-from o2ims.domain import commands
-from o2ims.domain.alarm_obj import AlarmSubscription, AlarmEvent2SMO
-import ssl
 from o2common.service.command.handler import get_https_conn_default
 from o2common.service.command.handler import get_http_conn
 from o2common.service.command.handler import get_https_conn_selfsigned
 from o2common.service.command.handler import post_data
+
+from o2ims.domain import commands
+from o2ims.domain.alarm_obj import AlarmSubscription, AlarmEvent2SMO, \
+    AlarmEventRecord
+
 from o2common.helper import o2logging
 logger = o2logging.get_logger(__name__)
 
@@ -34,7 +38,7 @@ def notify_alarm_to_smo(
     cmd: commands.PubAlarm2SMO,
     uow: AbstractUnitOfWork,
 ):
-    logger.info('In notify_alarm_to_smo')
+    logger.debug('In notify_alarm_to_smo')
     data = cmd.data
     with uow:
         subs = uow.alarm_subscriptions.list()
@@ -43,6 +47,30 @@ def notify_alarm_to_smo(
             logger.debug('Alarm Subscription: {}'.format(
                 sub_data['alarmSubscriptionId']))
 
+            alarm = uow.alarm_event_records.get(data.id)
+            if alarm is None:
+                logger.debug('Alarm Event {} does not exists.'.format(data.id))
+                continue
+            if sub_data.get('filter', None):
+                try:
+                    args = gen_orm_filter(AlarmEventRecord, sub_data['filter'])
+                except KeyError:
+                    logger.warning(
+                        'Alarm Subscription {} filter {} has wrong attribute '
+                        'name or value. Ignore the filter'.format(
+                            sub_data['alarmSubscriptionId'],
+                            sub_data['filter']))
+                    callback_smo(sub, data)
+                    continue
+                args.append(AlarmEventRecord.alarmEventRecordId == data.id)
+                ret = uow.alarm_event_records.list_with_count(*args)
+                if ret[0] != 0:
+                    logger.debug(
+                        'Alarm Event {} skip for subscription {} because of '
+                        'the filter.'
+                        .format(data.id, sub_data['alarmSubscriptionId']))
+                    continue
+
             callback_smo(sub, data)
 
 
index 8f134af..e7b41e1 100644 (file)
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 
-import json
 # import redis
 # import requests
+import json
+import ssl
 from urllib.parse import urlparse
 
 # from o2common.config import config
+from o2common.domain.filter import gen_orm_filter
 from o2common.service.unit_of_work import AbstractUnitOfWork
-from o2ims.domain import commands
-from o2ims.domain.subscription_obj import Subscription, Message2SMO
 from o2common.service.command.handler import get_https_conn_default
 from o2common.service.command.handler import get_http_conn
 from o2common.service.command.handler import get_https_conn_selfsigned
 from o2common.service.command.handler import post_data
-import ssl
+
+from o2ims.domain import commands, ocloud
+from o2ims.domain.subscription_obj import Subscription, Message2SMO
+
 from o2common.helper import o2logging
 logger = o2logging.get_logger(__name__)
 
@@ -38,28 +41,37 @@ def notify_change_to_smo(
     cmd: commands.PubMessage2SMO,
     uow: AbstractUnitOfWork,
 ):
-    logger.info('In notify_change_to_smo')
+    logger.debug('In notify_change_to_smo')
     data = cmd.data
     with uow:
         subs = uow.subscriptions.list()
         for sub in subs:
             sub_data = sub.serialize()
             logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
-
-            try:
-                resource_filter = json.loads(sub_data['filter'])
-                if len(resource_filter) > 0:
-                    resource = uow.resources.get(data.id)
-                    logger.debug(type(resource))
-                    if resource:  # TODO deal with resource is empty
-                        res_type_id = resource.serialize()['resourceTypeId']
-                        resourcetype = uow.resource_types.get(res_type_id)
-                        logger.debug(resourcetype.name)
-                        if resourcetype.name not in resource_filter:
-                            continue
-            except json.decoder.JSONDecodeError as err:
-                logger.warning(
-                    'subscription filter decode json failed: {}'.format(err))
+            resource = uow.resources.get(data.id)
+            if resource is None:
+                logger.debug('Resource {} does not exists.'.format(data.id))
+                continue
+            res_pool_id = resource.serialize()['resourcePoolId']
+            logger.debug('res pool id is {}'.format(res_pool_id))
+            if sub_data.get('filter', None):
+                try:
+                    args = gen_orm_filter(ocloud.Resource, sub_data['filter'])
+                except KeyError:
+                    logger.error(
+                        'Subscription {} filter {} has wrong attribute name '
+                        'or value. Ignore the filter.'.format(
+                            sub_data['subscriptionId'], sub_data['filter']))
+                    callback_smo(sub, data)
+                    continue
+                args.append(ocloud.Resource.resourceId == data.id)
+                ret = uow.resources.list_with_count(res_pool_id, *args)
+                if ret[0] != 0:
+                    logger.debug(
+                        'Resource {} skip for subscription {} because of the '
+                        'filter.'
+                        .format(data.id, sub_data['subscriptionId']))
+                    continue
 
             callback_smo(sub, data)
 
index 11fad03..45fa4ef 100644 (file)
@@ -17,7 +17,8 @@ from flask_restx import Resource, reqparse
 
 from o2common.service.messagebus import MessageBus
 from o2common.views.pagination_route import link_header, PAGE_PARAM
-from o2common.views.route_exception import NotFoundException
+from o2common.views.route_exception import NotFoundException, \
+    BadRequestException
 from o2ims.views import alarm_view
 from o2ims.views.api_ns import api_ims_monitoring as api_monitoring_v1
 from o2ims.views.alarm_dto import AlarmDTO, SubscriptionDTO
@@ -192,6 +193,9 @@ class SubscriptionsListRouter(Resource):
         mask='{alarmSubscriptionId,callback,consumerSubscriptionId,filter}')
     def post(self):
         data = api_monitoring_v1.payload
+        callback = data.get('callback', None)
+        if not callback:
+            raise BadRequestException('The callback parameter is required')
         result = alarm_view.subscription_create(data, bus.uow)
         return result, 201
 
index b3a9e4a..bb238be 100644 (file)
@@ -16,7 +16,7 @@ import uuid as uuid
 
 from o2common.service import unit_of_work
 from o2common.views.pagination_view import Pagination
-from o2common.views.view import gen_filter
+from o2common.views.view import gen_filter, check_filter
 from o2ims.views.alarm_dto import SubscriptionDTO
 from o2ims.domain.alarm_obj import AlarmSubscription, AlarmEventRecord
 
@@ -61,12 +61,15 @@ def subscription_one(subscriptionId: str,
 
 def subscription_create(subscriptionDto: SubscriptionDTO.subscription_create,
                         uow: unit_of_work.AbstractUnitOfWork):
+    filter = subscriptionDto.get('filter', '')
+    consumer_subs_id = subscriptionDto.get('consumerSubscriptionId', '')
+
+    check_filter(AlarmEventRecord, filter)
 
     sub_uuid = str(uuid.uuid4())
     subscription = AlarmSubscription(
         sub_uuid, subscriptionDto['callback'],
-        subscriptionDto['consumerSubscriptionId'],
-        subscriptionDto['filter'])
+        consumer_subs_id, filter)
     with uow:
         uow.alarm_subscriptions.add(subscription)
         uow.commit()
index af5e8c6..434a431 100644 (file)
@@ -17,7 +17,8 @@ from flask_restx import Resource, reqparse
 
 from o2common.service.messagebus import MessageBus
 from o2common.views.pagination_route import link_header, PAGE_PARAM
-from o2common.views.route_exception import NotFoundException
+from o2common.views.route_exception import NotFoundException, \
+    BadRequestException
 from o2ims.domain import ocloud
 from o2ims.views import ocloud_view
 from o2ims.views.api_ns import api_ims_inventory as api_ims_inventory_v1
@@ -517,6 +518,9 @@ class SubscriptionsListRouter(Resource):
         mask='{subscriptionId,callback,consumerSubscriptionId,filter}')
     def post(self):
         data = api_ims_inventory_v1.payload
+        callback = data.get('callback', None)
+        if not callback:
+            raise BadRequestException('The callback parameter is required')
         result = ocloud_view.subscription_create(data, bus.uow)
         return result, 201
 
index 953adc9..215c917 100644 (file)
@@ -22,7 +22,7 @@ import shutil
 from o2common.service import unit_of_work
 from o2common.config import config
 from o2common.views.pagination_view import Pagination
-from o2common.views.view import gen_filter
+from o2common.views.view import gen_filter, check_filter
 from o2ims.domain import ocloud
 from o2ims.views.ocloud_dto import SubscriptionDTO
 from o2ims.domain.subscription_obj import Subscription
@@ -220,12 +220,15 @@ def subscription_one(subscriptionId: str,
 
 def subscription_create(subscriptionDto: SubscriptionDTO.subscription_create,
                         uow: unit_of_work.AbstractUnitOfWork):
+    filter = subscriptionDto.get('filter', '')
+    consumer_subs_id = subscriptionDto.get('consumerSubscriptionId', '')
+
+    check_filter(ocloud.Resource, filter)
 
     sub_uuid = str(uuid.uuid4())
     subscription = Subscription(
         sub_uuid, subscriptionDto['callback'],
-        subscriptionDto['consumerSubscriptionId'],
-        subscriptionDto['filter'])
+        consumer_subs_id, filter)
     with uow:
         uow.subscriptions.add(subscription)
         uow.commit()
index 1cd48b8..e32ac8b 100644 (file)
@@ -180,7 +180,7 @@ def test_flask_post(mock_flask_uow):
         resp = client.post(apibase+'/alarmSubscriptions', json={
             'callback': sub_callback,
             'consumerSubscriptionId': 'consumerSubId1',
-            'filter': 'empty'
+            'filter': ''
         })
         assert resp.status_code == 201
         assert 'alarmSubscriptionId' in resp.get_json()
index 3c0fde7..13a7d42 100644 (file)
@@ -457,7 +457,7 @@ def test_flask_post(mock_flask_uow):
         resp = client.post(apibase+'/subscriptions', json={
             'callback': sub_callback,
             'consumerSubscriptionId': 'consumerSubId1',
-            'filter': 'empty'
+            'filter': ''
         })
         assert resp.status_code == 201
         assert 'subscriptionId' in resp.get_json()