X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=o2ims%2Fservice%2Fcommand%2Fnotify_handler.py;h=19363f92992440afe99b605a033d4def09236892;hb=250c3960b727b9549581b50b88849218ef951609;hp=947942fc86e5532be8cf2404e5d43be271c44767;hpb=af290d19532a5595ed22d56d0006ec486390f8de;p=pti%2Fo2.git diff --git a/o2ims/service/command/notify_handler.py b/o2ims/service/command/notify_handler.py index 947942f..19363f9 100644 --- a/o2ims/service/command/notify_handler.py +++ b/o2ims/service/command/notify_handler.py @@ -15,16 +15,11 @@ # import redis # import requests import json -import ssl -from urllib.parse import urlparse -# from o2common.config import config +# from o2common.config import conf from o2common.domain.filter import gen_orm_filter from o2common.service.unit_of_work import AbstractUnitOfWork -from o2common.service.command.handler import get_https_conn_default -from o2common.service.command.handler import get_http_conn -from o2common.service.command.handler import get_https_conn_selfsigned -from o2common.service.command.handler import post_data +from o2common.adapter.notifications import AbstractNotifications from o2ims.domain import commands, ocloud from o2ims.domain.subscription_obj import Subscription, Message2SMO, \ @@ -41,20 +36,21 @@ logger = o2logging.get_logger(__name__) def notify_change_to_smo( cmd: commands.PubMessage2SMO, uow: AbstractUnitOfWork, + notifications: AbstractNotifications, ): logger.debug('In notify_change_to_smo') msg_type = cmd.type if msg_type == 'ResourceType': - _notify_resourcetype(uow, cmd.data) + _notify_resourcetype(uow, notifications, cmd.data) elif msg_type == 'ResourcePool': - _notify_resourcepool(uow, cmd.data) + _notify_resourcepool(uow, notifications, cmd.data) elif msg_type == 'Dms': - _notify_dms(uow, cmd.data) + _notify_dms(uow, notifications, cmd.data) elif msg_type == 'Resource': - _notify_resource(uow, cmd.data) + _notify_resource(uow, notifications, cmd.data) -def _notify_resourcetype(uow, data): +def _notify_resourcetype(uow, notifications, data): with uow: resource_type = uow.resource_types.get(data.id) if resource_type is None: @@ -76,9 +72,9 @@ def _notify_resourcetype(uow, data): logger.debug('Subscription: {}'.format(sub_data['subscriptionId'])) filters = handle_filter(sub_data['filter'], 'ResourceTypeInfo') if not filters: - callback_smo(sub, data, resource_type_dict) + callback_smo(notifications, sub, data, resource_type_dict) continue - filter_effect = 0 + filter_hit = False for filter in filters: try: args = gen_orm_filter(ocloud.ResourceType, filter) @@ -90,23 +86,21 @@ def _notify_resourcetype(uow, data): sub_data['filter'])) continue if len(args) == 0 and 'objectType' in filter: - filter_effect += 1 + filter_hit = True break args.append(ocloud.ResourceType.resourceTypeId == data.id) - ret = uow.resource_types.list_with_count(*args) - if ret[0] > 0: - logger.debug( - 'ResourcePool {} skip for subscription {} because of' - ' the filter.' - .format(data.id, sub_data['subscriptionId'])) - filter_effect += 1 + obj_count, _ = uow.resource_types.list_with_count(*args) + if obj_count > 0: + filter_hit = True break - if filter_effect > 0: - continue - callback_smo(sub, data, resource_type_dict) + if filter_hit: + logger.info('Subscription {} filter hit, skip ResourceType {}.' + .format(sub_data['subscriptionId'], data.id)) + else: + callback_smo(notifications, sub, data, resource_type_dict) -def _notify_resourcepool(uow, data): +def _notify_resourcepool(uow, notifications, data): with uow: resource_pool = uow.resource_pools.get(data.id) if resource_pool is None: @@ -126,9 +120,9 @@ def _notify_resourcepool(uow, data): logger.debug('Subscription: {}'.format(sub_data['subscriptionId'])) filters = handle_filter(sub_data['filter'], 'ResourcePoolInfo') if not filters: - callback_smo(sub, data, resource_pool_dict) + callback_smo(notifications, sub, data, resource_pool_dict) continue - filter_effect = 0 + filter_hit = False for filter in filters: try: args = gen_orm_filter(ocloud.ResourcePool, filter) @@ -140,23 +134,21 @@ def _notify_resourcepool(uow, data): sub_data['filter'])) continue if len(args) == 0 and 'objectType' in filter: - filter_effect += 1 + filter_hit = True break args.append(ocloud.ResourcePool.resourcePoolId == data.id) - ret = uow.resource_pools.list_with_count(*args) - if ret[0] > 0: - logger.debug( - 'ResourcePool {} skip for subscription {} because of' - ' the filter.' - .format(data.id, sub_data['subscriptionId'])) - filter_effect += 1 + obj_count, _ = uow.resource_pools.list_with_count(*args) + if obj_count > 0: + filter_hit = True break - if filter_effect > 0: - continue - callback_smo(sub, data, resource_pool_dict) + if filter_hit: + logger.info('Subscription {} filter hit, skip ResourcePool {}.' + .format(sub_data['subscriptionId'], data.id)) + else: + callback_smo(notifications, sub, data, resource_pool_dict) -def _notify_dms(uow, data): +def _notify_dms(uow, notifications, data): with uow: dms = uow.deployment_managers.get(data.id) if dms is None: @@ -178,9 +170,9 @@ def _notify_dms(uow, data): filters = handle_filter( sub_data['filter'], 'DeploymentManagerInfo') if not filters: - callback_smo(sub, data, dms_dict) + callback_smo(notifications, sub, data, dms_dict) continue - filter_effect = 0 + filter_hit = False for filter in filters: try: args = gen_orm_filter(ocloud.DeploymentManager, filter) @@ -192,24 +184,23 @@ def _notify_dms(uow, data): sub_data['filter'])) continue if len(args) == 0 and 'objectType' in filter: - filter_effect += 1 + filter_hit = True break args.append( ocloud.DeploymentManager.deploymentManagerId == data.id) - ret = uow.deployment_managers.list_with_count(*args) - if ret[0] > 0: - logger.debug( - 'DeploymentManager {} skip for subscription {} because' - ' of the filter.' - .format(data.id, sub_data['subscriptionId'])) - filter_effect += 1 + obj_count, _ = uow.deployment_managers.list_with_count(*args) + if obj_count > 0: + filter_hit = True break - if filter_effect > 0: - continue - callback_smo(sub, data, dms_dict) + if filter_hit: + logger.info('Subscription {} filter hit, skip ' + 'DeploymentManager {}.' + .format(sub_data['subscriptionId'], data.id)) + else: + callback_smo(notifications, sub, data, dms_dict) -def _notify_resource(uow, data): +def _notify_resource(uow, notifications, data): with uow: resource = uow.resources.get(data.id) if resource is None: @@ -231,9 +222,9 @@ def _notify_resource(uow, data): logger.debug('Subscription: {}'.format(sub_data['subscriptionId'])) filters = handle_filter(sub_data['filter'], 'ResourceInfo') if not filters: - callback_smo(sub, data, res_dict) + callback_smo(notifications, sub, data, res_dict) continue - filter_effect = 0 + filter_hit = False for filter in filters: try: args = gen_orm_filter(ocloud.Resource, filter) @@ -245,20 +236,19 @@ def _notify_resource(uow, data): sub_data['filter'])) continue if len(args) == 0 and 'objectType' in filter: - filter_effect += 1 + filter_hit = True break args.append(ocloud.Resource.resourceId == data.id) - ret = uow.resources.list_with_count(res_pool_id, *args) - if ret[0] > 0: - logger.debug( - 'Resource {} skip for subscription {} because of ' - 'the filter.' - .format(data.id, sub_data['subscriptionId'])) - filter_effect += 1 + obj_count, _ = uow.resources.list_with_count( + res_pool_id, *args) + if obj_count > 0: + filter_hit = True break - if filter_effect > 0: - continue - callback_smo(sub, data, res_dict) + if filter_hit: + logger.info('Subscription {} filter hit, skip Resource {}.' + .format(sub_data['subscriptionId'], data.id)) + else: + callback_smo(notifications, sub, data, res_dict) def handle_filter(filter: str, f_type: str): @@ -288,7 +278,8 @@ def handle_filter(filter: str, f_type: str): return filters -def callback_smo(sub: Subscription, msg: Message2SMO, obj_dict: dict = None): +def callback_smo(notifications: AbstractNotifications, sub: Subscription, + msg: Message2SMO, obj_dict: dict = None): sub_data = sub.serialize() callback = { 'consumerSubscriptionId': sub_data['consumerSubscriptionId'], @@ -305,36 +296,37 @@ def callback_smo(sub: Subscription, msg: Message2SMO, obj_dict: dict = None): if msg.notificationEventType == NotificationEventEnum.DELETE: callback.pop('objectRef') callback_data = json.dumps(callback) - logger.info('URL: {}'.format(sub_data['callback'])) + logger.info('callback URL: {}'.format(sub_data['callback'])) logger.debug('callback data: {}'.format(callback_data)) + return notifications.send(sub_data['callback'], callback_data) + # Call SMO through the SMO callback url - o = urlparse(sub_data['callback']) - if o.scheme == 'https': - conn = get_https_conn_default(o.netloc) - else: - conn = get_http_conn(o.netloc) - try: - rst, status = post_data(conn, o.path, callback_data) - if rst is True: - logger.info( - 'Notify to SMO successed with status: {}'.format(status)) - return - logger.error('Notify Response code is: {}'.format(status)) - except ssl.SSLCertVerificationError as e: - logger.debug( - 'Notify try to post data with trusted ca failed: {}'.format(e)) - if 'self signed' in str(e): - conn = get_https_conn_selfsigned(o.netloc) - try: - return post_data(conn, o.path, callback_data) - except Exception as e: - logger.info( - 'Notify post data with self-signed ca \ - failed: {}'.format(e)) - # TODO: write the status to extension db table. - return False - return False - except Exception as e: - logger.critical('Notify except: {}'.format(e)) - return False + # o = urlparse(sub_data['callback']) + # if o.scheme == 'https': + # conn = get_https_conn_default(o.netloc) + # else: + # conn = get_http_conn(o.netloc) + # try: + # rst, status = post_data(conn, o.path, callback_data) + # if rst is True: + # logger.info( + # 'Notify to SMO successed with status: {}'.format(status)) + # return + # logger.error('Notify Response code is: {}'.format(status)) + # except ssl.SSLCertVerificationError as e: + # logger.debug( + # 'Notify try to post data with trusted ca failed: {}'.format(e)) + # if 'self signed' in str(e): + # conn = get_https_conn_selfsigned(o.netloc) + # try: + # return post_data(conn, o.path, callback_data) + # except Exception as e: + # logger.info( + # 'Notify post data with self-signed ca \ + # failed: {}'.format(e)) + # return False + # return False + # except Exception as e: + # logger.critical('Notify except: {}'.format(e)) + # return False