X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=o2ims%2Fservice%2Fcommand%2Fnotify_handler.py;h=19ff0c0520c72812e207685f667468a24ae3646e;hb=a1c1b38b2d8e83337cdc963922a0d60c5a9c76c1;hp=8f134aff32944bc2ce9352b1094139b70d26114a;hpb=242185955154b1d709c2fdbce5445e4aae8058ac;p=pti%2Fo2.git diff --git a/o2ims/service/command/notify_handler.py b/o2ims/service/command/notify_handler.py index 8f134af..19ff0c0 100644 --- a/o2ims/service/command/notify_handler.py +++ b/o2ims/service/command/notify_handler.py @@ -12,20 +12,19 @@ # See the License for the specific language governing permissions and # limitations under the License. -import json # import redis # import requests -from urllib.parse import urlparse +import json -# from o2common.config import config +# from o2common.config import conf +from o2common.domain.filter import gen_orm_filter from o2common.service.unit_of_work import AbstractUnitOfWork -from o2ims.domain import commands -from o2ims.domain.subscription_obj import Subscription, Message2SMO -from o2common.service.command.handler import get_https_conn_default -from o2common.service.command.handler import get_http_conn -from o2common.service.command.handler import get_https_conn_selfsigned -from o2common.service.command.handler import post_data -import ssl +from o2common.adapter.notifications import AbstractNotifications + +from o2ims.domain import commands, ocloud +from o2ims.domain.subscription_obj import Subscription, Message2SMO, \ + NotificationEventEnum + from o2common.helper import o2logging logger = o2logging.get_logger(__name__) @@ -37,84 +36,398 @@ logger = o2logging.get_logger(__name__) def notify_change_to_smo( cmd: commands.PubMessage2SMO, uow: AbstractUnitOfWork, + notifications: AbstractNotifications, ): - logger.info('In notify_change_to_smo') - data = cmd.data + logger.debug('In notify_change_to_smo') + msg_type = cmd.type + if msg_type == 'ResourceType': + _notify_resourcetype(uow, notifications, cmd.data) + elif msg_type == 'ResourcePool': + _notify_resourcepool(uow, notifications, cmd.data) + elif msg_type == 'Dms': + _notify_dms(uow, notifications, cmd.data) + elif msg_type == 'Resource': + _notify_resource(uow, notifications, cmd.data) + + +# def get_resource_dict(resource_type): +# return { +# 'resourceTypeId': resource_type.resourceTypeId, +# 'name': resource_type.name, +# 'description': resource_type.description, +# 'vendor': resource_type.vendor, +# 'model': resource_type.model, +# 'version': resource_type.version, +# } +# +# +# def handle_filter(filter: str, f_type: str): +# if not filter: +# return +# +# filter_list = filter.strip(' []').split('|') +# +# match_type_count = 0 +# filters = [] +# for sub_filter in filter_list: +# objectType, objectTypeValue = get_object_type_and_value(sub_filter) +# if objectTypeValue == f_type: +# match_type_count += 1 +# filters.append(sub_filter) +# elif not objectType and f_type == 'ResourceInfo': +# filters.append(sub_filter) +# +# return match_type_count, filters +# +# +# def get_object_type_and_value(sub_filter): +# exprs = sub_filter.split(';') +# for expr in exprs: +# items = expr.strip(' ()').split(',') +# item_key = items[1].strip() +# if item_key == 'objectType': +# return True, items[2].strip() +# return False, '' +# +# +# def check_filters(filters, sub_data, uow, id): +# for filter in filters[1]: +# if isinstance(filter, bool) and filter: +# return True +# +# try: +# args = gen_orm_filter(ocloud.ResourceType, filter) +# except KeyError: +# logger.warning( +# 'Subscription {} filter {} has wrong attribute ' +# 'name or value. Ignore the filter.'.format( +# sub_data['subscriptionId'], +# sub_data['filter'])) +# continue +# +# if len(args) == 0 and 'objectType' in filter: +# return True +# +# args.append(ocloud.ResourceType.resourceTypeId == id) +# obj_count, _ = uow.resource_types.list_with_count(*args) +# if obj_count > 0: +# return True +# return False +# +# +# def _notify_resourcetype(uow, notifications, data): +# with uow: +# resource_type = uow.resource_types.get(data.id) +# if resource_type is None: +# logger.warning('ResourceType {} does not exists.'.format(data.id)) +# return +# +# resource_type_dict = get_resource_dict(resource_type) +# +# subs = uow.subscriptions.list() +# for sub in subs: +# sub_data = sub.serialize() +# filters = handle_filter(sub_data['filter'], 'ResourceTypeInfo') +# logger.debug(f'filters: {filters}, sub_data: {sub_data}') +# +# if not filters or filters[0] == 0 or check_filters( +# filters, sub_data, uow, data.id): +# callback_smo(notifications, sub, data, resource_type_dict) +# continue +# +# logger.info('Subscription {} filter hit, skip ResourceType {}.' +# .format(sub_data['subscriptionId'], data.id)) + +def _notify_resourcetype(uow, notifications, data): with uow: + resource_type = uow.resource_types.get(data.id) + if resource_type is None: + logger.warning('ResourceType {} does not exists.'.format(data.id)) + return + resource_type_dict = { + 'resourceTypeId': resource_type.resourceTypeId, + 'name': resource_type.name, + 'description': resource_type.description, + 'vendor': resource_type.vendor, + 'model': resource_type.model, + 'version': resource_type.version, + # 'alarmDictionary': resource_type.alarmDictionary.serialize() + } + subs = uow.subscriptions.list() for sub in subs: sub_data = sub.serialize() logger.debug('Subscription: {}'.format(sub_data['subscriptionId'])) + filters = handle_filter(sub_data['filter'], 'ResourceTypeInfo') + if not filters or filters[0] == 0: + callback_smo(notifications, sub, data, resource_type_dict) + continue + if filters[0] > 0 and not filters[1]: + continue + + filter_hit = False + for filter in filters[1]: + try: + args = gen_orm_filter(ocloud.ResourceType, filter) + except KeyError: + logger.warning( + 'Subscription {} filter {} has wrong attribute ' + 'name or value. Ignore the filter.'.format( + sub_data['subscriptionId'], + sub_data['filter'])) + continue + if len(args) == 0 and 'objectType' in filter: + filter_hit = True + break + args.append(ocloud.ResourceType.resourceTypeId == data.id) + obj_count, _ = uow.resource_types.list_with_count(*args) + if obj_count > 0: + filter_hit = True + break + if filter_hit: + logger.info('Subscription {} filter hit, skip ResourceType {}.' + .format(sub_data['subscriptionId'], data.id)) + else: + callback_smo(notifications, sub, data, resource_type_dict) - try: - resource_filter = json.loads(sub_data['filter']) - if len(resource_filter) > 0: - resource = uow.resources.get(data.id) - logger.debug(type(resource)) - if resource: # TODO deal with resource is empty - res_type_id = resource.serialize()['resourceTypeId'] - resourcetype = uow.resource_types.get(res_type_id) - logger.debug(resourcetype.name) - if resourcetype.name not in resource_filter: - continue - except json.decoder.JSONDecodeError as err: - logger.warning( - 'subscription filter decode json failed: {}'.format(err)) - - callback_smo(sub, data) - - -def callback_smo(sub: Subscription, msg: Message2SMO): + +def _notify_resourcepool(uow, notifications, data): + with uow: + resource_pool = uow.resource_pools.get(data.id) + if resource_pool is None: + logger.warning('ResourcePool {} does not exists.'.format(data.id)) + return + resource_pool_dict = { + 'resourcePoolId': resource_pool.resourcePoolId, + 'oCloudId': resource_pool.oCloudId, + 'globalLocationId': resource_pool.globalLocationId, + 'name': resource_pool.name, + 'description': resource_pool.description + } + + subs = uow.subscriptions.list() + for sub in subs: + sub_data = sub.serialize() + logger.debug('Subscription: {}'.format(sub_data['subscriptionId'])) + filters = handle_filter(sub_data['filter'], 'ResourcePoolInfo') + if not filters or filters[0] == 0: + callback_smo(notifications, sub, data, resource_pool_dict) + continue + if filters[0] > 0 and not filters[1]: + continue + filter_hit = False + for filter in filters[1]: + try: + args = gen_orm_filter(ocloud.ResourcePool, filter) + except KeyError: + logger.warning( + 'Subscription {} filter {} has wrong attribute ' + 'name or value. Ignore the filter.'.format( + sub_data['subscriptionId'], + sub_data['filter'])) + continue + if len(args) == 0 and 'objectType' in filter: + filter_hit = True + break + args.append(ocloud.ResourcePool.resourcePoolId == data.id) + obj_count, _ = uow.resource_pools.list_with_count(*args) + if obj_count > 0: + filter_hit = True + break + if filter_hit: + logger.info('Subscription {} filter hit, skip ResourcePool {}.' + .format(sub_data['subscriptionId'], data.id)) + else: + callback_smo(notifications, sub, data, resource_pool_dict) + + +def _notify_dms(uow, notifications, data): + with uow: + dms = uow.deployment_managers.get(data.id) + if dms is None: + logger.warning( + 'DeploymentManager {} does not exists.'.format(data.id)) + return + dms_dict = { + 'deploymentManagerId': dms.deploymentManagerId, + 'name': dms.name, + 'description': dms.description, + 'oCloudId': dms.oCloudId, + 'serviceUri': dms.serviceUri + } + + subs = uow.subscriptions.list() + for sub in subs: + sub_data = sub.serialize() + logger.debug('Subscription: {}'.format(sub_data['subscriptionId'])) + filters_rst = handle_filter( + sub_data['filter'], 'DeploymentManagerInfo') + if not filters_rst or filters_rst[0] == 0: + callback_smo(notifications, sub, data, dms_dict) + continue + if filters_rst[0] > 0 and not filters_rst[1]: + continue + filter_hit = False + for filter in filters_rst[1]: + try: + args = gen_orm_filter(ocloud.DeploymentManager, filter) + except KeyError: + logger.warning( + 'Subscription {} filter {} has wrong attribute ' + 'name or value. Ignore the filter.'.format( + sub_data['subscriptionId'], + sub_data['filter'])) + continue + if len(args) == 0 and 'objectType' in filter: + filter_hit = True + break + args.append( + ocloud.DeploymentManager.deploymentManagerId == data.id) + obj_count, _ = uow.deployment_managers.list_with_count(*args) + if obj_count > 0: + filter_hit = True + break + if filter_hit: + logger.info('Subscription {} filter hit, skip ' + 'DeploymentManager {}.' + .format(sub_data['subscriptionId'], data.id)) + else: + continue + # callback_smo(notifications, sub, data, dms_dict) + + +def _notify_resource(uow, notifications, data): + with uow: + resource = uow.resources.get(data.id) + if resource is None: + logger.warning('Resource {} does not exists.'.format(data.id)) + return + res_pool_id = resource.serialize()['resourcePoolId'] + logger.debug('res pool id is {}'.format(res_pool_id)) + res_dict = { + 'resourceId': resource.resourceId, + 'description': resource.description, + 'resourceTypeId': resource.resourceTypeId, + 'resourcePoolId': resource.resourcePoolId, + 'globalAssetId': resource.globalAssetId + } + + subs = uow.subscriptions.list() + for sub in subs: + sub_data = sub.serialize() + logger.debug('Subscription: {}'.format(sub_data['subscriptionId'])) + filters = handle_filter(sub_data['filter'], 'ResourceInfo') + if not filters or filters[0] == 0: + callback_smo(notifications, sub, data, res_dict) + continue + if filters[0] > 0 and not filters[1]: + continue + filter_hit = False + for filter in filters[1]: + try: + args = gen_orm_filter(ocloud.Resource, filter) + except KeyError: + logger.warning( + 'Subscription {} filter {} has wrong attribute ' + 'name or value. Ignore the filter.'.format( + sub_data['subscriptionId'], + sub_data['filter'])) + continue + if len(args) == 0 and 'objectType' in filter: + filter_hit = True + break + args.append(ocloud.Resource.resourceId == data.id) + obj_count, _ = uow.resources.list_with_count( + res_pool_id, *args) + if obj_count > 0: + filter_hit = True + break + if filter_hit: + logger.info('Subscription {} filter hit, skip Resource {}.' + .format(sub_data['subscriptionId'], data.id)) + else: + callback_smo(notifications, sub, data, res_dict) + + +def handle_filter(filter: str, f_type: str): + if not filter: + return + match_type_count = 0 + filter_strip = filter.strip(' []') + filter_list = filter_strip.split('|') + if not filter_list: + return + filters = list() + for sub_filter in filter_list: + exprs = sub_filter.split(';') + objectType = False + objectTypeValue = '' + for expr in exprs: + expr_strip = expr.strip(' ()') + items = expr_strip.split(',') + item_key = items[1].strip() + if item_key != 'objectType': + continue + objectType = True + objectTypeValue = items[2].strip() + if not objectType: + if f_type == 'ResourceInfo': + filters.append(sub_filter) + continue + if objectTypeValue == f_type: + match_type_count += 1 + filters.append(sub_filter) + return (match_type_count, filters) + + +def callback_smo(notifications: AbstractNotifications, sub: Subscription, + msg: Message2SMO, obj_dict: dict = None): sub_data = sub.serialize() - callback_data = json.dumps({ + callback = { 'consumerSubscriptionId': sub_data['consumerSubscriptionId'], 'notificationEventType': msg.notificationEventType, 'objectRef': msg.objectRef, 'updateTime': msg.updatetime - }) - logger.info('URL: {}, data: {}'.format( - sub_data['callback'], callback_data)) - # r.publish(sub_data['subscriptionId'], json.dumps({ - # 'consumerSubscriptionId': sub_data['consumerSubscriptionId'], - # 'notificationEventType': msg.notificationEventType, - # 'objectRef': msg.objectRef - # })) + } + if msg.notificationEventType in [NotificationEventEnum.DELETE, + NotificationEventEnum.MODIFY]: + callback['priorObjectState'] = json.dumps(obj_dict) + if msg.notificationEventType in [NotificationEventEnum.CREATE, + NotificationEventEnum.MODIFY]: + callback['postObjectState'] = json.dumps(obj_dict) + if msg.notificationEventType == NotificationEventEnum.DELETE: + callback.pop('objectRef') + logger.info('callback URL: {}'.format(sub_data['callback'])) + logger.debug('callback data: {}'.format(json.dumps(callback))) + + return notifications.send(sub_data['callback'], callback) + + # Call SMO through the SMO callback url + # o = urlparse(sub_data['callback']) + # if o.scheme == 'https': + # conn = get_https_conn_default(o.netloc) + # else: + # conn = get_http_conn(o.netloc) # try: - # headers = {'User-Agent': 'Mozilla/5.0'} - # resp = requests.post(sub_data['callback'], data=callback_data, - # headers=headers) - # if resp.status_code == 202 or resp.status_code == 200: - # logger.info('Notify to SMO successed') + # rst, status = post_data(conn, o.path, callback_data) + # if rst is True: + # logger.info( + # 'Notify to SMO successed with status: {}'.format(status)) # return - # logger.error('Response code is: {}'.format(resp.status_code)) - # except requests.exceptions.HTTPError as err: - # logger.error('request smo error: {}'.format(err)) - o = urlparse(sub_data['callback']) - if o.scheme == 'https': - conn = get_https_conn_default(o.netloc) - else: - conn = get_http_conn(o.netloc) - try: - rst, status = post_data(conn, o.path, callback_data) - if rst is True: - logger.info( - 'Notify to SMO successed with status: {}'.format(status)) - return - logger.error('Notify Response code is: {}'.format(status)) - except ssl.SSLCertVerificationError as e: - logger.debug( - 'Notify try to post data with trusted ca failed: {}'.format(e)) - if 'self signed' in str(e): - conn = get_https_conn_selfsigned(o.netloc) - try: - return post_data(conn, o.path, callback_data) - except Exception as e: - logger.info( - 'Notify post data with self-signed ca \ - failed: {}'.format(e)) - # TODO: write the status to extension db table. - return False - return False - except Exception as e: - logger.critical('Notify except: {}'.format(e)) - return False + # logger.error('Notify Response code is: {}'.format(status)) + # except ssl.SSLCertVerificationError as e: + # logger.debug( + # 'Notify try to post data with trusted ca failed: {}'.format(e)) + # if 'self signed' in str(e): + # conn = get_https_conn_selfsigned(o.netloc) + # try: + # return post_data(conn, o.path, callback_data) + # except Exception as e: + # logger.info( + # 'Notify post data with self-signed ca \ + # failed: {}'.format(e)) + # return False + # return False + # except Exception as e: + # logger.critical('Notify except: {}'.format(e)) + # return False