From: Zhang Rong(Jon) Date: Sat, 26 Nov 2022 15:49:54 +0000 (+0800) Subject: Fix INF-371 inventoryChange notification of the resourceType, resourcePool, Dms X-Git-Tag: 2.0.0-rc4~7^2 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=209906139ba3d55342e5e95dd7cdb804c7b61943;p=pti%2Fo2.git Fix INF-371 inventoryChange notification of the resourceType, resourcePool, Dms Issue-ID: INF-371 Signed-off-by: Zhang Rong(Jon) Change-Id: I3b6fee5a433359503660b5d784af11e948fb4916 --- diff --git a/o2app/entrypoints/redis_eventconsumer.py b/o2app/entrypoints/redis_eventconsumer.py index cc34099..1410f76 100644 --- a/o2app/entrypoints/redis_eventconsumer.py +++ b/o2app/entrypoints/redis_eventconsumer.py @@ -39,8 +39,11 @@ def main(): bus = bootstrap.bootstrap() pubsub = r.pubsub(ignore_subscribe_messages=True) pubsub.subscribe("NfDeploymentStateChanged") - pubsub.subscribe('ResourceChanged') pubsub.subscribe('OcloudChanged') + pubsub.subscribe('ResourceTypeChanged') + pubsub.subscribe('ResourcePoolChanged') + pubsub.subscribe('DmsChanged') + pubsub.subscribe('ResourceChanged') pubsub.subscribe('AlarmEventChanged') for m in pubsub.listen(): @@ -64,6 +67,42 @@ def handle_changed(m, bus): ToState=data['ToState'] ) bus.handle(cmd) + elif channel == 'ResourceTypeChanged': + datastr = m['data'] + data = json.loads(datastr) + logger.info('ResourceTypeChanged with cmd:{}'.format(data)) + ref = apibase + inventory_api_version + '/resourceTypes/' + \ + data['id'] + cmd = imscmd.PubMessage2SMO(data=Message2SMO( + id=data['id'], ref=ref, + eventtype=data['notificationEventType'], + updatetime=data['updatetime']), + type='ResourceType') + bus.handle(cmd) + elif channel == 'ResourcePoolChanged': + datastr = m['data'] + data = json.loads(datastr) + logger.info('ResourcePoolChanged with cmd:{}'.format(data)) + ref = apibase + inventory_api_version + '/resourcePools/' + \ + data['id'] + cmd = imscmd.PubMessage2SMO(data=Message2SMO( + id=data['id'], ref=ref, + eventtype=data['notificationEventType'], + updatetime=data['updatetime']), + type='ResourcePool') + bus.handle(cmd) + elif channel == 'DmsChanged': + datastr = m['data'] + data = json.loads(datastr) + logger.info('ResourceChanged with cmd:{}'.format(data)) + ref = apibase + inventory_api_version + '/deploymentManagers/' + \ + data['id'] + cmd = imscmd.PubMessage2SMO(data=Message2SMO( + id=data['id'], ref=ref, + eventtype=data['notificationEventType'], + updatetime=data['updatetime']), + type='Dms') + bus.handle(cmd) elif channel == 'ResourceChanged': datastr = m['data'] data = json.loads(datastr) @@ -73,7 +112,8 @@ def handle_changed(m, bus): cmd = imscmd.PubMessage2SMO(data=Message2SMO( id=data['id'], ref=ref, eventtype=data['notificationEventType'], - updatetime=data['updatetime'])) + updatetime=data['updatetime']), + type='Resource') bus.handle(cmd) elif channel == 'OcloudChanged': datastr = m['data'] diff --git a/o2app/service/handlers.py b/o2app/service/handlers.py index 1754f79..9b4f49d 100644 --- a/o2app/service/handlers.py +++ b/o2app/service/handlers.py @@ -32,7 +32,7 @@ from o2ims.service.auditor import ocloud_handler, dms_handler, \ from o2ims.service.command import notify_handler, registration_handler,\ notify_alarm_handler from o2ims.service.event import ocloud_event, resource_event, \ - resource_pool_event, alarm_event + resource_pool_event, alarm_event, dms_event, resource_type_event # if TYPE_CHECKING: # from . import unit_of_work @@ -55,6 +55,9 @@ EVENT_HANDLERS = { # o2dms_events.NfDeploymentUninstalled: [ # nfdeployment_handler.publish_nfdeployment_uninstalled] events.OcloudChanged: [ocloud_event.notify_ocloud_update], + events.ResourceTypeChanged: [resource_type_event.\ + notify_resourcetype_change], + events.DmsChanged: [dms_event.notify_dms_change], events.ResourceChanged: [resource_event.notify_resource_change], events.ResourcePoolChanged: [resource_pool_event.\ notify_resourcepool_change], diff --git a/o2ims/domain/commands.py b/o2ims/domain/commands.py index 4f86a6f..0caca37 100644 --- a/o2ims/domain/commands.py +++ b/o2ims/domain/commands.py @@ -37,6 +37,7 @@ class UpdateFaultObject(Command): @dataclass class PubMessage2SMO(Command): data: Message2SMO + type: str @dataclass diff --git a/o2ims/domain/events.py b/o2ims/domain/events.py index 19adab1..ede48d2 100644 --- a/o2ims/domain/events.py +++ b/o2ims/domain/events.py @@ -31,6 +31,14 @@ class OcloudChanged(Event): @dataclass class ResourceTypeChanged(Event): id: str + notificationEventType: NotificationEventEnum + updatetime: datetime.now() + + +@dataclass +class DmsChanged(Event): + id: str + notificationEventType: NotificationEventEnum updatetime: datetime.now() diff --git a/o2ims/domain/stx_object.py b/o2ims/domain/stx_object.py index c53b877..2eee7b7 100644 --- a/o2ims/domain/stx_object.py +++ b/o2ims/domain/stx_object.py @@ -29,15 +29,20 @@ class StxGenericModel(AgRoot): if api_response: self.id = str(api_response.uuid) self.type = type + self.name = api_response.name self.updatetime = datetime.datetime.strptime( api_response.updated_at.split('.')[0], "%Y-%m-%dT%H:%M:%S") \ if api_response.updated_at else None self.createtime = datetime.datetime.strptime( api_response.created_at.split('.')[0], "%Y-%m-%dT%H:%M:%S") \ if api_response.created_at else None - self.name = api_response.name - self.hash = content_hash if content_hash \ - else str(hash((self.id, self.updatetime))) + self.hash = content_hash + if not self.hash: + if hasattr(api_response, 'filtered'): + self.filtered = api_response.filtered + self.hash = str(hash((self.id, str(self.filtered)))) + else: + self.hash = str(hash((self.id, self.updatetime))) self.content = json.dumps(api_response.to_dict()) if ResourceTypeEnum.RESOURCE_POOL == type: self.res_pool_id = self.id diff --git a/o2ims/service/auditor/dms_handler.py b/o2ims/service/auditor/dms_handler.py index 89811c6..070c929 100644 --- a/o2ims/service/auditor/dms_handler.py +++ b/o2ims/service/auditor/dms_handler.py @@ -14,15 +14,12 @@ # pylint: disable=unused-argument from __future__ import annotations - import base64 import json +from o2ims.domain import commands, events from o2ims.domain.stx_object import StxGenericModel -# from dataclasses import asdict -# from typing import List, Dict, Callable, Type -# TYPE_CHECKING -from o2ims.domain import commands +from o2ims.domain.subscription_obj import NotificationEventEnum from o2common.service.unit_of_work import AbstractUnitOfWork from o2ims.domain.resource_type import MismatchedModel from o2ims.domain.ocloud import DeploymentManager @@ -94,6 +91,12 @@ def create_by(stxobj: StxGenericModel, parentid: str) -> DeploymentManager: localmodel.updatetime = stxobj.updatetime localmodel.hash = stxobj.hash + localmodel.events.append(events.DmsChanged( + id=stxobj.id, + notificationEventType=NotificationEventEnum.CREATE, + updatetime=stxobj.updatetime + )) + return localmodel @@ -108,7 +111,12 @@ def update_by(target: DeploymentManager, stxobj: StxGenericModel, target.oCloudId = parentid target.version_number = target.version_number + 1 target.profile = _convert_content(stxobj.content) - target.events = [] + + target.events.append(events.DmsChanged( + id=stxobj.id, + notificationEventType=NotificationEventEnum.MODIFY, + updatetime=stxobj.updatetime + )) def _convert_content(stxobj_content: str): diff --git a/o2ims/service/auditor/pserver_handler.py b/o2ims/service/auditor/pserver_handler.py index 1ab1189..f8ad40c 100644 --- a/o2ims/service/auditor/pserver_handler.py +++ b/o2ims/service/auditor/pserver_handler.py @@ -67,6 +67,10 @@ def update_pserver( alarm_dictionary = uow.alarm_dictionaries.get(dict_id) if alarm_dictionary: res_type.alarmDictionary = alarm_dictionary + res_type.events.append(events.ResourceTypeChanged( + id=res_type.resourceTypeId, + notificationEventType=NotificationEventEnum.CREATE, + updatetime=stxobj.updatetime)) uow.resource_types.add(res_type) else: resourcetype_id = first['resourceTypeId'] @@ -116,7 +120,7 @@ def create_by(stxobj: StxGenericModel, parentid: str, resourcetype_id: str) \ "operational", "availability", "administrative", "boot_device", "rootfs_device", "install_state", "subfunctions", "clock_synchronization", "max_cpu_mhz_allowed" - ] + ] filtered = dict( filter(lambda item: item[0] in selected_keys, content.items())) extensions = json.dumps(filtered) @@ -128,6 +132,13 @@ def create_by(stxobj: StxGenericModel, parentid: str, resourcetype_id: str) \ resource.updatetime = stxobj.updatetime resource.hash = stxobj.hash + resource.events.append(events.ResourceChanged( + id=stxobj.id, + resourcePoolId=resource.resourcePoolId, + notificationEventType=NotificationEventEnum.CREATE, + updatetime=stxobj.updatetime + )) + return resource diff --git a/o2ims/service/auditor/resourcepool_handler.py b/o2ims/service/auditor/resourcepool_handler.py index 1fb2458..366b77f 100644 --- a/o2ims/service/auditor/resourcepool_handler.py +++ b/o2ims/service/auditor/resourcepool_handler.py @@ -84,7 +84,11 @@ def create_by(stxobj: StxGenericModel, parentid: str) -> ResourcePool: resourcepool.createtime = stxobj.createtime resourcepool.updatetime = stxobj.updatetime resourcepool.hash = stxobj.hash - + resourcepool.events.append(events.ResourcePoolChanged( + id=stxobj.id, + notificationEventType=NotificationEventEnum.CREATE, + updatetime=stxobj.updatetime + )) return resourcepool diff --git a/o2ims/service/command/notify_handler.py b/o2ims/service/command/notify_handler.py index 50c4099..08da45d 100644 --- a/o2ims/service/command/notify_handler.py +++ b/o2ims/service/command/notify_handler.py @@ -27,7 +27,8 @@ from o2common.service.command.handler import get_https_conn_selfsigned from o2common.service.command.handler import post_data from o2ims.domain import commands, ocloud -from o2ims.domain.subscription_obj import Subscription, Message2SMO +from o2ims.domain.subscription_obj import Subscription, Message2SMO, \ + NotificationEventEnum from o2common.helper import o2logging logger = o2logging.get_logger(__name__) @@ -42,11 +43,158 @@ def notify_change_to_smo( uow: AbstractUnitOfWork, ): logger.debug('In notify_change_to_smo') + msg_type = cmd.type + if msg_type == 'ResourceType': + _notify_resourcetype(uow, cmd) + elif msg_type == 'ResourcePool': + _notify_resourcepool(uow, cmd) + elif msg_type == 'Dms': + _notify_dms(uow, cmd) + elif msg_type == 'Resource': + _notify_resource(uow, cmd) + + +def _notify_resourcetype(uow, cmd): + data = cmd.data + msg_type = cmd.type + with uow: + resource_type = uow.resource_types.get(data.id) + if resource_type is None: + logger.warning('ResourceType {} does not exists.'.format(data.id)) + return + resource_type_dict = { + 'resourceTypeId': resource_type.resourceTypeId, + 'name': resource_type.name, + 'description': resource_type.description, + 'vendor': resource_type.vendor, + 'model': resource_type.model, + 'version': resource_type.version, + # 'alarmDictionary': resource_type.alarmDictionary.serialize() + } + + subs = uow.subscriptions.list() + for sub in subs: + sub_data = sub.serialize() + logger.debug('Subscription: {}'.format(sub_data['subscriptionId'])) + if not sub_data.get('filter', None): + callback_smo(sub, data, msg_type, resource_type_dict) + continue + try: + args = gen_orm_filter(ocloud.ResourceType, sub_data['filter']) + except KeyError: + logger.warning( + 'Subscription {} filter {} has wrong attribute name ' + 'or value. Ignore the filter.'.format( + sub_data['subscriptionId'], sub_data['filter'])) + callback_smo(sub, data, msg_type, resource_type_dict) + continue + args.append(ocloud.ResourceType.resourceTypeId == data.id) + ret = uow.resource_types.list_with_count(*args) + if ret[0] != 0: + logger.debug( + 'ResourceType {} skip for subscription {} because of the ' + 'filter.' + .format(data.id, sub_data['subscriptionId'])) + continue + callback_smo(sub, data, msg_type, resource_type_dict) + + +def _notify_resourcepool(uow, cmd): + data = cmd.data + msg_type = cmd.type + with uow: + resource_pool = uow.resource_pools.get(data.id) + if resource_pool is None: + logger.warning('ResourcePool {} does not exists.'.format(data.id)) + return + resource_pool_dict = { + 'resourcePoolId': resource_pool.resourcePoolId, + 'oCloudId': resource_pool.oCloudId, + 'globalLocationId': resource_pool.globalLocationId, + 'name': resource_pool.name, + 'description': resource_pool.description + } + + subs = uow.subscriptions.list() + for sub in subs: + sub_data = sub.serialize() + logger.debug('Subscription: {}'.format(sub_data['subscriptionId'])) + if not sub_data.get('filter', None): + callback_smo(sub, data, msg_type, resource_pool_dict) + continue + try: + args = gen_orm_filter(ocloud.Resource, sub_data['filter']) + except KeyError: + logger.warning( + 'Subscription {} filter {} has wrong attribute name ' + 'or value. Ignore the filter.'.format( + sub_data['subscriptionId'], sub_data['filter'])) + callback_smo(sub, data, msg_type, resource_pool_dict) + continue + args.append(ocloud.ResourcePool.resourcePoolId == data.id) + ret = uow.resource_pools.list_with_count(*args) + if ret[0] != 0: + logger.debug( + 'ResourcePool {} skip for subscription {} because of the ' + 'filter.' + .format(data.id, sub_data['subscriptionId'])) + continue + callback_smo(sub, data, msg_type, resource_pool_dict) + + +def _notify_dms(uow, cmd): data = cmd.data + msg_type = cmd.type + with uow: + dms = uow.deployment_managers.get(data.id) + if dms is None: + logger.warning( + 'DeploymentManager {} does not exists.'.format(data.id)) + return + dms_dict = { + 'deploymentManagerId': dms.deploymentManagerId, + 'name': dms.name, + 'description': dms.description, + 'oCloudId': dms.oCloudId, + 'serviceUri': dms.serviceUri + } + + subs = uow.subscriptions.list() + for sub in subs: + sub_data = sub.serialize() + logger.debug('Subscription: {}'.format(sub_data['subscriptionId'])) + if not sub_data.get('filter', None): + callback_smo(sub, data, msg_type, dms_dict) + continue + try: + args = gen_orm_filter(ocloud.Resource, sub_data['filter']) + except KeyError: + logger.warning( + 'Subscription {} filter {} has wrong attribute name ' + 'or value. Ignore the filter.'.format( + sub_data['subscriptionId'], sub_data['filter'])) + callback_smo(sub, data, msg_type, dms_dict) + continue + args.append( + ocloud.DeploymentManager.deploymentManagerId == data.id) + ret = uow.deployment_managers.list_with_count(*args) + if ret[0] != 0: + logger.debug( + 'DeploymentManager {} skip for subscription {} because of ' + 'the filter.' + .format(data.id, sub_data['subscriptionId'])) + continue + callback_smo(sub, data) + callback_smo(sub, data, msg_type, dms_dict) + + +def _notify_resource(uow, cmd): + data = cmd.data + msg_type = cmd.type with uow: resource = uow.resources.get(data.id) if resource is None: - logger.debug('Resource {} does not exists.'.format(data.id)) + logger.warning('Resource {} does not exists.'.format(data.id)) return res_pool_id = resource.serialize()['resourcePoolId'] logger.debug('res pool id is {}'.format(res_pool_id)) @@ -56,16 +204,16 @@ def notify_change_to_smo( sub_data = sub.serialize() logger.debug('Subscription: {}'.format(sub_data['subscriptionId'])) if not sub_data.get('filter', None): - callback_smo(sub, data) + callback_smo(sub, data, msg_type) continue try: args = gen_orm_filter(ocloud.Resource, sub_data['filter']) except KeyError: - logger.error( + logger.warning( 'Subscription {} filter {} has wrong attribute name ' 'or value. Ignore the filter.'.format( sub_data['subscriptionId'], sub_data['filter'])) - callback_smo(sub, data) + callback_smo(sub, data, msg_type) continue args.append(ocloud.Resource.resourceId == data.id) ret = uow.resources.list_with_count(res_pool_id, *args) @@ -75,10 +223,11 @@ def notify_change_to_smo( 'filter.' .format(data.id, sub_data['subscriptionId'])) continue - callback_smo(sub, data) + callback_smo(sub, data, msg_type) -def callback_smo(sub: Subscription, msg: Message2SMO): +def callback_smo(sub: Subscription, msg: Message2SMO, msg_type: str, + obj_dict: dict = None): sub_data = sub.serialize() callback = { 'consumerSubscriptionId': sub_data['consumerSubscriptionId'], @@ -86,13 +235,13 @@ def callback_smo(sub: Subscription, msg: Message2SMO): 'objectRef': msg.objectRef, 'updateTime': msg.updatetime } - # if msg.notificationEventType in [NotificationEventEnum.DELETE, - # NotificationEventEnum.MODIFY]: - # callback['priorObjectState'] = {} - # if msg.notificationEventType in [NotificationEventEnum.CREATE, - # NotificationEventEnum.MODIFY]: - # callback['postObjectState'] = {} - # logger.warning(callback) + if msg_type != 'Resource': + if msg.notificationEventType in [NotificationEventEnum.DELETE, + NotificationEventEnum.MODIFY]: + callback['priorObjectState'] = obj_dict + if msg.notificationEventType in [NotificationEventEnum.CREATE, + NotificationEventEnum.MODIFY]: + callback['postObjectState'] = obj_dict callback_data = json.dumps(callback) logger.info('URL: {}, data: {}'.format( sub_data['callback'], callback_data)) diff --git a/o2ims/service/event/alarm_event.py b/o2ims/service/event/alarm_event.py index a58c52b..5e80668 100644 --- a/o2ims/service/event/alarm_event.py +++ b/o2ims/service/event/alarm_event.py @@ -24,7 +24,7 @@ def notify_alarm_event_change( event: events.AlarmEventChanged, publish: Callable, ): - logger.info('In notify_alarm_event_change') + logger.debug('In notify_alarm_event_change') publish("AlarmEventChanged", event) logger.debug("published Alarm Event Changed: {}".format( event.id)) diff --git a/o2ims/service/event/dms_event.py b/o2ims/service/event/dms_event.py new file mode 100644 index 0000000..4506f22 --- /dev/null +++ b/o2ims/service/event/dms_event.py @@ -0,0 +1,30 @@ +# Copyright (C) 2021 Wind River Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Callable + +from o2ims.domain import events + +from o2common.helper import o2logging +logger = o2logging.get_logger(__name__) + + +def notify_dms_change( + event: events.DmsChanged, + publish: Callable, +): + logger.debug('In notify_dms_change') + publish("DmsChanged", event) + logger.debug("published DMS Changed: {}".format( + event.id)) diff --git a/o2ims/service/event/ocloud_event.py b/o2ims/service/event/ocloud_event.py index 6fb4563..f89a89e 100644 --- a/o2ims/service/event/ocloud_event.py +++ b/o2ims/service/event/ocloud_event.py @@ -24,7 +24,7 @@ def notify_ocloud_update( event: events.OcloudChanged, publish: Callable, ): - logger.info('In notify_ocloud_update') + logger.debug('In notify_ocloud_update') publish("OcloudChanged", event) logger.debug("published Ocloud Changed: {}".format( event.id)) diff --git a/o2ims/service/event/resource_event.py b/o2ims/service/event/resource_event.py index 09241ca..e7fcb27 100644 --- a/o2ims/service/event/resource_event.py +++ b/o2ims/service/event/resource_event.py @@ -24,7 +24,7 @@ def notify_resource_change( event: events.ResourceChanged, publish: Callable, ): - logger.info('In notify_resource_change') + logger.debug('In notify_resource_change') publish("ResourceChanged", event) logger.debug("published Resource Changed: {}".format( event.id)) diff --git a/o2ims/service/event/resource_pool_event.py b/o2ims/service/event/resource_pool_event.py index 07e6827..726307a 100644 --- a/o2ims/service/event/resource_pool_event.py +++ b/o2ims/service/event/resource_pool_event.py @@ -24,7 +24,7 @@ def notify_resourcepool_change( event: events.ResourcePoolChanged, publish: Callable, ): - logger.info('In notify_resourcepool_change') + logger.debug('In notify_resourcepool_change') publish("ResourcePoolChanged", event) logger.debug("published Resource Pool Changed: {}".format( event.id)) diff --git a/o2ims/service/event/resource_type_event.py b/o2ims/service/event/resource_type_event.py new file mode 100644 index 0000000..ad1037a --- /dev/null +++ b/o2ims/service/event/resource_type_event.py @@ -0,0 +1,30 @@ +# Copyright (C) 2021 Wind River Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Callable + +from o2ims.domain import events + +from o2common.helper import o2logging +logger = o2logging.get_logger(__name__) + + +def notify_resourcetype_change( + event: events.ResourceTypeChanged, + publish: Callable, +): + logger.debug('In notify_resourcetype_change') + publish("ResourceTypeChanged", event) + logger.debug("published Resource Type Changed: {}".format( + event.id))