From 8bfb3515ed5d58f117ea2d4527918269c65f23a0 Mon Sep 17 00:00:00 2001 From: "Zhang Rong(Jon)" Date: Tue, 15 Nov 2022 11:53:36 +0800 Subject: [PATCH] Fix INF-352 Add aggregate resource types Issue-ID: INF-352 Signed-off-by: Zhang Rong(Jon) Change-Id: I2e8b828b54d52e21a1325ae61825da574a2baeed --- o2app/entrypoints/resource_watcher.py | 16 +++ o2app/service/handlers.py | 8 +- o2common/service/watcher/base.py | 3 + o2ims/adapter/clients/aggregate_client.py | 140 +++++++++++++++++++++++++ o2ims/domain/commands.py | 20 ++++ o2ims/domain/resource_type.py | 4 + o2ims/service/auditor/agg_compute_handler.py | 126 ++++++++++++++++++++++ o2ims/service/auditor/agg_network_handler.py | 126 ++++++++++++++++++++++ o2ims/service/auditor/agg_storage_handler.py | 126 ++++++++++++++++++++++ o2ims/service/auditor/agg_undefined_handler.py | 126 ++++++++++++++++++++++ o2ims/service/watcher/agg_compute_watcher.py | 41 ++++++++ o2ims/service/watcher/agg_network_watcher.py | 41 ++++++++ o2ims/service/watcher/agg_storage_watcher.py | 41 ++++++++ o2ims/service/watcher/agg_undefined_watcher.py | 41 ++++++++ o2ims/service/watcher/aggregate_watcher.py | 45 ++++++++ 15 files changed, 903 insertions(+), 1 deletion(-) create mode 100644 o2ims/adapter/clients/aggregate_client.py create mode 100644 o2ims/service/auditor/agg_compute_handler.py create mode 100644 o2ims/service/auditor/agg_network_handler.py create mode 100644 o2ims/service/auditor/agg_storage_handler.py create mode 100644 o2ims/service/auditor/agg_undefined_handler.py create mode 100644 o2ims/service/watcher/agg_compute_watcher.py create mode 100644 o2ims/service/watcher/agg_network_watcher.py create mode 100644 o2ims/service/watcher/agg_storage_watcher.py create mode 100644 o2ims/service/watcher/agg_undefined_watcher.py create mode 100644 o2ims/service/watcher/aggregate_watcher.py diff --git a/o2app/entrypoints/resource_watcher.py b/o2app/entrypoints/resource_watcher.py index 909e86c..2e9e0c0 100644 --- a/o2app/entrypoints/resource_watcher.py +++ b/o2app/entrypoints/resource_watcher.py @@ -51,6 +51,13 @@ from o2ims.adapter.clients.ocloud_client import StxEthClient from o2ims.service.watcher.pserver_acc_watcher import PServerAccWatcher from o2ims.adapter.clients.ocloud_client import StxAccClient +from o2ims.service.watcher.agg_compute_watcher import ComputeAggWatcher +from o2ims.service.watcher.agg_network_watcher import NetworkAggWatcher +from o2ims.service.watcher.agg_storage_watcher import StorageAggWatcher +from o2ims.service.watcher.agg_undefined_watcher import UndefinedAggWatcher +from o2ims.adapter.clients.aggregate_client import ComputeAggClient, \ + NetworkAggClient, StorageAggClient, UndefinedAggClient + from o2common.helper import o2logging logger = o2logging.get_logger(__name__) @@ -76,6 +83,15 @@ class WatcherService(cotyledon.Service): child_respool = root.addchild( ResourcePoolWatcher(StxResourcePoolClient(), self.bus)) + child_respool.addchild( + ComputeAggWatcher(ComputeAggClient(), self.bus)) + child_respool.addchild( + NetworkAggWatcher(NetworkAggClient(), self.bus)) + child_respool.addchild( + StorageAggWatcher(StorageAggClient(), self.bus)) + child_respool.addchild( + UndefinedAggWatcher(UndefinedAggClient(), self.bus)) + child_pserver = child_respool.addchild( PServerWatcher(StxPserverClient(), self.bus)) child_pserver.addchild( diff --git a/o2app/service/handlers.py b/o2app/service/handlers.py index ad75098..1754f79 100644 --- a/o2app/service/handlers.py +++ b/o2app/service/handlers.py @@ -27,7 +27,8 @@ from o2ims.service.auditor import ocloud_handler, dms_handler, \ resourcepool_handler, pserver_handler, pserver_cpu_handler, \ pserver_mem_handler, pserver_port_handler, pserver_if_handler,\ pserver_eth_handler, pserver_acc_handler, alarm_handler, \ - pserver_dev_handler + pserver_dev_handler, agg_compute_handler, agg_network_handler,\ + agg_storage_handler, agg_undefined_handler from o2ims.service.command import notify_handler, registration_handler,\ notify_alarm_handler from o2ims.service.event import ocloud_event, resource_event, \ @@ -67,6 +68,11 @@ COMMAND_HANDLERS = { commands.UpdateDms: dms_handler.update_dms, commands.UpdateAlarm: alarm_handler.update_alarm, commands.UpdateResourcePool: resourcepool_handler.update_resourcepool, + commands.UpdateComputeAgg: agg_compute_handler.update_compute_aggregate, + commands.UpdateNetworkAgg: agg_network_handler.update_network_aggregate, + commands.UpdateStorageAgg: agg_storage_handler.update_storage_aggregate, + commands.UpdateUndefinedAgg: + agg_undefined_handler.update_undefined_aggregate, commands.UpdatePserver: pserver_handler.update_pserver, commands.UpdatePserverCpu: pserver_cpu_handler.update_pserver_cpu, commands.UpdatePserverMem: pserver_mem_handler.update_pserver_mem, diff --git a/o2common/service/watcher/base.py b/o2common/service/watcher/base.py index 0fc7853..0e5bc3a 100644 --- a/o2common/service/watcher/base.py +++ b/o2common/service/watcher/base.py @@ -12,8 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +import traceback # from logging import exception # from cgtsclient import exc + from o2common.service.client.base_client import BaseClient from o2common.domain import commands from o2common.service.messagebus import MessageBus @@ -45,6 +47,7 @@ class BaseWatcher(object): except Exception as ex: logger.warning("Failed to probe %s watcher due to: %s - %s" % (self._targetname(), type(ex), str(ex))) + logger.debug(traceback.format_exc()) return [] def _probe(self, parent: object = None, tags: object = None) \ diff --git a/o2ims/adapter/clients/aggregate_client.py b/o2ims/adapter/clients/aggregate_client.py new file mode 100644 index 0000000..b46cf31 --- /dev/null +++ b/o2ims/adapter/clients/aggregate_client.py @@ -0,0 +1,140 @@ +# Copyright (C) 2022 Wind River Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import uuid +from typing import List + +from o2common.service.client.base_client import BaseClient +from o2ims.domain import stx_object as ocloudModel +from o2ims.domain.resource_type import ResourceTypeEnum + +from o2common.helper import o2logging +logger = o2logging.get_logger(__name__) + + +class ComputeAggClient(BaseClient): + def __init__(self, driver=None): + super().__init__() + self.driver = AggClientImp() + + def _get(self, id) -> ocloudModel.StxGenericModel: + return self.driver.getComputeList(res_pool=self._pool_id)[0] + + def _list(self, **filters): + return self.driver.getComputeList(res_pool=self._pool_id) + + def _set_stx_client(self): + pass + + +class NetworkAggClient(BaseClient): + def __init__(self, driver=None): + super().__init__() + self.driver = AggClientImp() + + def _get(self, id) -> ocloudModel.StxGenericModel: + return self.driver.getNetworkList(res_pool=self._pool_id)[0] + + def _list(self, **filters): + return self.driver.getNetworkList(res_pool=self._pool_id) + + def _set_stx_client(self): + pass + + +class StorageAggClient(BaseClient): + def __init__(self, driver=None): + super().__init__() + self.driver = AggClientImp() + + def _get(self, id) -> ocloudModel.StxGenericModel: + return self.driver.getStorageList(res_pool=self._pool_id)[0] + + def _list(self, **filters): + return self.driver.getStorageList(res_pool=self._pool_id) + + def _set_stx_client(self): + pass + + +class UndefinedAggClient(BaseClient): + def __init__(self, driver=None): + super().__init__() + self.driver = AggClientImp() + + def _get(self, id) -> ocloudModel.StxGenericModel: + return self.driver.getUndefinedList(res_pool=self._pool_id)[0] + + def _list(self, **filters): + return self.driver.getUndefinedList(res_pool=self._pool_id) + + def _set_stx_client(self): + pass + + +class AggClientImp(object): + def __init__(self): + super().__init__() + + def getComputeList(self, **filters) -> List[ocloudModel.StxGenericModel]: + compute = ComputeAggregate(filters['res_pool']) + return [ocloudModel.StxGenericModel( + ResourceTypeEnum.COMPUTE_AGGREGATE, compute)] + + def getNetworkList(self, **filters) -> List[ocloudModel.StxGenericModel]: + network = NetworkAggregate(filters['res_pool']) + return [ocloudModel.StxGenericModel( + ResourceTypeEnum.NETWORK_AGGREGATE, network)] + + def getStorageList(self, **filters) -> List[ocloudModel.StxGenericModel]: + storage = StorageAggregate(filters['res_pool']) + return [ocloudModel.StxGenericModel( + ResourceTypeEnum.STORAGE_AGGREGATE, storage)] + + def getUndefinedList(self, **filters) -> List[ocloudModel.StxGenericModel]: + undefined = UndefinedAggregate(filters['res_pool']) + return [ocloudModel.StxGenericModel( + ResourceTypeEnum.UNDEFINED_AGGREGATE, undefined)] + + +class Aggregate: + def __init__(self, res_pool_id: str, name: str) -> None: + self.name = name + setattr(self, 'name', self.name) + setattr(self, 'uuid', + uuid.uuid3(uuid.NAMESPACE_URL, res_pool_id + self.name)) + setattr(self, 'updated_at', None) + setattr(self, 'created_at', None) + + def to_dict(self): + return {} + + +class ComputeAggregate(Aggregate): + def __init__(self, res_pool_id: str) -> None: + super().__init__(res_pool_id, 'compute_aggregate') + + +class NetworkAggregate(Aggregate): + def __init__(self, res_pool_id: str) -> None: + super().__init__(res_pool_id, 'network_aggregate') + + +class StorageAggregate(Aggregate): + def __init__(self, res_pool_id: str) -> None: + super().__init__(res_pool_id, 'storage_aggregate') + + +class UndefinedAggregate(Aggregate): + def __init__(self, res_pool_id: str) -> None: + super().__init__(res_pool_id, 'undefined_aggregate') diff --git a/o2ims/domain/commands.py b/o2ims/domain/commands.py index 5d58e4c..349188f 100644 --- a/o2ims/domain/commands.py +++ b/o2ims/domain/commands.py @@ -74,6 +74,26 @@ class UpdateResource(UpdateStxObject): parentid: str +@dataclass +class UpdateComputeAgg(UpdateResource): + pass + + +@dataclass +class UpdateNetworkAgg(UpdateResource): + pass + + +@dataclass +class UpdateStorageAgg(UpdateResource): + pass + + +@dataclass +class UpdateUndefinedAgg(UpdateResource): + pass + + @dataclass class UpdatePserver(UpdateResource): pass diff --git a/o2ims/domain/resource_type.py b/o2ims/domain/resource_type.py index 076d6ae..84b9285 100644 --- a/o2ims/domain/resource_type.py +++ b/o2ims/domain/resource_type.py @@ -14,6 +14,10 @@ class ResourceTypeEnum(Enum): PSERVER_ETH = 16 PSERVER_PCI_DEV = 17 PSERVER_ACC = 18 + COMPUTE_AGGREGATE = 70 + NETWORK_AGGREGATE = 80 + STORAGE_AGGREGATE = 90 + UNDEFINED_AGGREGATE = 100 class ResourceKindEnum(Enum): diff --git a/o2ims/service/auditor/agg_compute_handler.py b/o2ims/service/auditor/agg_compute_handler.py new file mode 100644 index 0000000..807f40b --- /dev/null +++ b/o2ims/service/auditor/agg_compute_handler.py @@ -0,0 +1,126 @@ +# Copyright (C) 2022 Wind River Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint: disable=unused-argument +from __future__ import annotations +import uuid +# import json +from typing import Callable + +from o2ims.domain import commands, events +from o2ims.domain.stx_object import StxGenericModel +from o2ims.domain.subscription_obj import NotificationEventEnum +from o2common.service.unit_of_work import AbstractUnitOfWork +from o2ims.domain.resource_type import MismatchedModel +from o2ims.domain.ocloud import Resource, ResourceType + +from o2common.helper import o2logging +logger = o2logging.get_logger(__name__) + + +class InvalidResourceType(Exception): + pass + + +def update_compute_aggregate( + cmd: commands.UpdateComputeAgg, + uow: AbstractUnitOfWork, + publish: Callable +): + stxobj = cmd.data + with uow: + res = uow.session.execute( + ''' + SELECT "resourceTypeId", "oCloudId", "name" + FROM "resourceType" + WHERE "resourceTypeEnum" = :resource_type_enum + ''', + dict(resource_type_enum=stxobj.type.name) + ) + first = res.first() + if first is None: + resourcepool = uow.resource_pools.get(cmd.parentid) + res_type_name = 'compute_aggregate' + resourcetype_id = str(uuid.uuid3( + uuid.NAMESPACE_URL, res_type_name)) + uow.resource_types.add(ResourceType( + resourcetype_id, + res_type_name, stxobj.type, + resourcepool.oCloudId, + description='The compute Aggregate resource type')) + else: + resourcetype_id = first['resourceTypeId'] + + resource = uow.resources.get(stxobj.id) + if not resource: + logger.info("Add compute aggregate:" + stxobj.name + + " update_at: " + str(stxobj.updatetime) + + " id: " + str(stxobj.id) + + " hash: " + str(stxobj.hash)) + localmodel = create_by(stxobj, cmd.parentid, resourcetype_id) + uow.resources.add(localmodel) + + logger.info("Add the compute aggregate: " + stxobj.id + + ", name: " + stxobj.name) + else: + localmodel = resource + if is_outdated(localmodel, stxobj): + logger.info("update compute aggregate:" + stxobj.name + + " update_at: " + str(stxobj.updatetime) + + " id: " + str(stxobj.id) + + " hash: " + str(stxobj.hash)) + update_by(localmodel, stxobj, cmd.parentid) + uow.resources.update(localmodel) + + logger.info("Update the compute aggregate: " + stxobj.id + + ", name: " + stxobj.name) + uow.commit() + + +def is_outdated(resource: Resource, stxobj: StxGenericModel): + return True if resource.hash != stxobj.hash else False + + +def create_by(stxobj: StxGenericModel, parentid: str, resourcetype_id: str) \ + -> Resource: + # content = json.loads(stxobj.content) + resourcetype_id = resourcetype_id + resourcepool_id = parentid + parent_id = None # the root of the resource has no parent id + gAssetId = '' # TODO: global ID + description = "%s : A Compute Aggregate server resource" % stxobj.name + resource = Resource(stxobj.id, resourcetype_id, resourcepool_id, + stxobj.name, parent_id, gAssetId, stxobj.content, + description) + resource.createtime = stxobj.createtime + resource.updatetime = stxobj.updatetime + resource.hash = stxobj.hash + + return resource + + +def update_by(target: Resource, stxobj: StxGenericModel, + parentid: str) -> None: + if target.resourceId != stxobj.id: + raise MismatchedModel("Mismatched Id") + target.createtime = stxobj.createtime + target.updatetime = stxobj.updatetime + target.hash = stxobj.hash + target.version_number = target.version_number + 1 + target.events.append(events.ResourceChanged( + id=stxobj.id, + resourcePoolId=target.resourcePoolId, + notificationEventType=NotificationEventEnum.MODIFY, + updatetime=stxobj.updatetime + )) diff --git a/o2ims/service/auditor/agg_network_handler.py b/o2ims/service/auditor/agg_network_handler.py new file mode 100644 index 0000000..b4ab1a7 --- /dev/null +++ b/o2ims/service/auditor/agg_network_handler.py @@ -0,0 +1,126 @@ +# Copyright (C) 2022 Wind River Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint: disable=unused-argument +from __future__ import annotations +import uuid +# import json +from typing import Callable + +from o2ims.domain import commands, events +from o2ims.domain.stx_object import StxGenericModel +from o2ims.domain.subscription_obj import NotificationEventEnum +from o2common.service.unit_of_work import AbstractUnitOfWork +from o2ims.domain.resource_type import MismatchedModel +from o2ims.domain.ocloud import Resource, ResourceType + +from o2common.helper import o2logging +logger = o2logging.get_logger(__name__) + + +class InvalidResourceType(Exception): + pass + + +def update_network_aggregate( + cmd: commands.UpdateNetworkAgg, + uow: AbstractUnitOfWork, + publish: Callable +): + stxobj = cmd.data + with uow: + res = uow.session.execute( + ''' + SELECT "resourceTypeId", "oCloudId", "name" + FROM "resourceType" + WHERE "resourceTypeEnum" = :resource_type_enum + ''', + dict(resource_type_enum=stxobj.type.name) + ) + first = res.first() + if first is None: + resourcepool = uow.resource_pools.get(cmd.parentid) + res_type_name = 'network_aggregate' + resourcetype_id = str(uuid.uuid3( + uuid.NAMESPACE_URL, res_type_name)) + uow.resource_types.add(ResourceType( + resourcetype_id, + res_type_name, stxobj.type, + resourcepool.oCloudId, + description='The network Aggregate resource type')) + else: + resourcetype_id = first['resourceTypeId'] + + resource = uow.resources.get(stxobj.id) + if not resource: + logger.info("Add network aggregate:" + stxobj.name + + " update_at: " + str(stxobj.updatetime) + + " id: " + str(stxobj.id) + + " hash: " + str(stxobj.hash)) + localmodel = create_by(stxobj, cmd.parentid, resourcetype_id) + uow.resources.add(localmodel) + + logger.info("Add the network aggregate: " + stxobj.id + + ", name: " + stxobj.name) + else: + localmodel = resource + if is_outdated(localmodel, stxobj): + logger.info("update network aggregate:" + stxobj.name + + " update_at: " + str(stxobj.updatetime) + + " id: " + str(stxobj.id) + + " hash: " + str(stxobj.hash)) + update_by(localmodel, stxobj, cmd.parentid) + uow.resources.update(localmodel) + + logger.info("Update the network aggregate: " + stxobj.id + + ", name: " + stxobj.name) + uow.commit() + + +def is_outdated(resource: Resource, stxobj: StxGenericModel): + return True if resource.hash != stxobj.hash else False + + +def create_by(stxobj: StxGenericModel, parentid: str, resourcetype_id: str) \ + -> Resource: + # content = json.loads(stxobj.content) + resourcetype_id = resourcetype_id + resourcepool_id = parentid + parent_id = None # the root of the resource has no parent id + gAssetId = '' # TODO: global ID + description = "%s : A Network Aggregate server resource" % stxobj.name + resource = Resource(stxobj.id, resourcetype_id, resourcepool_id, + stxobj.name, parent_id, gAssetId, stxobj.content, + description) + resource.createtime = stxobj.createtime + resource.updatetime = stxobj.updatetime + resource.hash = stxobj.hash + + return resource + + +def update_by(target: Resource, stxobj: StxGenericModel, + parentid: str) -> None: + if target.resourceId != stxobj.id: + raise MismatchedModel("Mismatched Id") + target.createtime = stxobj.createtime + target.updatetime = stxobj.updatetime + target.hash = stxobj.hash + target.version_number = target.version_number + 1 + target.events.append(events.ResourceChanged( + id=stxobj.id, + resourcePoolId=target.resourcePoolId, + notificationEventType=NotificationEventEnum.MODIFY, + updatetime=stxobj.updatetime + )) diff --git a/o2ims/service/auditor/agg_storage_handler.py b/o2ims/service/auditor/agg_storage_handler.py new file mode 100644 index 0000000..14ee152 --- /dev/null +++ b/o2ims/service/auditor/agg_storage_handler.py @@ -0,0 +1,126 @@ +# Copyright (C) 2022 Wind River Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint: disable=unused-argument +from __future__ import annotations +import uuid +# import json +from typing import Callable + +from o2ims.domain import commands, events +from o2ims.domain.stx_object import StxGenericModel +from o2ims.domain.subscription_obj import NotificationEventEnum +from o2common.service.unit_of_work import AbstractUnitOfWork +from o2ims.domain.resource_type import MismatchedModel +from o2ims.domain.ocloud import Resource, ResourceType + +from o2common.helper import o2logging +logger = o2logging.get_logger(__name__) + + +class InvalidResourceType(Exception): + pass + + +def update_storage_aggregate( + cmd: commands.UpdateStorageAgg, + uow: AbstractUnitOfWork, + publish: Callable +): + stxobj = cmd.data + with uow: + res = uow.session.execute( + ''' + SELECT "resourceTypeId", "oCloudId", "name" + FROM "resourceType" + WHERE "resourceTypeEnum" = :resource_type_enum + ''', + dict(resource_type_enum=stxobj.type.name) + ) + first = res.first() + if first is None: + resourcepool = uow.resource_pools.get(cmd.parentid) + res_type_name = 'storage_aggregate' + resourcetype_id = str(uuid.uuid3( + uuid.NAMESPACE_URL, res_type_name)) + uow.resource_types.add(ResourceType( + resourcetype_id, + res_type_name, stxobj.type, + resourcepool.oCloudId, + description='The storage Aggregate resource type')) + else: + resourcetype_id = first['resourceTypeId'] + + resource = uow.resources.get(stxobj.id) + if not resource: + logger.info("Add storage aggregate:" + stxobj.name + + " update_at: " + str(stxobj.updatetime) + + " id: " + str(stxobj.id) + + " hash: " + str(stxobj.hash)) + localmodel = create_by(stxobj, cmd.parentid, resourcetype_id) + uow.resources.add(localmodel) + + logger.info("Add the storage aggregate: " + stxobj.id + + ", name: " + stxobj.name) + else: + localmodel = resource + if is_outdated(localmodel, stxobj): + logger.info("update storage aggregate:" + stxobj.name + + " update_at: " + str(stxobj.updatetime) + + " id: " + str(stxobj.id) + + " hash: " + str(stxobj.hash)) + update_by(localmodel, stxobj, cmd.parentid) + uow.resources.update(localmodel) + + logger.info("Update the storage aggregate: " + stxobj.id + + ", name: " + stxobj.name) + uow.commit() + + +def is_outdated(resource: Resource, stxobj: StxGenericModel): + return True if resource.hash != stxobj.hash else False + + +def create_by(stxobj: StxGenericModel, parentid: str, resourcetype_id: str) \ + -> Resource: + # content = json.loads(stxobj.content) + resourcetype_id = resourcetype_id + resourcepool_id = parentid + parent_id = None # the root of the resource has no parent id + gAssetId = '' # TODO: global ID + description = "%s : A Storage Aggregate server resource" % stxobj.name + resource = Resource(stxobj.id, resourcetype_id, resourcepool_id, + stxobj.name, parent_id, gAssetId, stxobj.content, + description) + resource.createtime = stxobj.createtime + resource.updatetime = stxobj.updatetime + resource.hash = stxobj.hash + + return resource + + +def update_by(target: Resource, stxobj: StxGenericModel, + parentid: str) -> None: + if target.resourceId != stxobj.id: + raise MismatchedModel("Mismatched Id") + target.createtime = stxobj.createtime + target.updatetime = stxobj.updatetime + target.hash = stxobj.hash + target.version_number = target.version_number + 1 + target.events.append(events.ResourceChanged( + id=stxobj.id, + resourcePoolId=target.resourcePoolId, + notificationEventType=NotificationEventEnum.MODIFY, + updatetime=stxobj.updatetime + )) diff --git a/o2ims/service/auditor/agg_undefined_handler.py b/o2ims/service/auditor/agg_undefined_handler.py new file mode 100644 index 0000000..aa477a7 --- /dev/null +++ b/o2ims/service/auditor/agg_undefined_handler.py @@ -0,0 +1,126 @@ +# Copyright (C) 2022 Wind River Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint: disable=unused-argument +from __future__ import annotations +import uuid +# import json +from typing import Callable + +from o2ims.domain import commands, events +from o2ims.domain.stx_object import StxGenericModel +from o2ims.domain.subscription_obj import NotificationEventEnum +from o2common.service.unit_of_work import AbstractUnitOfWork +from o2ims.domain.resource_type import MismatchedModel +from o2ims.domain.ocloud import Resource, ResourceType + +from o2common.helper import o2logging +logger = o2logging.get_logger(__name__) + + +class InvalidResourceType(Exception): + pass + + +def update_undefined_aggregate( + cmd: commands.UpdateUndefinedAgg, + uow: AbstractUnitOfWork, + publish: Callable +): + stxobj = cmd.data + with uow: + res = uow.session.execute( + ''' + SELECT "resourceTypeId", "oCloudId", "name" + FROM "resourceType" + WHERE "resourceTypeEnum" = :resource_type_enum + ''', + dict(resource_type_enum=stxobj.type.name) + ) + first = res.first() + if first is None: + resourcepool = uow.resource_pools.get(cmd.parentid) + res_type_name = 'undefined_aggregate' + resourcetype_id = str(uuid.uuid3( + uuid.NAMESPACE_URL, res_type_name)) + uow.resource_types.add(ResourceType( + resourcetype_id, + res_type_name, stxobj.type, + resourcepool.oCloudId, + description='The undefined Aggregate resource type')) + else: + resourcetype_id = first['resourceTypeId'] + + resource = uow.resources.get(stxobj.id) + if not resource: + logger.info("Add undefined aggregate:" + stxobj.name + + " update_at: " + str(stxobj.updatetime) + + " id: " + str(stxobj.id) + + " hash: " + str(stxobj.hash)) + localmodel = create_by(stxobj, cmd.parentid, resourcetype_id) + uow.resources.add(localmodel) + + logger.info("Add the undefined aggregate: " + stxobj.id + + ", name: " + stxobj.name) + else: + localmodel = resource + if is_outdated(localmodel, stxobj): + logger.info("update undefined aggregate:" + stxobj.name + + " update_at: " + str(stxobj.updatetime) + + " id: " + str(stxobj.id) + + " hash: " + str(stxobj.hash)) + update_by(localmodel, stxobj, cmd.parentid) + uow.resources.update(localmodel) + + logger.info("Update the undefined aggregate: " + stxobj.id + + ", name: " + stxobj.name) + uow.commit() + + +def is_outdated(resource: Resource, stxobj: StxGenericModel): + return True if resource.hash != stxobj.hash else False + + +def create_by(stxobj: StxGenericModel, parentid: str, resourcetype_id: str) \ + -> Resource: + # content = json.loads(stxobj.content) + resourcetype_id = resourcetype_id + resourcepool_id = parentid + parent_id = None # the root of the resource has no parent id + gAssetId = '' # TODO: global ID + description = "%s : An Undefined Aggregate server resource" % stxobj.name + resource = Resource(stxobj.id, resourcetype_id, resourcepool_id, + stxobj.name, parent_id, gAssetId, stxobj.content, + description) + resource.createtime = stxobj.createtime + resource.updatetime = stxobj.updatetime + resource.hash = stxobj.hash + + return resource + + +def update_by(target: Resource, stxobj: StxGenericModel, + parentid: str) -> None: + if target.resourceId != stxobj.id: + raise MismatchedModel("Mismatched Id") + target.createtime = stxobj.createtime + target.updatetime = stxobj.updatetime + target.hash = stxobj.hash + target.version_number = target.version_number + 1 + target.events.append(events.ResourceChanged( + id=stxobj.id, + resourcePoolId=target.resourcePoolId, + notificationEventType=NotificationEventEnum.MODIFY, + updatetime=stxobj.updatetime + )) diff --git a/o2ims/service/watcher/agg_compute_watcher.py b/o2ims/service/watcher/agg_compute_watcher.py new file mode 100644 index 0000000..9fd4d3a --- /dev/null +++ b/o2ims/service/watcher/agg_compute_watcher.py @@ -0,0 +1,41 @@ +# Copyright (C) 2022 Wind River Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from o2ims.domain.stx_object import StxGenericModel +from o2common.service.client.base_client import BaseClient +from o2ims.service.watcher.aggregate_watcher import AggregateWatcher +from o2ims.domain import commands +from o2common.service.messagebus import MessageBus + +from o2common.helper import o2logging +logger = o2logging.get_logger(__name__) + + +class ComputeAggWatcher(AggregateWatcher): + def __init__(self, client: BaseClient, + bus: MessageBus) -> None: + super().__init__(client, bus) + + def _targetname(self): + return "compute_aggregate" + + def _probe(self, parent: StxGenericModel, tags=None): + # Set a tag for children resource + self._tags.pool = parent.res_pool_id + self._set_respool_client() + + resourcepoolid = parent.id + newmodels = self._client.list(resourcepoolid=resourcepoolid) + return [commands.UpdateComputeAgg(data=m, parentid=resourcepoolid) + for m in newmodels] diff --git a/o2ims/service/watcher/agg_network_watcher.py b/o2ims/service/watcher/agg_network_watcher.py new file mode 100644 index 0000000..b932e93 --- /dev/null +++ b/o2ims/service/watcher/agg_network_watcher.py @@ -0,0 +1,41 @@ +# Copyright (C) 2022 Wind River Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from o2ims.domain.stx_object import StxGenericModel +from o2common.service.client.base_client import BaseClient +from o2ims.service.watcher.aggregate_watcher import AggregateWatcher +from o2ims.domain import commands +from o2common.service.messagebus import MessageBus + +from o2common.helper import o2logging +logger = o2logging.get_logger(__name__) + + +class NetworkAggWatcher(AggregateWatcher): + def __init__(self, client: BaseClient, + bus: MessageBus) -> None: + super().__init__(client, bus) + + def _targetname(self): + return "network_aggregate" + + def _probe(self, parent: StxGenericModel, tags=None): + # Set a tag for children resource + self._tags.pool = parent.res_pool_id + self._set_respool_client() + + resourcepoolid = parent.id + newmodels = self._client.list(resourcepoolid=resourcepoolid) + return [commands.UpdateNetworkAgg(data=m, parentid=resourcepoolid) + for m in newmodels] diff --git a/o2ims/service/watcher/agg_storage_watcher.py b/o2ims/service/watcher/agg_storage_watcher.py new file mode 100644 index 0000000..d648b21 --- /dev/null +++ b/o2ims/service/watcher/agg_storage_watcher.py @@ -0,0 +1,41 @@ +# Copyright (C) 2022 Wind River Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from o2ims.domain.stx_object import StxGenericModel +from o2common.service.client.base_client import BaseClient +from o2ims.service.watcher.aggregate_watcher import AggregateWatcher +from o2ims.domain import commands +from o2common.service.messagebus import MessageBus + +from o2common.helper import o2logging +logger = o2logging.get_logger(__name__) + + +class StorageAggWatcher(AggregateWatcher): + def __init__(self, client: BaseClient, + bus: MessageBus) -> None: + super().__init__(client, bus) + + def _targetname(self): + return "storage_aggregate" + + def _probe(self, parent: StxGenericModel, tags=None): + # Set a tag for children resource + self._tags.pool = parent.res_pool_id + self._set_respool_client() + + resourcepoolid = parent.id + newmodels = self._client.list(resourcepoolid=resourcepoolid) + return [commands.UpdateStorageAgg(data=m, parentid=resourcepoolid) + for m in newmodels] diff --git a/o2ims/service/watcher/agg_undefined_watcher.py b/o2ims/service/watcher/agg_undefined_watcher.py new file mode 100644 index 0000000..eda07af --- /dev/null +++ b/o2ims/service/watcher/agg_undefined_watcher.py @@ -0,0 +1,41 @@ +# Copyright (C) 2022 Wind River Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from o2ims.domain.stx_object import StxGenericModel +from o2common.service.client.base_client import BaseClient +from o2ims.service.watcher.aggregate_watcher import AggregateWatcher +from o2ims.domain import commands +from o2common.service.messagebus import MessageBus + +from o2common.helper import o2logging +logger = o2logging.get_logger(__name__) + + +class UndefinedAggWatcher(AggregateWatcher): + def __init__(self, client: BaseClient, + bus: MessageBus) -> None: + super().__init__(client, bus) + + def _targetname(self): + return "undefined_aggregate" + + def _probe(self, parent: StxGenericModel, tags=None): + # Set a tag for children resource + self._tags.pool = parent.res_pool_id + self._set_respool_client() + + resourcepoolid = parent.id + newmodels = self._client.list(resourcepoolid=resourcepoolid) + return [commands.UpdateUndefinedAgg(data=m, parentid=resourcepoolid) + for m in newmodels] diff --git a/o2ims/service/watcher/aggregate_watcher.py b/o2ims/service/watcher/aggregate_watcher.py new file mode 100644 index 0000000..34e64da --- /dev/null +++ b/o2ims/service/watcher/aggregate_watcher.py @@ -0,0 +1,45 @@ +# Copyright (C) 2022 Wind River Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from o2ims.domain.stx_object import StxGenericModel +from o2common.service.client.base_client import BaseClient +from o2common.service.watcher.base import BaseWatcher +from o2ims.domain import commands +from o2common.domain import tags +from o2common.service.messagebus import MessageBus + +from o2common.helper import o2logging +logger = o2logging.get_logger(__name__) + + +class AggregateWatcher(BaseWatcher): + def __init__(self, client: BaseClient, + bus: MessageBus) -> None: + super().__init__(client, bus) + self._tags = tags.Tag() + self.poolid = None + + def _targetname(self): + return "aggregate" + + def _probe(self, parent: StxGenericModel, tags: object = None): + resourcepoolid = parent.id + newmodels = self._client.list(resourcepoolid=resourcepoolid) + logger.warning(newmodels[0]) + return [commands.UpdateAggregate(data=m, parentid=resourcepoolid) + for m in newmodels] + + def _set_respool_client(self): + self.poolid = self._tags.pool + self._client.set_pool_driver(self.poolid) -- 2.16.6