Fix INF-371 inventoryChange notification of the resourceType, resourcePool, Dms 38/9938/1
authorZhang Rong(Jon) <rong.zhang@windriver.com>
Sat, 26 Nov 2022 15:49:54 +0000 (23:49 +0800)
committerJackie Huang <jackie.huang@windriver.com>
Mon, 5 Dec 2022 04:47:01 +0000 (12:47 +0800)
Issue-ID: INF-371
Signed-off-by: Zhang Rong(Jon) <rong.zhang@windriver.com>
Change-Id: I3b6fee5a433359503660b5d784af11e948fb4916

15 files changed:
o2app/entrypoints/redis_eventconsumer.py
o2app/service/handlers.py
o2ims/domain/commands.py
o2ims/domain/events.py
o2ims/domain/stx_object.py
o2ims/service/auditor/dms_handler.py
o2ims/service/auditor/pserver_handler.py
o2ims/service/auditor/resourcepool_handler.py
o2ims/service/command/notify_handler.py
o2ims/service/event/alarm_event.py
o2ims/service/event/dms_event.py [new file with mode: 0644]
o2ims/service/event/ocloud_event.py
o2ims/service/event/resource_event.py
o2ims/service/event/resource_pool_event.py
o2ims/service/event/resource_type_event.py [new file with mode: 0644]

index cc34099..1410f76 100644 (file)
@@ -39,8 +39,11 @@ def main():
     bus = bootstrap.bootstrap()
     pubsub = r.pubsub(ignore_subscribe_messages=True)
     pubsub.subscribe("NfDeploymentStateChanged")
-    pubsub.subscribe('ResourceChanged')
     pubsub.subscribe('OcloudChanged')
+    pubsub.subscribe('ResourceTypeChanged')
+    pubsub.subscribe('ResourcePoolChanged')
+    pubsub.subscribe('DmsChanged')
+    pubsub.subscribe('ResourceChanged')
     pubsub.subscribe('AlarmEventChanged')
 
     for m in pubsub.listen():
@@ -64,6 +67,42 @@ def handle_changed(m, bus):
             ToState=data['ToState']
         )
         bus.handle(cmd)
+    elif channel == 'ResourceTypeChanged':
+        datastr = m['data']
+        data = json.loads(datastr)
+        logger.info('ResourceTypeChanged with cmd:{}'.format(data))
+        ref = apibase + inventory_api_version + '/resourceTypes/' + \
+            data['id']
+        cmd = imscmd.PubMessage2SMO(data=Message2SMO(
+            id=data['id'], ref=ref,
+            eventtype=data['notificationEventType'],
+            updatetime=data['updatetime']),
+            type='ResourceType')
+        bus.handle(cmd)
+    elif channel == 'ResourcePoolChanged':
+        datastr = m['data']
+        data = json.loads(datastr)
+        logger.info('ResourcePoolChanged with cmd:{}'.format(data))
+        ref = apibase + inventory_api_version + '/resourcePools/' + \
+            data['id']
+        cmd = imscmd.PubMessage2SMO(data=Message2SMO(
+            id=data['id'], ref=ref,
+            eventtype=data['notificationEventType'],
+            updatetime=data['updatetime']),
+            type='ResourcePool')
+        bus.handle(cmd)
+    elif channel == 'DmsChanged':
+        datastr = m['data']
+        data = json.loads(datastr)
+        logger.info('ResourceChanged with cmd:{}'.format(data))
+        ref = apibase + inventory_api_version + '/deploymentManagers/' + \
+            data['id']
+        cmd = imscmd.PubMessage2SMO(data=Message2SMO(
+            id=data['id'], ref=ref,
+            eventtype=data['notificationEventType'],
+            updatetime=data['updatetime']),
+            type='Dms')
+        bus.handle(cmd)
     elif channel == 'ResourceChanged':
         datastr = m['data']
         data = json.loads(datastr)
@@ -73,7 +112,8 @@ def handle_changed(m, bus):
         cmd = imscmd.PubMessage2SMO(data=Message2SMO(
             id=data['id'], ref=ref,
             eventtype=data['notificationEventType'],
-            updatetime=data['updatetime']))
+            updatetime=data['updatetime']),
+            type='Resource')
         bus.handle(cmd)
     elif channel == 'OcloudChanged':
         datastr = m['data']
index 1754f79..9b4f49d 100644 (file)
@@ -32,7 +32,7 @@ from o2ims.service.auditor import ocloud_handler, dms_handler, \
 from o2ims.service.command import notify_handler, registration_handler,\
     notify_alarm_handler
 from o2ims.service.event import ocloud_event, resource_event, \
