X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=o2ims%2Fservice%2Fcommand%2Fnotify_handler.py;h=88a0e90574bbac0cd38c2b63836c9980c20ae412;hb=e161cae314703d7717d55099566c2d3fe112ca52;hp=34470f3ebf6f2958aad68ff6542612c5bb28c76a;hpb=7e21b8d5ceabc857812194f7a865066e4f13ad85;p=pti%2Fo2.git diff --git a/o2ims/service/command/notify_handler.py b/o2ims/service/command/notify_handler.py index 34470f3..88a0e90 100644 --- a/o2ims/service/command/notify_handler.py +++ b/o2ims/service/command/notify_handler.py @@ -15,16 +15,11 @@ # import redis # import requests import json -import ssl -from urllib.parse import urlparse -# from o2common.config import config +# from o2common.config import conf from o2common.domain.filter import gen_orm_filter from o2common.service.unit_of_work import AbstractUnitOfWork -from o2common.service.command.handler import get_https_conn_default -from o2common.service.command.handler import get_http_conn -from o2common.service.command.handler import get_https_conn_selfsigned -from o2common.service.command.handler import post_data +from o2common.adapter.notifications import AbstractNotifications from o2ims.domain import commands, ocloud from o2ims.domain.subscription_obj import Subscription, Message2SMO, \ @@ -41,131 +36,138 @@ logger = o2logging.get_logger(__name__) def notify_change_to_smo( cmd: commands.PubMessage2SMO, uow: AbstractUnitOfWork, + notifications: AbstractNotifications, ): logger.debug('In notify_change_to_smo') msg_type = cmd.type if msg_type == 'ResourceType': - _notify_resourcetype(uow, cmd.data) + _notify_resourcetype(uow, notifications, cmd.data) elif msg_type == 'ResourcePool': - _notify_resourcepool(uow, cmd.data) + _notify_resourcepool(uow, notifications, cmd.data) elif msg_type == 'Dms': - _notify_dms(uow, cmd.data) + _notify_dms(uow, notifications, cmd.data) elif msg_type == 'Resource': - _notify_resource(uow, cmd.data) + _notify_resource(uow, notifications, cmd.data) -def _notify_resourcetype(uow, data): +def __get_object_type_and_value(sub_filter): + exprs = sub_filter.split(';') + for expr in exprs: + items = expr.strip(' ()').split(',') + item_key = items[1].strip() + if item_key == 'objectType': + return True, items[2].strip() + return False, '' + + +def handle_filter(filter: str, f_type: str): + print(filter) + if not filter: + return + + filter_list = filter.strip(' []').split('|') + if not filter_list: + return + + match_type_count = 0 + filters = [] + for sub_filter in filter_list: + objectType, objectTypeValue = __get_object_type_and_value(sub_filter) + if objectTypeValue == f_type: + match_type_count += 1 + filters.append(sub_filter) + elif not objectType and f_type == 'ResourceInfo': + match_type_count += 1 + filters.append(sub_filter) + + return match_type_count, filters + + +def check_filters(filters, sub_data, uow_cls, obj_cls, attr_id, id): + for filter in filters[1]: + logger.debug(f'filter: {filter}') + try: + args = gen_orm_filter(obj_cls, filter) + except KeyError: + logger.warning( + 'Subscription {} filter {} has wrong attribute ' + 'name or value. Ignore the filter.'.format( + sub_data['subscriptionId'], + sub_data['filter'])) + continue + logger.debug(f'args: {args}') + + if len(args) == 0 and 'objectType' in filter: + return False + + args.append(attr_id == id) + obj_count, _ = uow_cls.list_with_count(*args) + if obj_count > 0: + return True + return False + + +def _notify_resourcetype(uow, notifications, data): with uow: resource_type = uow.resource_types.get(data.id) if resource_type is None: logger.warning('ResourceType {} does not exists.'.format(data.id)) return - resource_type_dict = { - 'resourceTypeId': resource_type.resourceTypeId, - 'name': resource_type.name, - 'description': resource_type.description, - 'vendor': resource_type.vendor, - 'model': resource_type.model, - 'version': resource_type.version, - # 'alarmDictionary': resource_type.alarmDictionary.serialize() - } + + resource_type_dict = resource_type.get_notification_dict() subs = uow.subscriptions.list() for sub in subs: sub_data = sub.serialize() logger.debug('Subscription: {}'.format(sub_data['subscriptionId'])) filters = handle_filter(sub_data['filter'], 'ResourceTypeInfo') - if not filters: - callback_smo(sub, data, resource_type_dict) + logger.debug(f'filters: {filters}, sub_data: {sub_data}') + + if not filters or filters[0] == 0 or check_filters( + filters, sub_data, uow.resource_types, ocloud.ResourceType, + ocloud.ResourceType.resourceTypeId, data.id): + callback_smo(notifications, sub, data, resource_type_dict) continue - filter_hit = False - for filter in filters: - try: - args = gen_orm_filter(ocloud.ResourceType, filter) - except KeyError: - logger.warning( - 'Subscription {} filter {} has wrong attribute ' - 'name or value. Ignore the filter.'.format( - sub_data['subscriptionId'], - sub_data['filter'])) - continue - if len(args) == 0 and 'objectType' in filter: - filter_hit = True - break - args.append(ocloud.ResourceType.resourceTypeId == data.id) - obj_count, _ = uow.resource_types.list_with_count(*args) - if obj_count > 0: - filter_hit = True - break - if filter_hit: - logger.info('Subscription {} filter hit, skip ResourceType {}.' - .format(sub_data['subscriptionId'], data.id)) - else: - callback_smo(sub, data, resource_type_dict) + + logger.info('Subscription {} filter hit, skip ResourceType {}.' + .format(sub_data['subscriptionId'], data.id)) -def _notify_resourcepool(uow, data): +def _notify_resourcepool(uow, notifications, data): with uow: resource_pool = uow.resource_pools.get(data.id) if resource_pool is None: logger.warning('ResourcePool {} does not exists.'.format(data.id)) return - resource_pool_dict = { - 'resourcePoolId': resource_pool.resourcePoolId, - 'oCloudId': resource_pool.oCloudId, - 'globalLocationId': resource_pool.globalLocationId, - 'name': resource_pool.name, - 'description': resource_pool.description - } + + resource_pool_dict = resource_pool.get_notification_dict() subs = uow.subscriptions.list() for sub in subs: sub_data = sub.serialize() logger.debug('Subscription: {}'.format(sub_data['subscriptionId'])) filters = handle_filter(sub_data['filter'], 'ResourcePoolInfo') - if not filters: - callback_smo(sub, data, resource_pool_dict) + logger.debug(f'filters: {filters}, sub_data: {sub_data}') + + if not filters or filters[0] == 0 or check_filters( + filters, sub_data, uow.resource_pools, ocloud.ResourcePool, + ocloud.ResourcePool.resourcePoolId, data.id): + callback_smo(notifications, sub, data, resource_pool_dict) continue - filter_hit = False - for filter in filters: - try: - args = gen_orm_filter(ocloud.ResourcePool, filter) - except KeyError: - logger.warning( - 'Subscription {} filter {} has wrong attribute ' - 'name or value. Ignore the filter.'.format( - sub_data['subscriptionId'], - sub_data['filter'])) - continue - if len(args) == 0 and 'objectType' in filter: - filter_hit = True - break - args.append(ocloud.ResourcePool.resourcePoolId == data.id) - obj_count, _ = uow.resource_pools.list_with_count(*args) - if obj_count > 0: - filter_hit = True - break - if filter_hit: - logger.info('Subscription {} filter hit, skip ResourcePool {}.' - .format(sub_data['subscriptionId'], data.id)) - else: - callback_smo(sub, data, resource_pool_dict) + logger.info('Subscription {} filter hit, skip ResourcePool {}.' + .format(sub_data['subscriptionId'], data.id)) -def _notify_dms(uow, data): + +def _notify_dms(uow, notifications, data): with uow: dms = uow.deployment_managers.get(data.id) if dms is None: logger.warning( 'DeploymentManager {} does not exists.'.format(data.id)) return - dms_dict = { - 'deploymentManagerId': dms.deploymentManagerId, - 'name': dms.name, - 'description': dms.description, - 'oCloudId': dms.oCloudId, - 'serviceUri': dms.serviceUri - } + + dms_dict = dms.get_notification_dict() subs = uow.subscriptions.list() for sub in subs: @@ -173,38 +175,21 @@ def _notify_dms(uow, data): logger.debug('Subscription: {}'.format(sub_data['subscriptionId'])) filters = handle_filter( sub_data['filter'], 'DeploymentManagerInfo') - if not filters: - callback_smo(sub, data, dms_dict) + logger.debug(f'filters: {filters}, sub_data: {sub_data}') + + if not filters or filters[0] == 0 or check_filters( + filters, sub_data, uow.deployment_managers, + ocloud.DeploymentManager, + ocloud.DeploymentManager.deploymentManagerId, data.id): + callback_smo(notifications, sub, data, dms_dict) continue - filter_hit = False - for filter in filters: - try: - args = gen_orm_filter(ocloud.DeploymentManager, filter) - except KeyError: - logger.warning( - 'Subscription {} filter {} has wrong attribute ' - 'name or value. Ignore the filter.'.format( - sub_data['subscriptionId'], - sub_data['filter'])) - continue - if len(args) == 0 and 'objectType' in filter: - filter_hit = True - break - args.append( - ocloud.DeploymentManager.deploymentManagerId == data.id) - obj_count, _ = uow.deployment_managers.list_with_count(*args) - if obj_count > 0: - filter_hit = True - break - if filter_hit: - logger.info('Subscription {} filter hit, skip ' - 'DeploymentManager {}.' - .format(sub_data['subscriptionId'], data.id)) - else: - callback_smo(sub, data, dms_dict) + logger.info('Subscription {} filter hit, skip ' + 'DeploymentManager {}.' + .format(sub_data['subscriptionId'], data.id)) -def _notify_resource(uow, data): + +def _notify_resource(uow, notifications, data): with uow: resource = uow.resources.get(data.id) if resource is None: @@ -212,24 +197,21 @@ def _notify_resource(uow, data): return res_pool_id = resource.serialize()['resourcePoolId'] logger.debug('res pool id is {}'.format(res_pool_id)) - res_dict = { - 'resourceId': resource.resourceId, - 'description': resource.description, - 'resourceTypeId': resource.resourceTypeId, - 'resourcePoolId': resource.resourcePoolId, - 'globalAssetId': resource.globalAssetId - } + + res_dict = resource.get_notification_dict() subs = uow.subscriptions.list() for sub in subs: sub_data = sub.serialize() logger.debug('Subscription: {}'.format(sub_data['subscriptionId'])) filters = handle_filter(sub_data['filter'], 'ResourceInfo') - if not filters: - callback_smo(sub, data, res_dict) + if not filters or filters[0] == 0: + callback_smo(notifications, sub, data, res_dict) + continue + if filters[0] > 0 and not filters[1]: continue filter_hit = False - for filter in filters: + for filter in filters[1]: try: args = gen_orm_filter(ocloud.Resource, filter) except KeyError: @@ -252,37 +234,11 @@ def _notify_resource(uow, data): logger.info('Subscription {} filter hit, skip Resource {}.' .format(sub_data['subscriptionId'], data.id)) else: - callback_smo(sub, data, res_dict) + callback_smo(notifications, sub, data, res_dict) -def handle_filter(filter: str, f_type: str): - if not filter: - return - filter_strip = filter.strip(' []') - filter_list = filter_strip.split('|') - filters = list() - for sub_filter in filter_list: - exprs = sub_filter.split(';') - objectType = False - objectTypeValue = '' - for expr in exprs: - expr_strip = expr.strip(' ()') - items = expr_strip.split(',') - item_key = items[1].strip() - if item_key != 'objectType': - continue - objectType = True - objectTypeValue = items[2].strip() - if not objectType: - if f_type == 'ResourceInfo': - filters.append(sub_filter) - continue - if objectTypeValue == f_type: - filters.append(sub_filter) - return filters - - -def callback_smo(sub: Subscription, msg: Message2SMO, obj_dict: dict = None): +def callback_smo(notifications: AbstractNotifications, sub: Subscription, + msg: Message2SMO, obj_dict: dict = None): sub_data = sub.serialize() callback = { 'consumerSubscriptionId': sub_data['consumerSubscriptionId'], @@ -298,37 +254,7 @@ def callback_smo(sub: Subscription, msg: Message2SMO, obj_dict: dict = None): callback['postObjectState'] = json.dumps(obj_dict) if msg.notificationEventType == NotificationEventEnum.DELETE: callback.pop('objectRef') - callback_data = json.dumps(callback) logger.info('callback URL: {}'.format(sub_data['callback'])) - logger.debug('callback data: {}'.format(callback_data)) - - # Call SMO through the SMO callback url - o = urlparse(sub_data['callback']) - if o.scheme == 'https': - conn = get_https_conn_default(o.netloc) - else: - conn = get_http_conn(o.netloc) - try: - rst, status = post_data(conn, o.path, callback_data) - if rst is True: - logger.info( - 'Notify to SMO successed with status: {}'.format(status)) - return - logger.error('Notify Response code is: {}'.format(status)) - except ssl.SSLCertVerificationError as e: - logger.debug( - 'Notify try to post data with trusted ca failed: {}'.format(e)) - if 'self signed' in str(e): - conn = get_https_conn_selfsigned(o.netloc) - try: - return post_data(conn, o.path, callback_data) - except Exception as e: - logger.info( - 'Notify post data with self-signed ca \ - failed: {}'.format(e)) - # TODO: write the status to extension db table. - return False - return False - except Exception as e: - logger.critical('Notify except: {}'.format(e)) - return False + logger.debug('callback data: {}'.format(json.dumps(callback))) + + return notifications.send(sub_data['callback'], callback)