Fix INF-352 Add aggregate resource types 99/9799/1
authorZhang Rong(Jon) <rong.zhang@windriver.com>
Tue, 15 Nov 2022 03:53:36 +0000 (11:53 +0800)
committerJackie Huang <jackie.huang@windriver.com>
Fri, 25 Nov 2022 08:19:24 +0000 (16:19 +0800)
Issue-ID: INF-352
Signed-off-by: Zhang Rong(Jon) <rong.zhang@windriver.com>
Change-Id: I2e8b828b54d52e21a1325ae61825da574a2baeed

15 files changed:
o2app/entrypoints/resource_watcher.py
o2app/service/handlers.py
o2common/service/watcher/base.py
o2ims/adapter/clients/aggregate_client.py [new file with mode: 0644]
o2ims/domain/commands.py
o2ims/domain/resource_type.py
o2ims/service/auditor/agg_compute_handler.py [new file with mode: 0644]
o2ims/service/auditor/agg_network_handler.py [new file with mode: 0644]
o2ims/service/auditor/agg_storage_handler.py [new file with mode: 0644]
o2ims/service/auditor/agg_undefined_handler.py [new file with mode: 0644]
o2ims/service/watcher/agg_compute_watcher.py [new file with mode: 0644]
o2ims/service/watcher/agg_network_watcher.py [new file with mode: 0644]
o2ims/service/watcher/agg_storage_watcher.py [new file with mode: 0644]
o2ims/service/watcher/agg_undefined_watcher.py [new file with mode: 0644]
o2ims/service/watcher/aggregate_watcher.py [new file with mode: 0644]

index 909e86c..2e9e0c0 100644 (file)
@@ -51,6 +51,13 @@ from o2ims.adapter.clients.ocloud_client import StxEthClient
 from o2ims.service.watcher.pserver_acc_watcher import PServerAccWatcher
 from o2ims.adapter.clients.ocloud_client import StxAccClient
 
+from o2ims.service.watcher.agg_compute_watcher import ComputeAggWatcher
+from o2ims.service.watcher.agg_network_watcher import NetworkAggWatcher
+from o2ims.service.watcher.agg_storage_watcher import StorageAggWatcher
+from o2ims.service.watcher.agg_undefined_watcher import UndefinedAggWatcher
+from o2ims.adapter.clients.aggregate_client import ComputeAggClient, \
+    NetworkAggClient, StorageAggClient, UndefinedAggClient
+
 from o2common.helper import o2logging
 logger = o2logging.get_logger(__name__)
 
@@ -76,6 +83,15 @@ class WatcherService(cotyledon.Service):
             child_respool = root.addchild(
                 ResourcePoolWatcher(StxResourcePoolClient(),
                                     self.bus))
