From: Zhang Rong(Jon) Date: Wed, 30 Nov 2022 07:18:08 +0000 (+0800) Subject: Fix inventory subscription filter with 'neq'; fix CloudInfo notification X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=4cf8adc69fbc8f44cdd6d978cebed0119bc5a216;p=pti%2Fo2.git Fix inventory subscription filter with 'neq'; fix CloudInfo notification Signed-off-by: Zhang Rong(Jon) Change-Id: Ib62345109abb296878348f0a32e3daba924babf7 --- diff --git a/o2app/entrypoints/redis_eventconsumer.py b/o2app/entrypoints/redis_eventconsumer.py index 1410f76..458514f 100644 --- a/o2app/entrypoints/redis_eventconsumer.py +++ b/o2app/entrypoints/redis_eventconsumer.py @@ -120,8 +120,8 @@ def handle_changed(m, bus): data = json.loads(datastr) logger.info('OcloudChanged with cmd:{}'.format(data)) cmd = imscmd.Register2SMO(data=RegistrationMessage( - data['notificationEventType'], - id=data['id'])) + id=data['id'], eventtype=data['notificationEventType'], + updatetime=data['updatetime'])) bus.handle(cmd) elif channel == 'AlarmEventChanged': datastr = m['data'] diff --git a/o2ims/adapter/ocloud_repository.py b/o2ims/adapter/ocloud_repository.py index 23d4259..a026629 100644 --- a/o2ims/adapter/ocloud_repository.py +++ b/o2ims/adapter/ocloud_repository.py @@ -35,8 +35,9 @@ class OcloudSqlAlchemyRepository(OcloudRepository): return self.session.query(ocloud.Ocloud).filter_by( oCloudId=ocloud_id).first() - def _list(self) -> List[ocloud.Ocloud]: - return self.session.query(ocloud.Ocloud) + def _list(self, *args) -> List[ocloud.Ocloud]: + return self.session.query(ocloud.Ocloud).filter(*args).order_by( + 'oCloudId') def _update(self, ocloud: ocloud.Ocloud): self.session.add(ocloud) diff --git a/o2ims/domain/ocloud_repo.py b/o2ims/domain/ocloud_repo.py index 4c4a075..cf2d979 100644 --- a/o2ims/domain/ocloud_repo.py +++ b/o2ims/domain/ocloud_repo.py @@ -31,8 +31,8 @@ class OcloudRepository(abc.ABC): self.seen.add(ocloud) return ocloud - def list(self) -> List[ocloud.Ocloud]: - return self._list() + def list(self, *args) -> List[ocloud.Ocloud]: + return self._list(*args) def update(self, ocloud: ocloud.Ocloud): self._update(ocloud) @@ -49,6 +49,10 @@ class OcloudRepository(abc.ABC): def _get(self, ocloud_id) -> ocloud.Ocloud: raise NotImplementedError + @abc.abstractmethod + def _list(self, *args) -> List[ocloud.Ocloud]: + raise NotImplementedError + @abc.abstractmethod def _update(self, ocloud: ocloud.Ocloud): raise NotImplementedError diff --git a/o2ims/domain/subscription_obj.py b/o2ims/domain/subscription_obj.py index c23a9ea..dfa950d 100644 --- a/o2ims/domain/subscription_obj.py +++ b/o2ims/domain/subscription_obj.py @@ -47,9 +47,11 @@ class Message2SMO(Serializer): class RegistrationMessage(Serializer): - def __init__(self, eventtype: NotificationEventEnum, id: str = '') -> None: + def __init__(self, eventtype: NotificationEventEnum, id: str = '', + updatetime: str = '') -> None: self.notificationEventType = eventtype self.id = id + self.updatetime = updatetime @dataclass diff --git a/o2ims/service/command/notify_alarm_handler.py b/o2ims/service/command/notify_alarm_handler.py index dd32b96..3d44e64 100644 --- a/o2ims/service/command/notify_alarm_handler.py +++ b/o2ims/service/command/notify_alarm_handler.py @@ -99,7 +99,7 @@ def callback_smo(sub: AlarmSubscription, msg: AlarmEvent2SMO, } # logger.warning(callback) callback_data = json.dumps(callback) - logger.info('URL: {}'.format(sub_data['callback'])) + logger.info('callback URL: {}'.format(sub_data['callback'])) logger.debug('callback data: {}'.format(callback_data)) o = urlparse(sub_data['callback']) diff --git a/o2ims/service/command/notify_handler.py b/o2ims/service/command/notify_handler.py index 947942f..34470f3 100644 --- a/o2ims/service/command/notify_handler.py +++ b/o2ims/service/command/notify_handler.py @@ -78,7 +78,7 @@ def _notify_resourcetype(uow, data): if not filters: callback_smo(sub, data, resource_type_dict) continue - filter_effect = 0 + filter_hit = False for filter in filters: try: args = gen_orm_filter(ocloud.ResourceType, filter) @@ -90,20 +90,18 @@ def _notify_resourcetype(uow, data): sub_data['filter'])) continue if len(args) == 0 and 'objectType' in filter: - filter_effect += 1 + filter_hit = True break args.append(ocloud.ResourceType.resourceTypeId == data.id) - ret = uow.resource_types.list_with_count(*args) - if ret[0] > 0: - logger.debug( - 'ResourcePool {} skip for subscription {} because of' - ' the filter.' - .format(data.id, sub_data['subscriptionId'])) - filter_effect += 1 + obj_count, _ = uow.resource_types.list_with_count(*args) + if obj_count > 0: + filter_hit = True break - if filter_effect > 0: - continue - callback_smo(sub, data, resource_type_dict) + if filter_hit: + logger.info('Subscription {} filter hit, skip ResourceType {}.' + .format(sub_data['subscriptionId'], data.id)) + else: + callback_smo(sub, data, resource_type_dict) def _notify_resourcepool(uow, data): @@ -128,7 +126,7 @@ def _notify_resourcepool(uow, data): if not filters: callback_smo(sub, data, resource_pool_dict) continue - filter_effect = 0 + filter_hit = False for filter in filters: try: args = gen_orm_filter(ocloud.ResourcePool, filter) @@ -140,20 +138,18 @@ def _notify_resourcepool(uow, data): sub_data['filter'])) continue if len(args) == 0 and 'objectType' in filter: - filter_effect += 1 + filter_hit = True break args.append(ocloud.ResourcePool.resourcePoolId == data.id) - ret = uow.resource_pools.list_with_count(*args) - if ret[0] > 0: - logger.debug( - 'ResourcePool {} skip for subscription {} because of' - ' the filter.' - .format(data.id, sub_data['subscriptionId'])) - filter_effect += 1 + obj_count, _ = uow.resource_pools.list_with_count(*args) + if obj_count > 0: + filter_hit = True break - if filter_effect > 0: - continue - callback_smo(sub, data, resource_pool_dict) + if filter_hit: + logger.info('Subscription {} filter hit, skip ResourcePool {}.' + .format(sub_data['subscriptionId'], data.id)) + else: + callback_smo(sub, data, resource_pool_dict) def _notify_dms(uow, data): @@ -180,7 +176,7 @@ def _notify_dms(uow, data): if not filters: callback_smo(sub, data, dms_dict) continue - filter_effect = 0 + filter_hit = False for filter in filters: try: args = gen_orm_filter(ocloud.DeploymentManager, filter) @@ -192,21 +188,20 @@ def _notify_dms(uow, data): sub_data['filter'])) continue if len(args) == 0 and 'objectType' in filter: - filter_effect += 1 + filter_hit = True break args.append( ocloud.DeploymentManager.deploymentManagerId == data.id) - ret = uow.deployment_managers.list_with_count(*args) - if ret[0] > 0: - logger.debug( - 'DeploymentManager {} skip for subscription {} because' - ' of the filter.' - .format(data.id, sub_data['subscriptionId'])) - filter_effect += 1 + obj_count, _ = uow.deployment_managers.list_with_count(*args) + if obj_count > 0: + filter_hit = True break - if filter_effect > 0: - continue - callback_smo(sub, data, dms_dict) + if filter_hit: + logger.info('Subscription {} filter hit, skip ' + 'DeploymentManager {}.' + .format(sub_data['subscriptionId'], data.id)) + else: + callback_smo(sub, data, dms_dict) def _notify_resource(uow, data): @@ -233,7 +228,7 @@ def _notify_resource(uow, data): if not filters: callback_smo(sub, data, res_dict) continue - filter_effect = 0 + filter_hit = False for filter in filters: try: args = gen_orm_filter(ocloud.Resource, filter) @@ -245,20 +240,19 @@ def _notify_resource(uow, data): sub_data['filter'])) continue if len(args) == 0 and 'objectType' in filter: - filter_effect += 1 + filter_hit = True break args.append(ocloud.Resource.resourceId == data.id) - ret = uow.resources.list_with_count(res_pool_id, *args) - if ret[0] > 0: - logger.debug( - 'Resource {} skip for subscription {} because of ' - 'the filter.' - .format(data.id, sub_data['subscriptionId'])) - filter_effect += 1 + obj_count, _ = uow.resources.list_with_count( + res_pool_id, *args) + if obj_count > 0: + filter_hit = True break - if filter_effect > 0: - continue - callback_smo(sub, data, res_dict) + if filter_hit: + logger.info('Subscription {} filter hit, skip Resource {}.' + .format(sub_data['subscriptionId'], data.id)) + else: + callback_smo(sub, data, res_dict) def handle_filter(filter: str, f_type: str): @@ -305,7 +299,7 @@ def callback_smo(sub: Subscription, msg: Message2SMO, obj_dict: dict = None): if msg.notificationEventType == NotificationEventEnum.DELETE: callback.pop('objectRef') callback_data = json.dumps(callback) - logger.info('URL: {}'.format(sub_data['callback'])) + logger.info('callback URL: {}'.format(sub_data['callback'])) logger.debug('callback data: {}'.format(callback_data)) # Call SMO through the SMO callback url diff --git a/o2ims/service/command/registration_handler.py b/o2ims/service/command/registration_handler.py index c144405..21cbbf8 100644 --- a/o2ims/service/command/registration_handler.py +++ b/o2ims/service/command/registration_handler.py @@ -12,44 +12,58 @@ # See the License for the specific language governing permissions and # limitations under the License. -# import time +import ssl import json -# import asyncio -# import requests - -from urllib.parse import urlparse from retry import retry +from urllib.parse import urlparse -from o2common.service.unit_of_work import AbstractUnitOfWork from o2common.config import config, conf -from o2common.service.command.handler import get_https_conn_default -from o2common.service.command.handler import get_http_conn -from o2common.service.command.handler import get_https_conn_selfsigned -from o2common.service.command.handler import post_data -import ssl -from o2ims.domain import commands -from o2ims.domain.subscription_obj import NotificationEventEnum +from o2common.domain.filter import gen_orm_filter +from o2common.service.unit_of_work import AbstractUnitOfWork +from o2common.service.command.handler import get_https_conn_default, \ + get_http_conn, get_https_conn_selfsigned, post_data + +from o2ims.domain import commands, ocloud as cloud +from o2ims.domain.subscription_obj import Message2SMO, NotificationEventEnum + +from .notify_handler import handle_filter, callback_smo from o2common.helper import o2logging logger = o2logging.get_logger(__name__) +apibase = config.get_o2ims_api_base() +api_monitoring_base = config.get_o2ims_monitoring_api_base() +inventory_api_version = config.get_o2ims_inventory_api_v1() + def registry_to_smo( cmd: commands.Register2SMO, uow: AbstractUnitOfWork, ): - logger.info('In registry_to_smo') + logger.debug('In registry_to_smo') data = cmd.data logger.info('The Register2SMO notificationEventType is {}'.format( data.notificationEventType)) with uow: ocloud = uow.oclouds.get(data.id) if ocloud is None: + logger.warning('Ocloud {} does not exists.'.format(data.id)) return logger.debug('O-Cloud Global UUID: {}'.format(ocloud.globalCloudId)) - ocloud_dict = ocloud.serialize() + # ocloud_dict = ocloud.serialize() + ocloud_dict = { + 'oCloudId': ocloud.oCloudId, + 'globalcloudId': ocloud.globalCloudId, + 'globalCloudId': ocloud.globalCloudId, + 'name': ocloud.name, + 'description': ocloud.description, + 'serviceUri': ocloud.serviceUri + } if data.notificationEventType == NotificationEventEnum.CREATE: register_smo(uow, ocloud_dict) + elif data.notificationEventType in [NotificationEventEnum.MODIFY, + NotificationEventEnum.DELETE]: + _notify_ocloud(uow, data, ocloud_dict) class RegIMSToSMOExp(Exception): @@ -67,16 +81,44 @@ def register_smo(uow, ocloud_data): # TODO: record the result for the smo register -# def retry(fun, max_tries=2): -# for i in range(max_tries): -# try: -# time.sleep(5*i) -# # await asyncio.sleep(5*i) -# res = fun() -# logger.debug('retry function result: {}'.format(res)) -# return res -# except Exception: -# continue +def _notify_ocloud(uow, data, ocloud_dict): + ref = api_monitoring_base + inventory_api_version + msg = Message2SMO( + eventtype=data.notificationEventType, id=data.id, + ref=ref, updatetime=data.updatetime) + ocloud_dict.pop('globalCloudId') + subs = uow.subscriptions.list() + for sub in subs: + sub_data = sub.serialize() + logger.debug('Subscription: {}'.format(sub_data['subscriptionId'])) + filters = handle_filter(sub_data['filter'], 'CloudInfo') + if not filters: + callback_smo(sub, msg, ocloud_dict) + continue + filter_hit = False + for filter in filters: + try: + args = gen_orm_filter(cloud.Ocloud, filter) + except KeyError: + logger.warning( + 'Subscription {} filter {} has wrong attribute ' + 'name or value. Ignore the filter.'.format( + sub_data['subscriptionId'], + sub_data['filter'])) + continue + if len(args) == 0 and 'objectType' in filter: + filter_hit = True + break + args.append(cloud.Ocloud.oCloudId == data.id) + ret = uow.oclouds.list(*args) + if ret.count() > 0: + filter_hit = True + break + if filter_hit: + logger.info('Subscription {} filter hit, skip oCloud {}.' + .format(sub_data['subscriptionId'], data.id)) + else: + callback_smo(sub, msg, ocloud_dict) @retry((ConnectionRefusedError), tries=2, delay=2) @@ -97,8 +139,8 @@ def call_smo(reg_data: dict): 'IMS_EP': config.get_api_url(), 'smo_token_data': smo_token_info }) - logger.info('URL: {}, data: {}'.format( - conf.DEFAULT.smo_register_url, callback_data)) + logger.info('callback URL: {}'.format(conf.DEFAULT.smo_register_url)) + logger.debug('callback data: {}'.format(callback_data)) o = urlparse(conf.DEFAULT.smo_register_url) if o.scheme == 'https': conn = get_https_conn_default(o.netloc) diff --git a/tests/unit/test_ocloud.py b/tests/unit/test_ocloud.py index 5a5ed2c..18c16cb 100644 --- a/tests/unit/test_ocloud.py +++ b/tests/unit/test_ocloud.py @@ -101,7 +101,8 @@ def test_view_olcouds(mock_uow): ocloud1 = MagicMock() ocloud1.serialize.return_value = { 'oCloudId': ocloud1_UUID, 'name': 'ocloud1'} - session.return_value.query.return_value = [ocloud1] + session.return_value.query.return_value.filter.return_value.\ + order_by.return_value = [ocloud1] ocloud_list = ocloud_view.oclouds(uow) # assert str(ocloud_list[0].get("oCloudId")) == ocloud1_UUID