--- /dev/null
+# Copyright (C) 2021-2022 Wind River Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from sqlalchemy.sql.elements import ColumnElement
+from sqlalchemy import or_
+
+from o2common.helper import o2logging
+logger = o2logging.get_logger(__name__)
+
+
+def gen_orm_filter(obj: ColumnElement, filter_str: str):
+ if not filter_str:
+ return []
+ filter_without_space = filter_str.replace(" ", "")
+ items = filter_without_space.split(';')
+
+ filter_list = list()
+ for i in items:
+ if '(' in i:
+ i = i.replace("(", "")
+ if ')' in i:
+ i = i.replace(")", "")
+ filter_expr = i.split(',')
+ if len(filter_expr) < 3:
+ continue
+ filter_op = filter_expr[0]
+ filter_key = filter_expr[1]
+ filter_vals = filter_expr[2:]
+ filter_list.extend(toFilterArgs(
+ filter_op, obj, filter_key, filter_vals))
+ logger.info('Filter list length: %d' % len(filter_list))
+ return filter_list
+
+
+def toFilterArgs(operation: str, obj: ColumnElement, key: str, values: list):
+ if not hasattr(obj, key):
+ logger.warning('Filter attrName %s not in Object %s.' %
+ (key, str(obj)))
+ raise KeyError(
+ 'Filter attrName {} not in the Object'.format(key))
+
+ if operation in ['eq', 'neq', 'gt', 'lt', 'gte', 'lte']:
+ if len(values) != 1:
+ raise KeyError(
+ 'Filter operation one {} is only support one value.'.
+ format(operation))
+ elif operation in ['in', 'nin', 'cont', 'ncont']:
+ if len(values) == 0:
+ raise KeyError('Filter operation {} value is needed.'.
+ format(operation))
+ else:
+ raise KeyError('Filter operation {} not support.'.format(operation))
+
+ ll = list()
+ if operation == 'eq':
+ val = values[0]
+ if val.lower() == 'null':
+ val = None
+ ll.append(getattr(obj, key) == val)
+ elif operation == 'neq':
+ val = values[0]
+ if val.lower() == 'null':
+ val = None
+ ll.append(getattr(obj, key) != val)
+ elif operation == 'gt':
+ val = values[0]
+ ll.append(getattr(obj, key) > val)
+ elif operation == 'lt':
+ val = values[0]
+ ll.append(getattr(obj, key) < val)
+ elif operation == 'gte':
+ val = values[0]
+ ll.append(getattr(obj, key) >= val)
+ elif operation == 'lte':
+ val = values[0]
+ ll.append(getattr(obj, key) <= val)
+ elif operation == 'in':
+ ll.append(getattr(obj, key).in_(values))
+ elif operation == 'nin':
+ ll.append(~getattr(obj, key).in_(values))
+ elif operation == 'cont':
+ val_list = list()
+ for val in values:
+ val_list.append(getattr(obj, key).contains(val))
+ ll.append(or_(*val_list))
+ elif operation == 'ncont':
+ val_list = list()
+ for val in values:
+ val_list.append(getattr(obj, key).contains(val))
+ ll.append(~or_(*val_list))
+ return ll
# See the License for the specific language governing permissions and
# limitations under the License.
+import re
from sqlalchemy.sql.elements import ColumnElement
-from sqlalchemy import or_
from o2common.views.route_exception import BadRequestException
+from o2common.domain.filter import gen_orm_filter
from o2common.helper import o2logging
logger = o2logging.get_logger(__name__)
def gen_filter(obj: ColumnElement, filter_str: str):
- if filter_str == '':
- return []
+ check_filter(obj, filter_str)
+ try:
+ filter_list = gen_orm_filter(obj, filter_str)
+ except KeyError as e:
+ raise BadRequestException(e.args[0])
+ return filter_list
+
+
+# The regular expressions testing example put on here
+# (neq,testkey,value-1)
+# (neq,testkey,value-1,value-2)
+# (gt,hello,1)
+# (gte,world,2)
+# (lt,testlt,notint)
+# (ncont,key1,v1,v_2)
+# (gt,hello,1);(ncont,world,val1,val-2)
+# (eq,wrong,60cba7be-e2cd-3b8c-a7ff-16e0f10573f9)
+# (eq,description,value key)
+def check_filter(obj: ColumnElement, filter_str: str):
+ if not filter_str:
+ return
+ pattern = r'^(\((eq|neq|gt|lt|gte|lte){1},\w+,[\w -]+\)\;?|' +\
+ r'\((in|nin|cont|ncont){1},\w*(,[\w -]*)*\)\;?)+'
+ result = re.match(pattern, filter_str)
+ logger.warning('filter: {} match result is {}'.format(filter_str, result))
+ if not result:
+ raise BadRequestException(
+ 'filter value formater not correct.')
+ check_filter_attribute(obj, filter_str)
+
+
+def check_filter_attribute(obj: ColumnElement, filter_str: str):
filter_without_space = filter_str.replace(" ", "")
items = filter_without_space.split(';')
- filter_list = list()
for i in items:
if '(' in i:
i = i.replace("(", "")
i = i.replace(")", "")
filter_expr = i.split(',')
if len(filter_expr) < 3:
+ raise BadRequestException(
+ 'Filter {} formater not correct.'.format(i))
continue
- filter_op = filter_expr[0]
+ # filter_op = filter_expr[0]
filter_key = filter_expr[1]
- filter_vals = filter_expr[2:]
- filter_list.extend(toFilterArgs(
- filter_op, obj, filter_key, filter_vals))
- logger.info('Filter list length: %d' % len(filter_list))
- return filter_list
-
-
-def toFilterArgs(operation: str, obj: ColumnElement, key: str, values: list):
- if not hasattr(obj, key):
- logger.warning('Filter attrName %s not in Object %s.' %
- (key, str(obj)))
- raise BadRequestException(
- 'Filter attrName {} not in the Object'.format(key))
-
- if operation in ['eq', 'neq', 'gt', 'lt', 'gte', 'lte']:
- if len(values) != 1:
- raise KeyError('Filter operation one is only support one value.')
- elif operation in ['in', 'nin', 'cont', 'ncont']:
- if len(values) == 0:
- raise KeyError('Filter operation value is needed.')
- else:
- raise KeyError('Filter operation value not support.')
-
- ll = list()
- if operation == 'eq':
- val = values[0]
- if val.lower() == 'null':
- val = None
- ll.append(getattr(obj, key) == val)
- elif operation == 'neq':
- val = values[0]
- if val.lower() == 'null':
- val = None
- ll.append(getattr(obj, key) != val)
- elif operation == 'gt':
- val = values[0]
- ll.append(getattr(obj, key) > val)
- elif operation == 'lt':
- val = values[0]
- ll.append(getattr(obj, key) < val)
- elif operation == 'gte':
- val = values[0]
- ll.append(getattr(obj, key) >= val)
- elif operation == 'lte':
- val = values[0]
- ll.append(getattr(obj, key) <= val)
- elif operation == 'in':
- ll.append(getattr(obj, key).in_(values))
- elif operation == 'nin':
- ll.append(~getattr(obj, key).in_(values))
- elif operation == 'cont':
- val_list = list()
- for val in values:
- val_list.append(getattr(obj, key).contains(val))
- ll.append(or_(*val_list))
- elif operation == 'ncont':
- val_list = list()
- for val in values:
- val_list.append(getattr(obj, key).contains(val))
- ll.append(~or_(*val_list))
- return ll
+ # filter_vals = filter_expr[2:]
+ if not hasattr(obj, filter_key):
+ raise BadRequestException(
+ 'Filter attrName {} not in the Object'.format(filter_key))
# See the License for the specific language governing permissions and
# limitations under the License.
-import json
# import redis
# import requests
+import ssl
+import json
from urllib.parse import urlparse
# from o2common.config import config
+from o2common.domain.filter import gen_orm_filter
from o2common.service.unit_of_work import AbstractUnitOfWork
-from o2ims.domain import commands
-from o2ims.domain.alarm_obj import AlarmSubscription, AlarmEvent2SMO
-import ssl
from o2common.service.command.handler import get_https_conn_default
from o2common.service.command.handler import get_http_conn
from o2common.service.command.handler import get_https_conn_selfsigned
from o2common.service.command.handler import post_data
+
+from o2ims.domain import commands
+from o2ims.domain.alarm_obj import AlarmSubscription, AlarmEvent2SMO, \
+ AlarmEventRecord
+
from o2common.helper import o2logging
logger = o2logging.get_logger(__name__)
cmd: commands.PubAlarm2SMO,
uow: AbstractUnitOfWork,
):
- logger.info('In notify_alarm_to_smo')
+ logger.debug('In notify_alarm_to_smo')
data = cmd.data
with uow:
subs = uow.alarm_subscriptions.list()
logger.debug('Alarm Subscription: {}'.format(
sub_data['alarmSubscriptionId']))
+ alarm = uow.alarm_event_records.get(data.id)
+ if alarm is None:
+ logger.debug('Alarm Event {} does not exists.'.format(data.id))
+ continue
+ if sub_data.get('filter', None):
+ try:
+ args = gen_orm_filter(AlarmEventRecord, sub_data['filter'])
+ except KeyError:
+ logger.warning(
+ 'Alarm Subscription {} filter {} has wrong attribute '
+ 'name or value. Ignore the filter'.format(
+ sub_data['alarmSubscriptionId'],
+ sub_data['filter']))
+ callback_smo(sub, data)
+ continue
+ args.append(AlarmEventRecord.alarmEventRecordId == data.id)
+ ret = uow.alarm_event_records.list_with_count(*args)
+ if ret[0] != 0:
+ logger.debug(
+ 'Alarm Event {} skip for subscription {} because of '
+ 'the filter.'
+ .format(data.id, sub_data['alarmSubscriptionId']))
+ continue
+
callback_smo(sub, data)
# See the License for the specific language governing permissions and
# limitations under the License.
-import json
# import redis
# import requests
+import json
+import ssl
from urllib.parse import urlparse
# from o2common.config import config
+from o2common.domain.filter import gen_orm_filter
from o2common.service.unit_of_work import AbstractUnitOfWork
-from o2ims.domain import commands
-from o2ims.domain.subscription_obj import Subscription, Message2SMO
from o2common.service.command.handler import get_https_conn_default
from o2common.service.command.handler import get_http_conn
from o2common.service.command.handler import get_https_conn_selfsigned
from o2common.service.command.handler import post_data
-import ssl
+
+from o2ims.domain import commands, ocloud
+from o2ims.domain.subscription_obj import Subscription, Message2SMO
+
from o2common.helper import o2logging
logger = o2logging.get_logger(__name__)
cmd: commands.PubMessage2SMO,
uow: AbstractUnitOfWork,
):
- logger.info('In notify_change_to_smo')
+ logger.debug('In notify_change_to_smo')
data = cmd.data
with uow:
subs = uow.subscriptions.list()
for sub in subs:
sub_data = sub.serialize()
logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
-
- try:
- resource_filter = json.loads(sub_data['filter'])
- if len(resource_filter) > 0:
- resource = uow.resources.get(data.id)
- logger.debug(type(resource))
- if resource: # TODO deal with resource is empty
- res_type_id = resource.serialize()['resourceTypeId']
- resourcetype = uow.resource_types.get(res_type_id)
- logger.debug(resourcetype.name)
- if resourcetype.name not in resource_filter:
- continue
- except json.decoder.JSONDecodeError as err:
- logger.warning(
- 'subscription filter decode json failed: {}'.format(err))
+ resource = uow.resources.get(data.id)
+ if resource is None:
+ logger.debug('Resource {} does not exists.'.format(data.id))
+ continue
+ res_pool_id = resource.serialize()['resourcePoolId']
+ logger.debug('res pool id is {}'.format(res_pool_id))
+ if sub_data.get('filter', None):
+ try:
+ args = gen_orm_filter(ocloud.Resource, sub_data['filter'])
+ except KeyError:
+ logger.error(
+ 'Subscription {} filter {} has wrong attribute name '
+ 'or value. Ignore the filter.'.format(
+ sub_data['subscriptionId'], sub_data['filter']))
+ callback_smo(sub, data)
+ continue
+ args.append(ocloud.Resource.resourceId == data.id)
+ ret = uow.resources.list_with_count(res_pool_id, *args)
+ if ret[0] != 0:
+ logger.debug(
+ 'Resource {} skip for subscription {} because of the '
+ 'filter.'
+ .format(data.id, sub_data['subscriptionId']))
+ continue
callback_smo(sub, data)
from o2common.service.messagebus import MessageBus
from o2common.views.pagination_route import link_header, PAGE_PARAM
-from o2common.views.route_exception import NotFoundException
+from o2common.views.route_exception import NotFoundException, \
+ BadRequestException
from o2ims.views import alarm_view
from o2ims.views.api_ns import api_ims_monitoring as api_monitoring_v1
from o2ims.views.alarm_dto import AlarmDTO, SubscriptionDTO
mask='{alarmSubscriptionId,callback,consumerSubscriptionId,filter}')
def post(self):
data = api_monitoring_v1.payload
+ callback = data.get('callback', None)
+ if not callback:
+ raise BadRequestException('The callback parameter is required')
result = alarm_view.subscription_create(data, bus.uow)
return result, 201
from o2common.service import unit_of_work
from o2common.views.pagination_view import Pagination
-from o2common.views.view import gen_filter
+from o2common.views.view import gen_filter, check_filter
from o2ims.views.alarm_dto import SubscriptionDTO
from o2ims.domain.alarm_obj import AlarmSubscription, AlarmEventRecord
def subscription_create(subscriptionDto: SubscriptionDTO.subscription_create,
uow: unit_of_work.AbstractUnitOfWork):
+ filter = subscriptionDto.get('filter', '')
+ consumer_subs_id = subscriptionDto.get('consumerSubscriptionId', '')
+
+ check_filter(AlarmEventRecord, filter)
sub_uuid = str(uuid.uuid4())
subscription = AlarmSubscription(
sub_uuid, subscriptionDto['callback'],
- subscriptionDto['consumerSubscriptionId'],
- subscriptionDto['filter'])
+ consumer_subs_id, filter)
with uow:
uow.alarm_subscriptions.add(subscription)
uow.commit()
from o2common.service.messagebus import MessageBus
from o2common.views.pagination_route import link_header, PAGE_PARAM
-from o2common.views.route_exception import NotFoundException
+from o2common.views.route_exception import NotFoundException, \
+ BadRequestException
from o2ims.domain import ocloud
from o2ims.views import ocloud_view
from o2ims.views.api_ns import api_ims_inventory as api_ims_inventory_v1
mask='{subscriptionId,callback,consumerSubscriptionId,filter}')
def post(self):
data = api_ims_inventory_v1.payload
+ callback = data.get('callback', None)
+ if not callback:
+ raise BadRequestException('The callback parameter is required')
result = ocloud_view.subscription_create(data, bus.uow)
return result, 201
from o2common.service import unit_of_work
from o2common.config import config
from o2common.views.pagination_view import Pagination
-from o2common.views.view import gen_filter
+from o2common.views.view import gen_filter, check_filter
from o2ims.domain import ocloud
from o2ims.views.ocloud_dto import SubscriptionDTO
from o2ims.domain.subscription_obj import Subscription
def subscription_create(subscriptionDto: SubscriptionDTO.subscription_create,
uow: unit_of_work.AbstractUnitOfWork):
+ filter = subscriptionDto.get('filter', '')
+ consumer_subs_id = subscriptionDto.get('consumerSubscriptionId', '')
+
+ check_filter(ocloud.Resource, filter)
sub_uuid = str(uuid.uuid4())
subscription = Subscription(
sub_uuid, subscriptionDto['callback'],
- subscriptionDto['consumerSubscriptionId'],
- subscriptionDto['filter'])
+ consumer_subs_id, filter)
with uow:
uow.subscriptions.add(subscription)
uow.commit()
resp = client.post(apibase+'/alarmSubscriptions', json={
'callback': sub_callback,
'consumerSubscriptionId': 'consumerSubId1',
- 'filter': 'empty'
+ 'filter': ''
})
assert resp.status_code == 201
assert 'alarmSubscriptionId' in resp.get_json()
resp = client.post(apibase+'/subscriptions', json={
'callback': sub_callback,
'consumerSubscriptionId': 'consumerSubId1',
- 'filter': 'empty'
+ 'filter': ''
})
assert resp.status_code == 201
assert 'subscriptionId' in resp.get_json()