+            child_respool.addchild(
+                ComputeAggWatcher(ComputeAggClient(), self.bus))
+            child_respool.addchild(
+                NetworkAggWatcher(NetworkAggClient(), self.bus))
+            child_respool.addchild(
+                StorageAggWatcher(StorageAggClient(), self.bus))
+            child_respool.addchild(
+                UndefinedAggWatcher(UndefinedAggClient(), self.bus))
+
             child_pserver = child_respool.addchild(
                 PServerWatcher(StxPserverClient(), self.bus))
             child_pserver.addchild(
index ad75098..1754f79 100644 (file)
@@ -27,7 +27,8 @@ from o2ims.service.auditor import ocloud_handler, dms_handler, \
     resourcepool_handler, pserver_handler, pserver_cpu_handler, \
     pserver_mem_handler, pserver_port_handler, pserver_if_handler,\
     pserver_eth_handler, pserver_acc_handler, alarm_handler, \
-    pserver_dev_handler
+    pserver_dev_handler, agg_compute_handler, agg_network_handler,\
+    agg_storage_handler, agg_undefined_handler
 from o2ims.service.command import notify_handler, registration_handler,\
     notify_alarm_handler
 from o2ims.service.event import ocloud_event, resource_event, \
@@ -67,6 +68,11 @@ COMMAND_HANDLERS = {
     commands.UpdateDms: dms_handler.update_dms,
     commands.UpdateAlarm: alarm_handler.update_alarm,
     commands.UpdateResourcePool: resourcepool_handler.update_resourcepool,
+    commands.UpdateComputeAgg: agg_compute_handler.update_compute_aggregate,
+    commands.UpdateNetworkAgg: agg_network_handler.update_network_aggregate,
+    commands.UpdateStorageAgg: agg_storage_handler.update_storage_aggregate,
+    commands.UpdateUndefinedAgg:
+    agg_undefined_handler.update_undefined_aggregate,
     commands.UpdatePserver: pserver_handler.update_pserver,
     commands.UpdatePserverCpu: pserver_cpu_handler.update_pserver_cpu,
     commands.UpdatePserverMem: pserver_mem_handler.update_pserver_mem,
index 0fc7853..0e5bc3a 100644 (file)
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 
+import traceback
 # from logging import exception
 # from cgtsclient import exc
+
 from o2common.service.client.base_client import BaseClient
 from o2common.domain import commands
 from o2common.service.messagebus import MessageBus
@@ -45,6 +47,7 @@ class BaseWatcher(object):
         except Exception as ex:
             logger.warning("Failed to probe %s watcher due to: %s - %s" %
                            (self._targetname(), type(ex), str(ex)))
+            logger.debug(traceback.format_exc())
             return []
 
     def _probe(self, parent: object = None, tags: object = None) \
diff --git a/o2ims/adapter/clients/aggregate_client.py b/o2ims/adapter/clients/aggregate_client.py
new file mode 100644 (file)
index 0000000..b46cf31
--- /dev/null
@@ -0,0 +1,140 @@
+# Copyright (C) 2022 Wind River Systems, Inc.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+import uuid
+from typing import List
+
+from o2common.service.client.base_client import BaseClient
+from o2ims.domain import stx_object as ocloudModel
+from o2ims.domain.resource_type import ResourceTypeEnum
+
+from o2common.helper import o2logging
+logger = o2logging.get_logger(__name__)
+
+
+class ComputeAggClient(BaseClient):
+    def __init__(self, driver=None):
+        super().__init__()
+        self.driver = AggClientImp()
+
+    def _get(self, id) -> ocloudModel.StxGenericModel:
+        return self.driver.getComputeList(res_pool=self._pool_id)[0]
+
+    def _list(self, **filters):
+        return self.driver.getComputeList(res_pool=self._pool_id)
+
+    def _set_stx_client(self):
+        pass
+
+
+class NetworkAggClient(BaseClient):
+    def __init__(self, driver=None):
+        super().__init__()
+        self.driver = AggClientImp()
+
+    def _get(self, id) -> ocloudModel.StxGenericModel:
+        return self.driver.getNetworkList(res_pool=self._pool_id)[0]
+
+    def _list(self, **filters):
+        return self.driver.getNetworkList(res_pool=self._pool_id)
+
+    def _set_stx_client(self):
+        pass
+
+
+class StorageAggClient(BaseClient):
+    def __init__(self, driver=None):
+        super().__init__()
+        self.driver = AggClientImp()
+
+    def _get(self, id) -> ocloudModel.StxGenericModel:
+        return self.driver.getStorageList(res_pool=self._pool_id)[0]
+
+    def _list(self, **filters):
+        return self.driver.getStorageList(res_pool=self._pool_id)
+
+    def _set_stx_client(self):
+        pass
+
+
+class UndefinedAggClient(BaseClient):
+    def __init__(self, driver=None):
+        super().__init__()
+        self.driver = AggClientImp()
+
+    def _get(self, id) -> ocloudModel.StxGenericModel:
+        return self.driver.getUndefinedList(res_pool=self._pool_id)[0]
+
+    def _list(self, **filters):
+        return self.driver.getUndefinedList(res_pool=self._pool_id)
+
+    def _set_stx_client(self):
+        pass
+
+
+class AggClientImp(object):
+    def __init__(self):
+        super().__init__()
+
+    def getComputeList(self, **filters) -> List[ocloudModel.StxGenericModel]:
+        compute = ComputeAggregate(filters['res_pool'])
+        return [ocloudModel.StxGenericModel(
+            ResourceTypeEnum.COMPUTE_AGGREGATE, compute)]
+
+    def getNetworkList(self, **filters) -> List[ocloudModel.StxGenericModel]:
+        network = NetworkAggregate(filters['res_pool'])
+        return [ocloudModel.StxGenericModel(
+            ResourceTypeEnum.NETWORK_AGGREGATE, network)]
+
+    def getStorageList(self, **filters) -> List[ocloudModel.StxGenericModel]:
+        storage = StorageAggregate(filters['res_pool'])
+        return [ocloudModel.StxGenericModel(
+            ResourceTypeEnum.STORAGE_AGGREGATE, storage)]
+
+    def getUndefinedList(self, **filters) -> List[ocloudModel.StxGenericModel]:
+        undefined = UndefinedAggregate(filters['res_pool'])
+        return [ocloudModel.StxGenericModel(
+            ResourceTypeEnum.UNDEFINED_AGGREGATE, undefined)]
+
+
+class Aggregate:
+    def __init__(self, res_pool_id: str, name: str) -> None:
+        self.name = name
+        setattr(self, 'name', self.name)
+        setattr(self, 'uuid',
+                uuid.uuid3(uuid.NAMESPACE_URL, res_pool_id + self.name))
+        setattr(self, 'updated_at', None)
+        setattr(self, 'created_at', None)
+
+    def to_dict(self):
+        return {}
+
+
+class ComputeAggregate(Aggregate):
+    def __init__(self, res_pool_id: str) -> None:
+        super().__init__(res_pool_id, 'compute_aggregate')
+
+
+class NetworkAggregate(Aggregate):
+    def __init__(self, res_pool_id: str) -> None:
+        super().__init__(res_pool_id, 'network_aggregate')
+
+
+class StorageAggregate(Aggregate):
+    def __init__(self, res_pool_id: str) -> None:
+        super().__init__(res_pool_id, 'storage_aggregate')
+
+
+class UndefinedAggregate(Aggregate):
+    def __init__(self, res_pool_id: str) -> None:
+        super().__init__(res_pool_id, 'undefined_aggregate')
index 5d58e4c..349188f 100644 (file)
@@ -74,6 +74,26 @@ class UpdateResource(UpdateStxObject):
     parentid: str
 
 
+@dataclass
+class UpdateComputeAgg(UpdateResource):
+    pass
+
+
+@dataclass
+class UpdateNetworkAgg(UpdateResource):
+    pass
+
+
+@dataclass
+class UpdateStorageAgg(UpdateResource):
+    pass
+
+
+@dataclass
+class UpdateUndefinedAgg(UpdateResource):
+    pass
+
+
 @dataclass
 class UpdatePserver(UpdateResource):
     pass
index 076d6ae..84b9285 100644 (file)
@@ -14,6 +14,10 @@ class ResourceTypeEnum(Enum):
     PSERVER_ETH = 16
     PSERVER_PCI_DEV = 17
     PSERVER_ACC = 18
+    COMPUTE_AGGREGATE = 70
+    NETWORK_AGGREGATE = 80
+    STORAGE_AGGREGATE = 90
+    UNDEFINED_AGGREGATE = 100
 
 
 class ResourceKindEnum(Enum):
diff --git a/o2ims/service/auditor/agg_compute_handler.py b/o2ims/service/auditor/agg_compute_handler.py
new file mode 100644 (file)
index 0000000..807f40b
--- /dev/null
@@ -0,0 +1,126 @@
+# Copyright (C) 2022 Wind River Systems, Inc.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+# pylint: disable=unused-argument
+from __future__ import annotations
+import uuid
+# import json
+from typing import Callable
+
+from o2ims.domain import commands, events
+from o2ims.domain.stx_object import StxGenericModel
+from o2ims.domain.subscription_obj import NotificationEventEnum
+from o2common.service.unit_of_work import AbstractUnitOfWork
+from o2ims.domain.resource_type import MismatchedModel
+from o2ims.domain.ocloud import Resource, ResourceType
+
+from o2common.helper import o2logging
+logger = o2logging.get_logger(__name__)
+
+
+class InvalidResourceType(Exception):
+    pass
+
+
+def update_compute_aggregate(
+    cmd: commands.UpdateComputeAgg,
+    uow: AbstractUnitOfWork,
+    publish: Callable
+):
+    stxobj = cmd.data
+    with uow:
+        res = uow.session.execute(
+            '''
+            SELECT "resourceTypeId", "oCloudId", "name"
+            FROM "resourceType"
+            WHERE "resourceTypeEnum" = :resource_type_enum
+            ''',
+            dict(resource_type_enum=stxobj.type.name)
+        )
+        first = res.first()
+        if first is None:
+            resourcepool = uow.resource_pools.get(cmd.parentid)
+            res_type_name = 'compute_aggregate'
+            resourcetype_id = str(uuid.uuid3(
+                uuid.NAMESPACE_URL, res_type_name))
+            uow.resource_types.add(ResourceType(
+                resourcetype_id,
+                res_type_name, stxobj.type,
+                resourcepool.oCloudId,
+                description='The compute Aggregate resource type'))
+        else:
+            resourcetype_id = first['resourceTypeId']
+
+        resource = uow.resources.get(stxobj.id)
+        if not resource:
+            logger.info("Add compute aggregate:" + stxobj.name
+                        + " update_at: " + str(stxobj.updatetime)
+                        + " id: " + str(stxobj.id)
+                        + " hash: " + str(stxobj.hash))
+            localmodel = create_by(stxobj, cmd.parentid, resourcetype_id)
+            uow.resources.add(localmodel)
+
+            logger.info("Add the compute aggregate: " + stxobj.id
+                        + ", name: " + stxobj.name)
+        else:
+            localmodel = resource
+            if is_outdated(localmodel, stxobj):
+                logger.info("update compute aggregate:" + stxobj.name
+                            + " update_at: " + str(stxobj.updatetime)
+                            + " id: " + str(stxobj.id)
+                            + " hash: " + str(stxobj.hash))
+                update_by(localmodel, stxobj, cmd.parentid)
+                uow.resources.update(localmodel)
+
+            logger.info("Update the compute aggregate: " + stxobj.id
+                        + ", name: " + stxobj.name)
+        uow.commit()
+
+
+def is_outdated(resource: Resource, stxobj: StxGenericModel):
+    return True if resource.hash != stxobj.hash else False
+
+
+def create_by(stxobj: StxGenericModel, parentid: str, resourcetype_id: str) \
+        -> Resource:
+    # content = json.loads(stxobj.content)
+    resourcetype_id = resourcetype_id
+    resourcepool_id = parentid
+    parent_id = None  # the root of the resource has no parent id
+    gAssetId = ''  # TODO: global ID
+    description = "%s : A Compute Aggregate server resource" % stxobj.name
+    resource = Resource(stxobj.id, resourcetype_id, resourcepool_id,
+                        stxobj.name, parent_id, gAssetId, stxobj.content,
+                        description)
+    resource.createtime = stxobj.createtime
+    resource.updatetime = stxobj.updatetime
+    resource.hash = stxobj.hash
+
+    return resource
+
+
+def update_by(target: Resource, stxobj: StxGenericModel,
+              parentid: str) -> None:
+    if target.resourceId != stxobj.id:
+        raise MismatchedModel("Mismatched Id")
+    target.createtime = stxobj.createtime
+    target.updatetime = stxobj.updatetime
+    target.hash = stxobj.hash
+    target.version_number = target.version_number + 1
+    target.events.append(events.ResourceChanged(
+        id=stxobj.id,
+        resourcePoolId=target.resourcePoolId,
+        notificationEventType=NotificationEventEnum.MODIFY,
+        updatetime=stxobj.updatetime
+    ))
diff --git a/o2ims/service/auditor/agg_network_handler.py b/o2ims/service/auditor/agg_network_handler.py
new file mode 100644 (file)
index 0000000..b4ab1a7
--- /dev/null
@@ -0,0 +1,126 @@
+# Copyright (C) 2022 Wind River Systems, Inc.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+# pylint: disable=unused-argument
+from __future__ import annotations
+import uuid
+# import json
+from typing import Callable
+
+from o2ims.domain import commands, events
+from o2ims.domain.stx_object import StxGenericModel
+from o2ims.domain.subscription_obj import NotificationEventEnum
+from o2common.service.unit_of_work import AbstractUnitOfWork
+from o2ims.domain.resource_type import MismatchedModel
+from o2ims.domain.ocloud import Resource, ResourceType
+
+from o2common.helper import o2logging
+logger = o2logging.get_logger(__name__)
+
+
+class InvalidResourceType(Exception):
+    pass
+
+
+def update_network_aggregate(
+    cmd: commands.UpdateNetworkAgg,
+    uow: AbstractUnitOfWork,
+    publish: Callable
+):
+    stxobj = cmd.data
+    with uow:
+        res = uow.session.execute(
+            '''
+            SELECT "resourceTypeId", "oCloudId", "name"
+            FROM "resourceType"
+            WHERE "resourceTypeEnum" = :resource_type_enum
+            ''',
+            dict(resource_type_enum=stxobj.type.name)
+        )
+        first = res.first()
+        if first is None:
+            resourcepool = uow.resource_pools.get(cmd.parentid)
+            res_type_name = 'network_aggregate'
+            resourcetype_id = str(uuid.uuid3(
+                uuid.NAMESPACE_URL, res_type_name))
+            uow.resource_types.add(ResourceType(
+                resourcetype_id,
+                res_type_name, stxobj.type,
+                resourcepool.oCloudId,
+                description='The network Aggregate resource type'))
+        else:
+            resourcetype_id = first['resourceTypeId']
+
+        resource = uow.resources.get(stxobj.id)
+        if not resource:
+            logger.info("Add network aggregate:" + stxobj.name
+                        + " update_at: " + str(stxobj.updatetime)
+                        + " id: " + str(stxobj.id)
+                        + " hash: " + str(stxobj.hash))
+            localmodel = create_by(stxobj, cmd.parentid, resourcetype_id)
+            uow.resources.add(localmodel)
+
+            logger.info("Add the network aggregate: " + stxobj.id
+                        + ", name: " + stxobj.name)
+        else:
+            localmodel = resource
+            if is_outdated(localmodel, stxobj):
+                logger.info("update network aggregate:" + stxobj.name
+                            + " update_at: " + str(stxobj.updatetime)
+                            + " id: " + str(stxobj.id)
+                            + " hash: " + str(stxobj.hash))
+                update_by(localmodel, stxobj, cmd.parentid)
+                uow.resources.update(localmodel)
+
+            logger.info("Update the network aggregate: " + stxobj.id
+                        + ", name: " + stxobj.name)
+        uow.commit()
+
+
+def is_outdated(resource: Resource, stxobj: StxGenericModel):
+    return True if resource.hash != stxobj.hash else False
+
+
+def create_by(stxobj: StxGenericModel, parentid: str, resourcetype_id: str) \
+        -> Resource:
+    # content = json.loads(stxobj.content)
+    resourcetype_id = resourcetype_id
+    resourcepool_id = parentid
+    parent_id = None  # the root of the resource has no parent id
+    gAssetId = ''  # TODO: global ID
+    description = "%s : A Network Aggregate server resource" % stxobj.name
+    resource = Resource(stxobj.id, resourcetype_id, resourcepool_id,
+                        stxobj.name, parent_id, gAssetId, stxobj.content,
+                        description)
+    resource.createtime = stxobj.createtime
+    resource.updatetime = stxobj.updatetime
+    resource.hash = stxobj.hash
+
+    return resource
+
+
+def update_by(target: Resource, stxobj: StxGenericModel,
+              parentid: str) -> None:
+    if target.resourceId != stxobj.id:
+        raise MismatchedModel("Mismatched Id")
+    target.createtime = stxobj.createtime
+    target.updatetime = stxobj.updatetime
+    target.hash = stxobj.hash
+    target.version_number = target.version_number + 1
+    target.events.append(events.ResourceChanged(
+        id=stxobj.id,
+        resourcePoolId=target.resourcePoolId,
+        notificationEventType=NotificationEventEnum.MODIFY,
+        updatetime=stxobj.updatetime
+    ))
diff --git a/o2ims/service/auditor/agg_storage_handler.py b/o2ims/service/auditor/agg_storage_handler.py
new file mode 100644 (file)
index 0000000..14ee152
--- /dev/null
@@ -0,0 +1,126 @@
+# Copyright (C) 2022 Wind River Systems, Inc.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+# pylint: disable=unused-argument
+from __future__ import annotations
+import uuid
+# import json
+from typing import Callable
+
+from o2ims.domain import commands, events
+from o2ims.domain.stx_object import StxGenericModel
+from o2ims.domain.subscription_obj import NotificationEventEnum
+from o2common.service.unit_of_work import AbstractUnitOfWork
+from o2ims.domain.resource_type import MismatchedModel
+from o2ims.domain.ocloud import Resource, ResourceType
+
+from o2common.helper import o2logging
+logger = o2logging.get_logger(__name__)
+
+
+class InvalidResourceType(Exception):
+    pass
+
+
+def update_storage_aggregate(
+    cmd: commands.UpdateStorageAgg,
+    uow: AbstractUnitOfWork,
+    publish: Callable
+):
+    stxobj = cmd.data
+    with uow:
+        res = uow.session.execute(
+            '''
+            SELECT "resourceTypeId", "oCloudId", "name"
+            FROM "resourceType"
+            WHERE "resourceTypeEnum" = :resource_type_enum
+            ''',
+            dict(resource_type_enum=stxobj.type.name)
+        )
+        first = res.first()
+        if first is None:
+            resourcepool = uow.resource_pools.get(cmd.parentid)
+            res_type_name = 'storage_aggregate'
+            resourcetype_id = str(uuid.uuid3(
+                uuid.NAMESPACE_URL, res_type_name))
+            uow.resource_types.add(ResourceType(
+                resourcetype_id,
+                res_type_name, stxobj.type,
+                resourcepool.oCloudId,
+                description='The storage Aggregate resource type'))
+        else:
+            resourcetype_id = first['resourceTypeId']
+
+        resource = uow.resources.get(stxobj.id)
+        if not resource:
+            logger.info("Add storage aggregate:" + stxobj.name
+                        + " update_at: " + str(stxobj.updatetime)
+                        + " id: " + str(stxobj.id)
+                        + " hash: " + str(stxobj.hash))
+            localmodel = create_by(stxobj, cmd.parentid, resourcetype_id)
+            uow.resources.add(localmodel)
+
+            logger.info("Add the storage aggregate: " + stxobj.id
+                        + ", name: " + stxobj.name)
+        else:
+            localmodel = resource
+            if is_outdated(localmodel, stxobj):
+                logger.info("update storage aggregate:" + stxobj.name
+                            + " update_at: " + str(stxobj.updatetime)
+                            + " id: " + str(stxobj.id)
+                            + " hash: " + str(stxobj.hash))
+                update_by(localmodel, stxobj, cmd.parentid)
+                uow.resources.update(localmodel)
+
+            logger.info("Update the storage aggregate: " + stxobj.id
+                        + ", name: " + stxobj.name)
+        uow.commit()
+
+
+def is_outdated(resource: Resource, stxobj: StxGenericModel):
+    return True if resource.hash != stxobj.hash else False
+
+
+def create_by(stxobj: StxGenericModel, parentid: str, resourcetype_id: str) \
+        -> Resource:
+    # content = json.loads(stxobj.content)
+    resourcetype_id = resourcetype_id
+    resourcepool_id = parentid
+    parent_id = None  # the root of the resource has no parent id
+    gAssetId = ''  # TODO: global ID
+    description = "%s : A Storage Aggregate server resource" % stxobj.name
+    resource = Resource(stxobj.id, resourcetype_id, resourcepool_id,
+                        stxobj.name, parent_id, gAssetId, stxobj.content,
+                        description)
+    resource.createtime = stxobj.createtime
+    resource.updatetime = stxobj.updatetime
+    resource.hash = stxobj.hash
+
+    return resource
+
+
+def update_by(target: Resource, stxobj: StxGenericModel,
+              parentid: str) -> None:
+    if target.resourceId != stxobj.id:
+        raise MismatchedModel("Mismatched Id")
+    target.createtime = stxobj.createtime
+    target.updatetime = stxobj.updatetime
+    target.hash = stxobj.hash
+    target.version_number = target.version_number + 1
+    target.events.append(events.ResourceChanged(
+        id=stxobj.id,
+        resourcePoolId=target.resourcePoolId,
+        notificationEventType=NotificationEventEnum.MODIFY,
+        updatetime=stxobj.updatetime
+    ))
diff --git a/o2ims/service/auditor/agg_undefined_handler.py b/o2ims/service/auditor/agg_undefined_handler.py
new file mode 100644 (file)
index 0000000..aa477a7
--- /dev/null
@@ -0,0 +1,126 @@
+# Copyright (C) 2022 Wind River Systems, Inc.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+# pylint: disable=unused-argument
+from __future__ import annotations
+import uuid
+# import json
+from typing import Callable
+
+from o2ims.domain import commands, events
+from o2ims.domain.stx_object import StxGenericModel
+from o2ims.domain.subscription_obj import NotificationEventEnum
+from o2common.service.unit_of_work import AbstractUnitOfWork
+from o2ims.domain.resource_type import MismatchedModel
+from o2ims.domain.ocloud import Resource, ResourceType
+
+from o2common.helper import o2logging
+logger = o2logging.get_logger(__name__)
+
+
+class InvalidResourceType(Exception):
+    pass
+
+
+def update_undefined_aggregate(
+    cmd: commands.UpdateUndefinedAgg,
+    uow: AbstractUnitOfWork,
+    publish: Callable
+):
+    stxobj = cmd.data
+    with uow:
+        res = uow.session.execute(
+            '''
+            SELECT "resourceTypeId", "oCloudId", "name"
+            FROM "resourceType"
+            WHERE "resourceTypeEnum" = :resource_type_enum
+            ''',
+            dict(resource_type_enum=stxobj.type.name)
+        )
+        first = res.first()
+        if first is None:
+            resourcepool = uow.resource_pools.get(cmd.parentid)
+            res_type_name = 'undefined_aggregate'
+            resourcetype_id = str(uuid.uuid3(
+                uuid.NAMESPACE_URL, res_type_name))
+            uow.resource_types.add(ResourceType(
+                resourcetype_id,
+                res_type_name, stxobj.type,
+                resourcepool.oCloudId,
+                description='The undefined Aggregate resource type'))
+        else:
+            resourcetype_id = first['resourceTypeId']
+
+        resource = uow.resources.get(stxobj.id)
+        if not resource:
+            logger.info("Add undefined aggregate:" + stxobj.name
+                        + " update_at: " + str(stxobj.updatetime)
+                        + " id: " + str(stxobj.id)
+                        + " hash: " + str(stxobj.hash))
+            localmodel = create_by(stxobj, cmd.parentid, resourcetype_id)
+            uow.resources.add(localmodel)
+
+            logger.info("Add the undefined aggregate: " + stxobj.id
+                        + ", name: " + stxobj.name)
+        else:
+            localmodel = resource
+            if is_outdated(localmodel, stxobj):
+                logger.info("update undefined aggregate:" + stxobj.name
+                            + " update_at: " + str(stxobj.updatetime)
+                            + " id: " + str(stxobj.id)
+                            + " hash: " + str(stxobj.hash))
+                update_by(localmodel, stxobj, cmd.parentid)
+                uow.resources.update(localmodel)
+
+            logger.info("Update the undefined aggregate: " + stxobj.id
+                        + ", name: " + stxobj.name)
+        uow.commit()
+
+
+def is_outdated(resource: Resource, stxobj: StxGenericModel):
+    return True if resource.hash != stxobj.hash else False
+
+
+def create_by(stxobj: StxGenericModel, parentid: str, resourcetype_id: str) \
+        -> Resource:
+    # content = json.loads(stxobj.content)
+    resourcetype_id = resourcetype_id
+    resourcepool_id = parentid
+    parent_id = None  # the root of the resource has no parent id
+    gAssetId = ''  # TODO: global ID
+    description = "%s : An Undefined Aggregate server resource" % stxobj.name
+    resource = Resource(stxobj.id, resourcetype_id, resourcepool_id,
+                        stxobj.name, parent_id, gAssetId, stxobj.content,
+                        description)
+    resource.createtime = stxobj.createtime
+    resource.updatetime = stxobj.updatetime
+    resource.hash = stxobj.hash
+
+    return resource
+
+
+def update_by(target: Resource, stxobj: StxGenericModel,
+              parentid: str) -> None:
+    if target.resourceId != stxobj.id:
+        raise MismatchedModel("Mismatched Id")
+    target.createtime = stxobj.createtime
+    target.updatetime = stxobj.updatetime
+    target.hash = stxobj.hash
+    target.version_number = target.version_number + 1
+    target.events.append(events.ResourceChanged(
+        id=stxobj.id,
+        resourcePoolId=target.resourcePoolId,
+        notificationEventType=NotificationEventEnum.MODIFY,
+        updatetime=stxobj.updatetime
+    ))
diff --git a/o2ims/service/watcher/agg_compute_watcher.py b/o2ims/service/watcher/agg_compute_watcher.py
new file mode 100644 (file)
index 0000000..9fd4d3a
--- /dev/null
@@ -0,0 +1,41 @@
+# Copyright (C) 2022 Wind River Systems, Inc.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+from o2ims.domain.stx_object import StxGenericModel
+from o2common.service.client.base_client import BaseClient
+from o2ims.service.watcher.aggregate_watcher import AggregateWatcher
+from o2ims.domain import commands
+from o2common.service.messagebus import MessageBus
+
+from o2common.helper import o2logging
+logger = o2logging.get_logger(__name__)
+
+
+class ComputeAggWatcher(AggregateWatcher):
+    def __init__(self, client: BaseClient,
+                 bus: MessageBus) -> None:
+        super().__init__(client, bus)
+
+    def _targetname(self):
+        return "compute_aggregate"
+
+    def _probe(self, parent: StxGenericModel, tags=None):
+        # Set a tag for children resource
+        self._tags.pool = parent.res_pool_id
+        self._set_respool_client()
+
+        resourcepoolid = parent.id
+        newmodels = self._client.list(resourcepoolid=resourcepoolid)
+        return [commands.UpdateComputeAgg(data=m, parentid=resourcepoolid)
+                for m in newmodels]
diff --git a/o2ims/service/watcher/agg_network_watcher.py b/o2ims/service/watcher/agg_network_watcher.py
new file mode 100644 (file)
index 0000000..b932e93
--- /dev/null
@@ -0,0 +1,41 @@
+# Copyright (C) 2022 Wind River Systems, Inc.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+from o2ims.domain.stx_object import StxGenericModel
+from o2common.service.client.base_client import BaseClient
+from o2ims.service.watcher.aggregate_watcher import AggregateWatcher
+from o2ims.domain import commands
+from o2common.service.messagebus import MessageBus
+
+from o2common.helper import o2logging
+logger = o2logging.get_logger(__name__)
+
+
+class NetworkAggWatcher(AggregateWatcher):
+    def __init__(self, client: BaseClient,
+                 bus: MessageBus) -> None:
+        super().__init__(client, bus)
+
+    def _targetname(self):
+        return "network_aggregate"
+
+    def _probe(self, parent: StxGenericModel, tags=None):
+        # Set a tag for children resource
+        self._tags.pool = parent.res_pool_id
+        self._set_respool_client()
+
+        resourcepoolid = parent.id
+        newmodels = self._client.list(resourcepoolid=resourcepoolid)
+        return [commands.UpdateNetworkAgg(data=m, parentid=resourcepoolid)
+                for m in newmodels]
diff --git a/o2ims/service/watcher/agg_storage_watcher.py b/o2ims/service/watcher/agg_storage_watcher.py
new file mode 100644 (file)
index 0000000..d648b21
--- /dev/null
@@ -0,0 +1,41 @@
+# Copyright (C) 2022 Wind River Systems, Inc.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+from o2ims.domain.stx_object import StxGenericModel
+from o2common.service.client.base_client import BaseClient
+from o2ims.service.watcher.aggregate_watcher import AggregateWatcher
+from o2ims.domain import commands
+from o2common.service.messagebus import MessageBus
+
+from o2common.helper import o2logging
+logger = o2logging.get_logger(__name__)
+
+
+class StorageAggWatcher(AggregateWatcher):
+    def __init__(self, client: BaseClient,
+                 bus: MessageBus) -> None:
+        super().__init__(client, bus)
+
+    def _targetname(self):
+        return "storage_aggregate"
+
+    def _probe(self, parent: StxGenericModel, tags=None):
+        # Set a tag for children resource
+        self._tags.pool = parent.res_pool_id
+        self._set_respool_client()
+
+        resourcepoolid = parent.id
+        newmodels = self._client.list(resourcepoolid=resourcepoolid)
+        return [commands.UpdateStorageAgg(data=m, parentid=resourcepoolid)
+                for m in newmodels]
diff --git a/o2ims/service/watcher/agg_undefined_watcher.py b/o2ims/service/watcher/agg_undefined_watcher.py
new file mode 100644 (file)
index 0000000..eda07af
--- /dev/null
@@ -0,0 +1,41 @@
+# Copyright (C) 2022 Wind River Systems, Inc.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+from o2ims.domain.stx_object import StxGenericModel
+from o2common.service.client.base_client import BaseClient
+from o2ims.service.watcher.aggregate_watcher import AggregateWatcher
+from o2ims.domain import commands
+from o2common.service.messagebus import MessageBus
+
+from o2common.helper import o2logging
+logger = o2logging.get_logger(__name__)
+
+
+class UndefinedAggWatcher(AggregateWatcher):
+    def __init__(self, client: BaseClient,
+                 bus: MessageBus) -> None:
+        super().__init__(client, bus)
+
+    def _targetname(self):
+        return "undefined_aggregate"
+
+    def _probe(self, parent: StxGenericModel, tags=None):
+        # Set a tag for children resource
+        self._tags.pool = parent.res_pool_id
+        self._set_respool_client()
+
+        resourcepoolid = parent.id
+        newmodels = self._client.list(resourcepoolid=resourcepoolid)
+        return [commands.UpdateUndefinedAgg(data=m, parentid=resourcepoolid)
+                for m in newmodels]
diff --git a/o2ims/service/watcher/aggregate_watcher.py b/o2ims/service/watcher/aggregate_watcher.py
new file mode 100644 (file)
index 0000000..34e64da
--- /dev/null
@@ -0,0 +1,45 @@
+# Copyright (C) 2022 Wind River Systems, Inc.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+from o2ims.domain.stx_object import StxGenericModel
+from o2common.service.client.base_client import BaseClient
+from o2common.service.watcher.base import BaseWatcher
+from o2ims.domain import commands
+from o2common.domain import tags
+from o2common.service.messagebus import MessageBus
+
+from o2common.helper import o2logging
+logger = o2logging.get_logger(__name__)
+
+
+class AggregateWatcher(BaseWatcher):
+    def __init__(self, client: BaseClient,
+                 bus: MessageBus) -> None:
+        super().__init__(client, bus)
+        self._tags = tags.Tag()
+        self.poolid = None
+
+    def _targetname(self):
+        return "aggregate"
+
+    def _probe(self, parent: StxGenericModel, tags: object = None):
+        resourcepoolid = parent.id
+        newmodels = self._client.list(resourcepoolid=resourcepoolid)
+        logger.warning(newmodels[0])
+        return [commands.UpdateAggregate(data=m, parentid=resourcepoolid)
+                for m in newmodels]
+
+    def _set_respool_client(self):
+        self.poolid = self._tags.pool
+        self._client.set_pool_driver(self.poolid)