-    resource_pool_event, alarm_event
+    resource_pool_event, alarm_event, dms_event, resource_type_event
 
 # if TYPE_CHECKING:
 #     from . import unit_of_work
@@ -55,6 +55,9 @@ EVENT_HANDLERS = {
     # o2dms_events.NfDeploymentUninstalled: [
     #     nfdeployment_handler.publish_nfdeployment_uninstalled]
     events.OcloudChanged: [ocloud_event.notify_ocloud_update],
+    events.ResourceTypeChanged: [resource_type_event.\
+                                 notify_resourcetype_change],
+    events.DmsChanged: [dms_event.notify_dms_change],
     events.ResourceChanged: [resource_event.notify_resource_change],
     events.ResourcePoolChanged: [resource_pool_event.\
                                  notify_resourcepool_change],
index 4f86a6f..0caca37 100644 (file)
@@ -37,6 +37,7 @@ class UpdateFaultObject(Command):
 @dataclass
 class PubMessage2SMO(Command):
     data: Message2SMO
+    type: str
 
 
 @dataclass
index 19adab1..ede48d2 100644 (file)
@@ -31,6 +31,14 @@ class OcloudChanged(Event):
 @dataclass
 class ResourceTypeChanged(Event):
     id: str
+    notificationEventType: NotificationEventEnum
+    updatetime: datetime.now()
+
+
+@dataclass
+class DmsChanged(Event):
+    id: str
+    notificationEventType: NotificationEventEnum
     updatetime: datetime.now()
 
 
index c53b877..2eee7b7 100644 (file)
@@ -29,15 +29,20 @@ class StxGenericModel(AgRoot):
         if api_response:
             self.id = str(api_response.uuid)
             self.type = type
+            self.name = api_response.name
             self.updatetime = datetime.datetime.strptime(
                 api_response.updated_at.split('.')[0], "%Y-%m-%dT%H:%M:%S") \
                 if api_response.updated_at else None
             self.createtime = datetime.datetime.strptime(
                 api_response.created_at.split('.')[0], "%Y-%m-%dT%H:%M:%S") \
                 if api_response.created_at else None
-            self.name = api_response.name
-            self.hash = content_hash if content_hash \
-                else str(hash((self.id, self.updatetime)))
+            self.hash = content_hash
+            if not self.hash:
+                if hasattr(api_response, 'filtered'):
+                    self.filtered = api_response.filtered
+                    self.hash = str(hash((self.id, str(self.filtered))))
+                else:
+                    self.hash = str(hash((self.id, self.updatetime)))
             self.content = json.dumps(api_response.to_dict())
             if ResourceTypeEnum.RESOURCE_POOL == type:
                 self.res_pool_id = self.id
index 89811c6..070c929 100644 (file)
 
 # pylint: disable=unused-argument
 from __future__ import annotations
-
 import base64
 import json
 
+from o2ims.domain import commands, events
 from o2ims.domain.stx_object import StxGenericModel
-# from dataclasses import asdict
-# from typing import List, Dict, Callable, Type
-# TYPE_CHECKING
-from o2ims.domain import commands
+from o2ims.domain.subscription_obj import NotificationEventEnum
 from o2common.service.unit_of_work import AbstractUnitOfWork
 from o2ims.domain.resource_type import MismatchedModel
 from o2ims.domain.ocloud import DeploymentManager
@@ -94,6 +91,12 @@ def create_by(stxobj: StxGenericModel, parentid: str) -> DeploymentManager:
     localmodel.updatetime = stxobj.updatetime
     localmodel.hash = stxobj.hash
 
+    localmodel.events.append(events.DmsChanged(
+        id=stxobj.id,
+        notificationEventType=NotificationEventEnum.CREATE,
+        updatetime=stxobj.updatetime
+    ))
+
     return localmodel
 
 
@@ -108,7 +111,12 @@ def update_by(target: DeploymentManager, stxobj: StxGenericModel,
     target.oCloudId = parentid
     target.version_number = target.version_number + 1
     target.profile = _convert_content(stxobj.content)
-    target.events = []
+
+    target.events.append(events.DmsChanged(
+        id=stxobj.id,
+        notificationEventType=NotificationEventEnum.MODIFY,
+        updatetime=stxobj.updatetime
+    ))
 
 
 def _convert_content(stxobj_content: str):
index 1ab1189..f8ad40c 100644 (file)
@@ -67,6 +67,10 @@ def update_pserver(
             alarm_dictionary = uow.alarm_dictionaries.get(dict_id)
             if alarm_dictionary:
                 res_type.alarmDictionary = alarm_dictionary
+            res_type.events.append(events.ResourceTypeChanged(
+                id=res_type.resourceTypeId,
+                notificationEventType=NotificationEventEnum.CREATE,
+                updatetime=stxobj.updatetime))
             uow.resource_types.add(res_type)
         else:
             resourcetype_id = first['resourceTypeId']
@@ -116,7 +120,7 @@ def create_by(stxobj: StxGenericModel, parentid: str, resourcetype_id: str) \
         "operational", "availability", "administrative",
         "boot_device", "rootfs_device", "install_state", "subfunctions",
         "clock_synchronization", "max_cpu_mhz_allowed"
-        ]
+    ]
     filtered = dict(
         filter(lambda item: item[0] in selected_keys, content.items()))
     extensions = json.dumps(filtered)
@@ -128,6 +132,13 @@ def create_by(stxobj: StxGenericModel, parentid: str, resourcetype_id: str) \
     resource.updatetime = stxobj.updatetime
     resource.hash = stxobj.hash
 
+    resource.events.append(events.ResourceChanged(
+        id=stxobj.id,
+        resourcePoolId=resource.resourcePoolId,
+        notificationEventType=NotificationEventEnum.CREATE,
+        updatetime=stxobj.updatetime
+    ))
+
     return resource
 
 
index 1fb2458..366b77f 100644 (file)
@@ -84,7 +84,11 @@ def create_by(stxobj: StxGenericModel, parentid: str) -> ResourcePool:
     resourcepool.createtime = stxobj.createtime
     resourcepool.updatetime = stxobj.updatetime
     resourcepool.hash = stxobj.hash
-
+    resourcepool.events.append(events.ResourcePoolChanged(
+        id=stxobj.id,
+        notificationEventType=NotificationEventEnum.CREATE,
+        updatetime=stxobj.updatetime
+    ))
     return resourcepool
 
 
index 50c4099..08da45d 100644 (file)
@@ -27,7 +27,8 @@ from o2common.service.command.handler import get_https_conn_selfsigned
 from o2common.service.command.handler import post_data
 
 from o2ims.domain import commands, ocloud
-from o2ims.domain.subscription_obj import Subscription, Message2SMO
+from o2ims.domain.subscription_obj import Subscription, Message2SMO, \
+    NotificationEventEnum
 
 from o2common.helper import o2logging
 logger = o2logging.get_logger(__name__)
@@ -42,11 +43,158 @@ def notify_change_to_smo(
     uow: AbstractUnitOfWork,
 ):
     logger.debug('In notify_change_to_smo')
+    msg_type = cmd.type
+    if msg_type == 'ResourceType':
+        _notify_resourcetype(uow, cmd)
+    elif msg_type == 'ResourcePool':
+        _notify_resourcepool(uow, cmd)
+    elif msg_type == 'Dms':
+        _notify_dms(uow, cmd)
+    elif msg_type == 'Resource':
+        _notify_resource(uow, cmd)
+
+
+def _notify_resourcetype(uow, cmd):
+    data = cmd.data
+    msg_type = cmd.type
+    with uow:
+        resource_type = uow.resource_types.get(data.id)
+        if resource_type is None:
+            logger.warning('ResourceType {} does not exists.'.format(data.id))
+            return
+        resource_type_dict = {
+            'resourceTypeId': resource_type.resourceTypeId,
+            'name': resource_type.name,
+            'description': resource_type.description,
+            'vendor': resource_type.vendor,
+            'model': resource_type.model,
+            'version': resource_type.version,
+            # 'alarmDictionary': resource_type.alarmDictionary.serialize()
+        }
+
+        subs = uow.subscriptions.list()
+        for sub in subs:
+            sub_data = sub.serialize()
+            logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
+            if not sub_data.get('filter', None):
+                callback_smo(sub, data, msg_type, resource_type_dict)
+                continue
+            try:
+                args = gen_orm_filter(ocloud.ResourceType, sub_data['filter'])
+            except KeyError:
+                logger.warning(
+                    'Subscription {} filter {} has wrong attribute name '
+                    'or value. Ignore the filter.'.format(
+                        sub_data['subscriptionId'], sub_data['filter']))
+                callback_smo(sub, data, msg_type, resource_type_dict)
+                continue
+            args.append(ocloud.ResourceType.resourceTypeId == data.id)
+            ret = uow.resource_types.list_with_count(*args)
+            if ret[0] != 0:
+                logger.debug(
+                    'ResourceType {} skip for subscription {} because of the '
+                    'filter.'
+                    .format(data.id, sub_data['subscriptionId']))
+                continue
+            callback_smo(sub, data, msg_type, resource_type_dict)
+
+
+def _notify_resourcepool(uow, cmd):
+    data = cmd.data
+    msg_type = cmd.type
+    with uow:
+        resource_pool = uow.resource_pools.get(data.id)
+        if resource_pool is None:
+            logger.warning('ResourcePool {} does not exists.'.format(data.id))
+            return
+        resource_pool_dict = {
+            'resourcePoolId': resource_pool.resourcePoolId,
+            'oCloudId': resource_pool.oCloudId,
+            'globalLocationId': resource_pool.globalLocationId,
+            'name': resource_pool.name,
+            'description': resource_pool.description
+        }
+
+        subs = uow.subscriptions.list()
+        for sub in subs:
+            sub_data = sub.serialize()
+            logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
+            if not sub_data.get('filter', None):
+                callback_smo(sub, data, msg_type, resource_pool_dict)
+                continue
+            try:
+                args = gen_orm_filter(ocloud.Resource, sub_data['filter'])
+            except KeyError:
+                logger.warning(
+                    'Subscription {} filter {} has wrong attribute name '
+                    'or value. Ignore the filter.'.format(
+                        sub_data['subscriptionId'], sub_data['filter']))
+                callback_smo(sub, data, msg_type, resource_pool_dict)
+                continue
+            args.append(ocloud.ResourcePool.resourcePoolId == data.id)
+            ret = uow.resource_pools.list_with_count(*args)
+            if ret[0] != 0:
+                logger.debug(
+                    'ResourcePool {} skip for subscription {} because of the '
+                    'filter.'
+                    .format(data.id, sub_data['subscriptionId']))
+                continue
+            callback_smo(sub, data, msg_type, resource_pool_dict)
+
+
+def _notify_dms(uow, cmd):
     data = cmd.data
+    msg_type = cmd.type
+    with uow:
+        dms = uow.deployment_managers.get(data.id)
+        if dms is None:
+            logger.warning(
+                'DeploymentManager {} does not exists.'.format(data.id))
+            return
+        dms_dict = {
+            'deploymentManagerId': dms.deploymentManagerId,
+            'name': dms.name,
+            'description': dms.description,
+            'oCloudId': dms.oCloudId,
+            'serviceUri': dms.serviceUri
+        }
+
+        subs = uow.subscriptions.list()
+        for sub in subs:
+            sub_data = sub.serialize()
+            logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
+            if not sub_data.get('filter', None):
+                callback_smo(sub, data, msg_type, dms_dict)
+                continue
+            try:
+                args = gen_orm_filter(ocloud.Resource, sub_data['filter'])
+            except KeyError:
+                logger.warning(
+                    'Subscription {} filter {} has wrong attribute name '
+                    'or value. Ignore the filter.'.format(
+                        sub_data['subscriptionId'], sub_data['filter']))
+                callback_smo(sub, data, msg_type, dms_dict)
+                continue
+            args.append(
+                ocloud.DeploymentManager.deploymentManagerId == data.id)
+            ret = uow.deployment_managers.list_with_count(*args)
+            if ret[0] != 0:
+                logger.debug(
+                    'DeploymentManager {} skip for subscription {} because of '
+                    'the filter.'
+                    .format(data.id, sub_data['subscriptionId']))
+                continue
+            callback_smo(sub, data)
+            callback_smo(sub, data, msg_type, dms_dict)
+
+
+def _notify_resource(uow, cmd):
+    data = cmd.data
+    msg_type = cmd.type
     with uow:
         resource = uow.resources.get(data.id)
         if resource is None:
-            logger.debug('Resource {} does not exists.'.format(data.id))
+            logger.warning('Resource {} does not exists.'.format(data.id))
             return
         res_pool_id = resource.serialize()['resourcePoolId']
         logger.debug('res pool id is {}'.format(res_pool_id))
@@ -56,16 +204,16 @@ def notify_change_to_smo(
             sub_data = sub.serialize()
             logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
             if not sub_data.get('filter', None):
-                callback_smo(sub, data)
+                callback_smo(sub, data, msg_type)
                 continue
             try:
                 args = gen_orm_filter(ocloud.Resource, sub_data['filter'])
             except KeyError:
-                logger.error(
+                logger.warning(
                     'Subscription {} filter {} has wrong attribute name '
                     'or value. Ignore the filter.'.format(
                         sub_data['subscriptionId'], sub_data['filter']))
-                callback_smo(sub, data)
+                callback_smo(sub, data, msg_type)
                 continue
             args.append(ocloud.Resource.resourceId == data.id)
             ret = uow.resources.list_with_count(res_pool_id, *args)
@@ -75,10 +223,11 @@ def notify_change_to_smo(
                     'filter.'
                     .format(data.id, sub_data['subscriptionId']))
                 continue
-            callback_smo(sub, data)
+            callback_smo(sub, data, msg_type)
 
 
-def callback_smo(sub: Subscription, msg: Message2SMO):
+def callback_smo(sub: Subscription, msg: Message2SMO, msg_type: str,
+                 obj_dict: dict = None):
     sub_data = sub.serialize()
     callback = {
         'consumerSubscriptionId': sub_data['consumerSubscriptionId'],
@@ -86,13 +235,13 @@ def callback_smo(sub: Subscription, msg: Message2SMO):
         'objectRef': msg.objectRef,
         'updateTime': msg.updatetime
     }
-    # if msg.notificationEventType in [NotificationEventEnum.DELETE,
-    #                                  NotificationEventEnum.MODIFY]:
-    #     callback['priorObjectState'] = {}
-    # if msg.notificationEventType in [NotificationEventEnum.CREATE,
-    #                                  NotificationEventEnum.MODIFY]:
-    #     callback['postObjectState'] = {}
-    # logger.warning(callback)
+    if msg_type != 'Resource':
+        if msg.notificationEventType in [NotificationEventEnum.DELETE,
+                                         NotificationEventEnum.MODIFY]:
+            callback['priorObjectState'] = obj_dict
+        if msg.notificationEventType in [NotificationEventEnum.CREATE,
+                                         NotificationEventEnum.MODIFY]:
+            callback['postObjectState'] = obj_dict
     callback_data = json.dumps(callback)
     logger.info('URL: {}, data: {}'.format(
         sub_data['callback'], callback_data))
index a58c52b..5e80668 100644 (file)
@@ -24,7 +24,7 @@ def notify_alarm_event_change(
     event: events.AlarmEventChanged,
     publish: Callable,
 ):
-    logger.info('In notify_alarm_event_change')
+    logger.debug('In notify_alarm_event_change')
     publish("AlarmEventChanged", event)
     logger.debug("published Alarm Event Changed: {}".format(
         event.id))
diff --git a/o2ims/service/event/dms_event.py b/o2ims/service/event/dms_event.py
new file mode 100644 (file)
index 0000000..4506f22
--- /dev/null
@@ -0,0 +1,30 @@
+# Copyright (C) 2021 Wind River Systems, Inc.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+from typing import Callable
+
+from o2ims.domain import events
+
+from o2common.helper import o2logging
+logger = o2logging.get_logger(__name__)
+
+
+def notify_dms_change(
+    event: events.DmsChanged,
+    publish: Callable,
+):
+    logger.debug('In notify_dms_change')
+    publish("DmsChanged", event)
+    logger.debug("published DMS Changed: {}".format(
+        event.id))
index 6fb4563..f89a89e 100644 (file)
@@ -24,7 +24,7 @@ def notify_ocloud_update(
     event: events.OcloudChanged,
     publish: Callable,
 ):
-    logger.info('In notify_ocloud_update')
+    logger.debug('In notify_ocloud_update')
     publish("OcloudChanged", event)
     logger.debug("published Ocloud Changed: {}".format(
         event.id))
index 09241ca..e7fcb27 100644 (file)
@@ -24,7 +24,7 @@ def notify_resource_change(
     event: events.ResourceChanged,
     publish: Callable,
 ):
-    logger.info('In notify_resource_change')
+    logger.debug('In notify_resource_change')
     publish("ResourceChanged", event)
     logger.debug("published Resource Changed: {}".format(
         event.id))
index 07e6827..726307a 100644 (file)
@@ -24,7 +24,7 @@ def notify_resourcepool_change(
     event: events.ResourcePoolChanged,
     publish: Callable,
 ):
-    logger.info('In notify_resourcepool_change')
+    logger.debug('In notify_resourcepool_change')
     publish("ResourcePoolChanged", event)
     logger.debug("published Resource Pool Changed: {}".format(
         event.id))
diff --git a/o2ims/service/event/resource_type_event.py b/o2ims/service/event/resource_type_event.py
new file mode 100644 (file)
index 0000000..ad1037a
--- /dev/null
@@ -0,0 +1,30 @@
+# Copyright (C) 2021 Wind River Systems, Inc.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+from typing import Callable
+
+from o2ims.domain import events
+
+from o2common.helper import o2logging
+logger = o2logging.get_logger(__name__)
+
+
+def notify_resourcetype_change(
+    event: events.ResourceTypeChanged,
+    publish: Callable,
+):
+    logger.debug('In notify_resourcetype_change')
+    publish("ResourceTypeChanged", event)
+    logger.debug("published Resource Type Changed: {}".format(
+        event.id))