From 9cda6dd9bfcf3de766d7be0c42a7ec671f5ac308 Mon Sep 17 00:00:00 2001 From: "Zhang Rong(Jon)" Date: Mon, 28 Nov 2022 22:49:42 +0800 Subject: [PATCH] Fix INF-378 inventory subscription filter upgrade Issue-ID: INF-378 Signed-off-by: Zhang Rong(Jon) Change-Id: I66bdc2ee8c72ba5ce2552acef412e71554409d1a --- o2common/domain/filter.py | 13 +- o2common/views/view.py | 16 +- o2ims/service/auditor/pserver_acc_handler.py | 4 + o2ims/service/auditor/pserver_cpu_handler.py | 4 + o2ims/service/auditor/pserver_dev_handler.py | 4 + o2ims/service/auditor/pserver_eth_handler.py | 4 + o2ims/service/auditor/pserver_if_handler.py | 4 + o2ims/service/auditor/pserver_mem_handler.py | 4 + o2ims/service/auditor/pserver_port_handler.py | 4 + o2ims/service/command/notify_handler.py | 262 ++++++++++++++++---------- o2ims/views/ocloud_view.py | 58 +++++- 11 files changed, 258 insertions(+), 119 deletions(-) diff --git a/o2common/domain/filter.py b/o2common/domain/filter.py index a84fdb8..adc7df3 100644 --- a/o2common/domain/filter.py +++ b/o2common/domain/filter.py @@ -23,23 +23,22 @@ logger = o2logging.get_logger(__name__) def gen_orm_filter(obj: ColumnElement, filter_str: str): + logger.debug(filter_str) if not filter_str: return [] - # filter_without_space = filter_str.replace(" ", "") - filter_without_space = filter_str.strip(' ()') + filter_without_space = filter_str.strip() items = filter_without_space.split(';') filter_list = list() for i in items: - # if '(' in i: - # i = i.replace("(", "") - # if ')' in i: - # i = i.replace(")", "") - filter_expr = i.split(',') + item = i.strip(' ()') + filter_expr = item.split(',') if len(filter_expr) < 3: continue filter_op = filter_expr[0].strip() filter_key = filter_expr[1].strip() + if filter_key == 'objectType': + continue filter_vals = filter_expr[2:] filter_list.extend(toFilterArgs( filter_op, obj, filter_key, filter_vals)) diff --git a/o2common/views/view.py b/o2common/views/view.py index af02e78..57294e2 100644 --- a/o2common/views/view.py +++ b/o2common/views/view.py @@ -56,24 +56,22 @@ def check_filter(obj: ColumnElement, filter_str: str): def check_filter_attribute(obj: ColumnElement, filter_str: str): - # filter_without_space = filter_str.replace(" ", "") - filter_without_space = filter_str.strip(' ()') + filter_without_space = filter_str.strip() logger.debug( f"filter_str: {filter_str}, stripped: {filter_without_space}") items = filter_without_space.split(';') for i in items: - # if '(' in i: - # i = i.replace("(", "") - # if ')' in i: - # i = i.replace(")", "") - filter_expr = i.split(',') + item = i.strip(' ()') + filter_expr = item.split(',') if len(filter_expr) < 3: raise BadRequestException( - 'ignore invalid filter {}'.format(i)) - continue + 'invalid filter {}'.format(i)) filter_op = filter_expr[0].strip() filter_key = filter_expr[1].strip() + if filter_key == 'objectType': + logger.debug('ignore objectType while checking formatter') + continue filter_vals = filter_expr[2:] if filter_op in ["eq", "neq", "gt", "lt", "gte", "lte"]: if len(filter_vals) != 1: diff --git a/o2ims/service/auditor/pserver_acc_handler.py b/o2ims/service/auditor/pserver_acc_handler.py index 5c9771b..da1516e 100644 --- a/o2ims/service/auditor/pserver_acc_handler.py +++ b/o2ims/service/auditor/pserver_acc_handler.py @@ -64,6 +64,10 @@ def update_pserver_acc( alarm_dictionary = uow.alarm_dictionaries.get(dict_id) if alarm_dictionary: res_type.alarmDictionary = alarm_dictionary + res_type.events.append(events.ResourceTypeChanged( + id=res_type.resourceTypeId, + notificationEventType=NotificationEventEnum.CREATE, + updatetime=stxobj.updatetime)) uow.resource_types.add(res_type) else: resourcetype_id = first['resourceTypeId'] diff --git a/o2ims/service/auditor/pserver_cpu_handler.py b/o2ims/service/auditor/pserver_cpu_handler.py index 5ad64ca..ac2b50f 100644 --- a/o2ims/service/auditor/pserver_cpu_handler.py +++ b/o2ims/service/auditor/pserver_cpu_handler.py @@ -64,6 +64,10 @@ def update_pserver_cpu( alarm_dictionary = uow.alarm_dictionaries.get(dict_id) if alarm_dictionary: res_type.alarmDictionary = alarm_dictionary + res_type.events.append(events.ResourceTypeChanged( + id=res_type.resourceTypeId, + notificationEventType=NotificationEventEnum.CREATE, + updatetime=stxobj.updatetime)) uow.resource_types.add(res_type) else: resourcetype_id = first['resourceTypeId'] diff --git a/o2ims/service/auditor/pserver_dev_handler.py b/o2ims/service/auditor/pserver_dev_handler.py index 5b8ede8..0efe123 100644 --- a/o2ims/service/auditor/pserver_dev_handler.py +++ b/o2ims/service/auditor/pserver_dev_handler.py @@ -64,6 +64,10 @@ def update_pserver_dev( alarm_dictionary = uow.alarm_dictionaries.get(dict_id) if alarm_dictionary: res_type.alarmDictionary = alarm_dictionary + res_type.events.append(events.ResourceTypeChanged( + id=res_type.resourceTypeId, + notificationEventType=NotificationEventEnum.CREATE, + updatetime=stxobj.updatetime)) uow.resource_types.add(res_type) else: resourcetype_id = first['resourceTypeId'] diff --git a/o2ims/service/auditor/pserver_eth_handler.py b/o2ims/service/auditor/pserver_eth_handler.py index 6d3a5ff..4e9872c 100644 --- a/o2ims/service/auditor/pserver_eth_handler.py +++ b/o2ims/service/auditor/pserver_eth_handler.py @@ -64,6 +64,10 @@ def update_pserver_eth( alarm_dictionary = uow.alarm_dictionaries.get(dict_id) if alarm_dictionary: res_type.alarmDictionary = alarm_dictionary + res_type.events.append(events.ResourceTypeChanged( + id=res_type.resourceTypeId, + notificationEventType=NotificationEventEnum.CREATE, + updatetime=stxobj.updatetime)) uow.resource_types.add(res_type) else: resourcetype_id = first['resourceTypeId'] diff --git a/o2ims/service/auditor/pserver_if_handler.py b/o2ims/service/auditor/pserver_if_handler.py index fa0b9d4..022f717 100644 --- a/o2ims/service/auditor/pserver_if_handler.py +++ b/o2ims/service/auditor/pserver_if_handler.py @@ -64,6 +64,10 @@ def update_pserver_if( alarm_dictionary = uow.alarm_dictionaries.get(dict_id) if alarm_dictionary: res_type.alarmDictionary = alarm_dictionary + res_type.events.append(events.ResourceTypeChanged( + id=res_type.resourceTypeId, + notificationEventType=NotificationEventEnum.CREATE, + updatetime=stxobj.updatetime)) uow.resource_types.add(res_type) else: resourcetype_id = first['resourceTypeId'] diff --git a/o2ims/service/auditor/pserver_mem_handler.py b/o2ims/service/auditor/pserver_mem_handler.py index a6fb500..21af49d 100644 --- a/o2ims/service/auditor/pserver_mem_handler.py +++ b/o2ims/service/auditor/pserver_mem_handler.py @@ -64,6 +64,10 @@ def update_pserver_mem( alarm_dictionary = uow.alarm_dictionaries.get(dict_id) if alarm_dictionary: res_type.alarmDictionary = alarm_dictionary + res_type.events.append(events.ResourceTypeChanged( + id=res_type.resourceTypeId, + notificationEventType=NotificationEventEnum.CREATE, + updatetime=stxobj.updatetime)) uow.resource_types.add(res_type) else: resourcetype_id = first['resourceTypeId'] diff --git a/o2ims/service/auditor/pserver_port_handler.py b/o2ims/service/auditor/pserver_port_handler.py index 30c0ddd..1faa08a 100644 --- a/o2ims/service/auditor/pserver_port_handler.py +++ b/o2ims/service/auditor/pserver_port_handler.py @@ -64,6 +64,10 @@ def update_pserver_port( alarm_dictionary = uow.alarm_dictionaries.get(dict_id) if alarm_dictionary: res_type.alarmDictionary = alarm_dictionary + res_type.events.append(events.ResourceTypeChanged( + id=res_type.resourceTypeId, + notificationEventType=NotificationEventEnum.CREATE, + updatetime=stxobj.updatetime)) uow.resource_types.add(res_type) else: resourcetype_id = first['resourceTypeId'] diff --git a/o2ims/service/command/notify_handler.py b/o2ims/service/command/notify_handler.py index 08da45d..06dc824 100644 --- a/o2ims/service/command/notify_handler.py +++ b/o2ims/service/command/notify_handler.py @@ -45,18 +45,16 @@ def notify_change_to_smo( logger.debug('In notify_change_to_smo') msg_type = cmd.type if msg_type == 'ResourceType': - _notify_resourcetype(uow, cmd) + _notify_resourcetype(uow, cmd.data) elif msg_type == 'ResourcePool': - _notify_resourcepool(uow, cmd) + _notify_resourcepool(uow, cmd.data) elif msg_type == 'Dms': - _notify_dms(uow, cmd) + _notify_dms(uow, cmd.data) elif msg_type == 'Resource': - _notify_resource(uow, cmd) + _notify_resource(uow, cmd.data) -def _notify_resourcetype(uow, cmd): - data = cmd.data - msg_type = cmd.type +def _notify_resourcetype(uow, data): with uow: resource_type = uow.resource_types.get(data.id) if resource_type is None: @@ -76,32 +74,36 @@ def _notify_resourcetype(uow, cmd): for sub in subs: sub_data = sub.serialize() logger.debug('Subscription: {}'.format(sub_data['subscriptionId'])) - if not sub_data.get('filter', None): - callback_smo(sub, data, msg_type, resource_type_dict) - continue - try: - args = gen_orm_filter(ocloud.ResourceType, sub_data['filter']) - except KeyError: - logger.warning( - 'Subscription {} filter {} has wrong attribute name ' - 'or value. Ignore the filter.'.format( - sub_data['subscriptionId'], sub_data['filter'])) - callback_smo(sub, data, msg_type, resource_type_dict) + filters = handle_filter(sub_data['filter'], 'ResourceTypeInfo') + if not filters: + callback_smo(sub, data, resource_type_dict) continue - args.append(ocloud.ResourceType.resourceTypeId == data.id) - ret = uow.resource_types.list_with_count(*args) - if ret[0] != 0: - logger.debug( - 'ResourceType {} skip for subscription {} because of the ' - 'filter.' - .format(data.id, sub_data['subscriptionId'])) + filter_effect = 0 + for filter in filters: + try: + args = gen_orm_filter(ocloud.ResourceType, filter) + except KeyError: + logger.warning( + 'Subscription {} filter {} has wrong attribute ' + 'name or value. Ignore the filter.'.format( + sub_data['subscriptionId'], + sub_data['filter'])) + continue + args.append(ocloud.ResourceType.resourceTypeId == data.id) + ret = uow.resource_types.list_with_count(*args) + if ret[0] > 0: + logger.debug( + 'ResourcePool {} skip for subscription {} because of' + ' the filter.' + .format(data.id, sub_data['subscriptionId'])) + filter_effect += 1 + break + if filter_effect > 0: continue - callback_smo(sub, data, msg_type, resource_type_dict) + callback_smo(sub, data, resource_type_dict) -def _notify_resourcepool(uow, cmd): - data = cmd.data - msg_type = cmd.type +def _notify_resourcepool(uow, data): with uow: resource_pool = uow.resource_pools.get(data.id) if resource_pool is None: @@ -119,32 +121,36 @@ def _notify_resourcepool(uow, cmd): for sub in subs: sub_data = sub.serialize() logger.debug('Subscription: {}'.format(sub_data['subscriptionId'])) - if not sub_data.get('filter', None): - callback_smo(sub, data, msg_type, resource_pool_dict) + filters = handle_filter(sub_data['filter'], 'ResourcePoolInfo') + if not filters: + callback_smo(sub, data, resource_pool_dict) continue - try: - args = gen_orm_filter(ocloud.Resource, sub_data['filter']) - except KeyError: - logger.warning( - 'Subscription {} filter {} has wrong attribute name ' - 'or value. Ignore the filter.'.format( - sub_data['subscriptionId'], sub_data['filter'])) - callback_smo(sub, data, msg_type, resource_pool_dict) + filter_effect = 0 + for filter in filters: + try: + args = gen_orm_filter(ocloud.ResourcePool, filter) + except KeyError: + logger.warning( + 'Subscription {} filter {} has wrong attribute ' + 'name or value. Ignore the filter.'.format( + sub_data['subscriptionId'], + sub_data['filter'])) + continue + args.append(ocloud.ResourcePool.resourcePoolId == data.id) + ret = uow.resource_pools.list_with_count(*args) + if ret[0] > 0: + logger.debug( + 'ResourcePool {} skip for subscription {} because of' + ' the filter.' + .format(data.id, sub_data['subscriptionId'])) + filter_effect += 1 + break + if filter_effect > 0: continue - args.append(ocloud.ResourcePool.resourcePoolId == data.id) - ret = uow.resource_pools.list_with_count(*args) - if ret[0] != 0: - logger.debug( - 'ResourcePool {} skip for subscription {} because of the ' - 'filter.' - .format(data.id, sub_data['subscriptionId'])) - continue - callback_smo(sub, data, msg_type, resource_pool_dict) + callback_smo(sub, data, resource_pool_dict) -def _notify_dms(uow, cmd): - data = cmd.data - msg_type = cmd.type +def _notify_dms(uow, data): with uow: dms = uow.deployment_managers.get(data.id) if dms is None: @@ -163,34 +169,46 @@ def _notify_dms(uow, cmd): for sub in subs: sub_data = sub.serialize() logger.debug('Subscription: {}'.format(sub_data['subscriptionId'])) - if not sub_data.get('filter', None): - callback_smo(sub, data, msg_type, dms_dict) - continue - try: - args = gen_orm_filter(ocloud.Resource, sub_data['filter']) - except KeyError: - logger.warning( - 'Subscription {} filter {} has wrong attribute name ' - 'or value. Ignore the filter.'.format( - sub_data['subscriptionId'], sub_data['filter'])) - callback_smo(sub, data, msg_type, dms_dict) + filters = handle_filter( + sub_data['filter'], 'DeploymentManagerInfo') + if not filters: + callback_smo(sub, data, dms_dict) continue - args.append( - ocloud.DeploymentManager.deploymentManagerId == data.id) - ret = uow.deployment_managers.list_with_count(*args) - if ret[0] != 0: - logger.debug( - 'DeploymentManager {} skip for subscription {} because of ' - 'the filter.' - .format(data.id, sub_data['subscriptionId'])) + filter_effect = 0 + for filter in filters: + try: + args = gen_orm_filter(ocloud.DeploymentManager, filter) + except KeyError: + logger.warning( + 'Subscription {} filter {} has wrong attribute ' + 'name or value. Ignore the filter.'.format( + sub_data['subscriptionId'], + sub_data['filter'])) + continue + args.append( + ocloud.DeploymentManager.deploymentManagerId == data.id) + ret = uow.deployment_managers.list_with_count(*args) + if ret[0] > 0: + logger.debug( + 'DeploymentManager {} skip for subscription {} because' + ' of the filter.' + .format(data.id, sub_data['subscriptionId'])) + filter_effect += 1 + break + if filter_effect > 0: continue - callback_smo(sub, data) - callback_smo(sub, data, msg_type, dms_dict) + callback_smo(sub, data, dms_dict) -def _notify_resource(uow, cmd): - data = cmd.data - msg_type = cmd.type +class FilterNotEffect(Exception): + pass + + +class FilterEffect(Exception): + pass + + +def _notify_resource(uow, data): with uow: resource = uow.resources.get(data.id) if resource is None: @@ -198,36 +216,75 @@ def _notify_resource(uow, cmd): return res_pool_id = resource.serialize()['resourcePoolId'] logger.debug('res pool id is {}'.format(res_pool_id)) + res_dict = { + 'resourceId': resource.resourceId, + 'description': resource.description, + 'resourceTypeId': resource.resourceTypeId, + 'resourcePoolId': resource.resourcePoolId, + 'globalAssetId': resource.globalAssetId + } subs = uow.subscriptions.list() for sub in subs: sub_data = sub.serialize() logger.debug('Subscription: {}'.format(sub_data['subscriptionId'])) - if not sub_data.get('filter', None): - callback_smo(sub, data, msg_type) + filters = handle_filter(sub_data['filter'], 'ResourceInfo') + if not filters: + callback_smo(sub, data, res_dict) continue - try: - args = gen_orm_filter(ocloud.Resource, sub_data['filter']) - except KeyError: - logger.warning( - 'Subscription {} filter {} has wrong attribute name ' - 'or value. Ignore the filter.'.format( - sub_data['subscriptionId'], sub_data['filter'])) - callback_smo(sub, data, msg_type) + filter_effect = 0 + for filter in filters: + try: + args = gen_orm_filter(ocloud.Resource, filter) + except KeyError: + logger.warning( + 'Subscription {} filter {} has wrong attribute ' + 'name or value. Ignore the filter.'.format( + sub_data['subscriptionId'], + sub_data['filter'])) + continue + args.append(ocloud.Resource.resourceId == data.id) + ret = uow.resources.list_with_count(res_pool_id, *args) + if ret[0] > 0: + logger.debug( + 'Resource {} skip for subscription {} because of ' + 'the filter.' + .format(data.id, sub_data['subscriptionId'])) + filter_effect += 1 + break + if filter_effect > 0: continue - args.append(ocloud.Resource.resourceId == data.id) - ret = uow.resources.list_with_count(res_pool_id, *args) - if ret[0] != 0: - logger.debug( - 'Resource {} skip for subscription {} because of the ' - 'filter.' - .format(data.id, sub_data['subscriptionId'])) + callback_smo(sub, data, res_dict) + + +def handle_filter(filter: str, f_type: str): + if not filter: + return + filter_strip = filter.strip(' []') + filter_list = filter_strip.split('|') + filters = list() + for sub_filter in filter_list: + exprs = sub_filter.split(';') + objectType = False + objectTypeValue = '' + for expr in exprs: + expr_strip = expr.strip(' ()') + items = expr_strip.split(',') + item_key = items[1].strip() + if item_key != 'objectType': continue - callback_smo(sub, data, msg_type) + objectType = True + objectTypeValue = items[2].strip() + if not objectType: + if f_type == 'ResourceInfo': + filters.append(sub_filter) + continue + if objectTypeValue == f_type: + filters.append(sub_filter) + return filters -def callback_smo(sub: Subscription, msg: Message2SMO, msg_type: str, - obj_dict: dict = None): +def callback_smo(sub: Subscription, msg: Message2SMO, obj_dict: dict = None): sub_data = sub.serialize() callback = { 'consumerSubscriptionId': sub_data['consumerSubscriptionId'], @@ -235,13 +292,14 @@ def callback_smo(sub: Subscription, msg: Message2SMO, msg_type: str, 'objectRef': msg.objectRef, 'updateTime': msg.updatetime } - if msg_type != 'Resource': - if msg.notificationEventType in [NotificationEventEnum.DELETE, - NotificationEventEnum.MODIFY]: - callback['priorObjectState'] = obj_dict - if msg.notificationEventType in [NotificationEventEnum.CREATE, - NotificationEventEnum.MODIFY]: - callback['postObjectState'] = obj_dict + if msg.notificationEventType in [NotificationEventEnum.DELETE, + NotificationEventEnum.MODIFY]: + callback['priorObjectState'] = obj_dict + if msg.notificationEventType in [NotificationEventEnum.CREATE, + NotificationEventEnum.MODIFY]: + callback['postObjectState'] = obj_dict + if msg.notificationEventType == NotificationEventEnum.DELETE: + callback.pop('objectRef') callback_data = json.dumps(callback) logger.info('URL: {}, data: {}'.format( sub_data['callback'], callback_data)) diff --git a/o2ims/views/ocloud_view.py b/o2ims/views/ocloud_view.py index 5216125..d062ada 100644 --- a/o2ims/views/ocloud_view.py +++ b/o2ims/views/ocloud_view.py @@ -241,7 +241,7 @@ def subscription_create(subscriptionDto: SubscriptionDTO.subscription_create, filter = subscriptionDto.get('filter', '') consumer_subs_id = subscriptionDto.get('consumerSubscriptionId', '') - check_filter(ocloud.Resource, filter) + _check_subscription_filter(filter) sub_uuid = str(uuid.uuid4()) subscription = Subscription( @@ -263,6 +263,62 @@ def subscription_create(subscriptionDto: SubscriptionDTO.subscription_create, return first.serialize() +def _check_subscription_filter(filter: str): + if not filter: + return + + def _sub_filter_checking(sub_filter: str): + exprs = sub_filter.split(';') + objectType = False + objectTypeValue = '' + for expr in exprs: + expr_strip = expr.strip(' ()') + items = expr_strip.split(',') + if len(items) < 3: + raise BadRequestException("invalid filter {}".format(expr)) + item_key = items[1].strip() + if item_key != 'objectType': + continue + item_op = items[0].strip() + if item_op != 'eq': + raise BadRequestException( + "Filter objectType only support 'eq' operation") + objectType = True + objectTypeValue = items[2].strip() + if not objectType: + # if there has no objectType specific, by default is ResourceInfo + check_filter(ocloud.Resource, sub_filter) + # return 'ResourceInfo' + return + if objectTypeValue == 'ResourceTypeInfo': + check_filter(ocloud.ResourceType, sub_filter) + elif objectTypeValue == 'ResourcePoolInfo': + check_filter(ocloud.ResourcePool, sub_filter) + elif objectTypeValue == 'DeploymentManagerInfo': + check_filter(ocloud.DeploymentManager, sub_filter) + elif objectTypeValue == 'CloudInfo': + check_filter(ocloud.Ocloud, sub_filter) + elif objectTypeValue == 'ResourceInfo': + check_filter(ocloud.Resource, sub_filter) + else: + raise BadRequestException( + "Filter ObjectType {} not support.".format(items[2])) + # return objectTypeValue + filter_strip = filter.strip(' []') + filter_list = filter_strip.split('|') + # check_duplication = dict() + for sub_filter in filter_list: + _sub_filter_checking(sub_filter) + # obj_type = _sub_filter_checking(sub_filter) + # if obj_type not in check_duplication: + # check_duplication[obj_type] = 0 + # check_duplication[obj_type] += 1 + # if check_duplication[obj_type] > 1: + # raise BadRequestException( + # "Filter objectType {} only support one in each." + # .format(obj_type)) + + def subscription_delete(subscriptionId: str, uow: unit_of_work.AbstractUnitOfWork): with uow: -- 2.16.6