from o2ims.service.watcher.pserver_acc_watcher import PServerAccWatcher
from o2ims.adapter.clients.ocloud_client import StxAccClient
+from o2ims.service.watcher.agg_compute_watcher import ComputeAggWatcher
+from o2ims.service.watcher.agg_network_watcher import NetworkAggWatcher
+from o2ims.service.watcher.agg_storage_watcher import StorageAggWatcher
+from o2ims.service.watcher.agg_undefined_watcher import UndefinedAggWatcher
+from o2ims.adapter.clients.aggregate_client import ComputeAggClient, \
+ NetworkAggClient, StorageAggClient, UndefinedAggClient
+
from o2common.helper import o2logging
logger = o2logging.get_logger(__name__)
child_respool = root.addchild(
ResourcePoolWatcher(StxResourcePoolClient(),
self.bus))
+ child_respool.addchild(
+ ComputeAggWatcher(ComputeAggClient(), self.bus))
+ child_respool.addchild(
+ NetworkAggWatcher(NetworkAggClient(), self.bus))
+ child_respool.addchild(
+ StorageAggWatcher(StorageAggClient(), self.bus))
+ child_respool.addchild(
+ UndefinedAggWatcher(UndefinedAggClient(), self.bus))
+
child_pserver = child_respool.addchild(
PServerWatcher(StxPserverClient(), self.bus))
child_pserver.addchild(
resourcepool_handler, pserver_handler, pserver_cpu_handler, \
pserver_mem_handler, pserver_port_handler, pserver_if_handler,\
pserver_eth_handler, pserver_acc_handler, alarm_handler, \
- pserver_dev_handler
+ pserver_dev_handler, agg_compute_handler, agg_network_handler,\
+ agg_storage_handler, agg_undefined_handler
from o2ims.service.command import notify_handler, registration_handler,\
notify_alarm_handler
from o2ims.service.event import ocloud_event, resource_event, \
commands.UpdateDms: dms_handler.update_dms,
commands.UpdateAlarm: alarm_handler.update_alarm,
commands.UpdateResourcePool: resourcepool_handler.update_resourcepool,
+ commands.UpdateComputeAgg: agg_compute_handler.update_compute_aggregate,
+ commands.UpdateNetworkAgg: agg_network_handler.update_network_aggregate,
+ commands.UpdateStorageAgg: agg_storage_handler.update_storage_aggregate,
+ commands.UpdateUndefinedAgg:
+ agg_undefined_handler.update_undefined_aggregate,
commands.UpdatePserver: pserver_handler.update_pserver,
commands.UpdatePserverCpu: pserver_cpu_handler.update_pserver_cpu,
commands.UpdatePserverMem: pserver_mem_handler.update_pserver_mem,
# See the License for the specific language governing permissions and
# limitations under the License.
+import traceback
# from logging import exception
# from cgtsclient import exc
+
from o2common.service.client.base_client import BaseClient
from o2common.domain import commands
from o2common.service.messagebus import MessageBus
except Exception as ex:
logger.warning("Failed to probe %s watcher due to: %s - %s" %
(self._targetname(), type(ex), str(ex)))
+ logger.debug(traceback.format_exc())
return []
def _probe(self, parent: object = None, tags: object = None) \
--- /dev/null
+# Copyright (C) 2022 Wind River Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import uuid
+from typing import List
+
+from o2common.service.client.base_client import BaseClient
+from o2ims.domain import stx_object as ocloudModel
+from o2ims.domain.resource_type import ResourceTypeEnum
+
+from o2common.helper import o2logging
+logger = o2logging.get_logger(__name__)
+
+
+class ComputeAggClient(BaseClient):
+ def __init__(self, driver=None):
+ super().__init__()
+ self.driver = AggClientImp()
+
+ def _get(self, id) -> ocloudModel.StxGenericModel:
+ return self.driver.getComputeList(res_pool=self._pool_id)[0]
+
+ def _list(self, **filters):
+ return self.driver.getComputeList(res_pool=self._pool_id)
+
+ def _set_stx_client(self):
+ pass
+
+
+class NetworkAggClient(BaseClient):
+ def __init__(self, driver=None):
+ super().__init__()
+ self.driver = AggClientImp()
+
+ def _get(self, id) -> ocloudModel.StxGenericModel:
+ return self.driver.getNetworkList(res_pool=self._pool_id)[0]
+
+ def _list(self, **filters):
+ return self.driver.getNetworkList(res_pool=self._pool_id)
+
+ def _set_stx_client(self):
+ pass
+
+
+class StorageAggClient(BaseClient):
+ def __init__(self, driver=None):
+ super().__init__()
+ self.driver = AggClientImp()
+
+ def _get(self, id) -> ocloudModel.StxGenericModel:
+ return self.driver.getStorageList(res_pool=self._pool_id)[0]
+
+ def _list(self, **filters):
+ return self.driver.getStorageList(res_pool=self._pool_id)
+
+ def _set_stx_client(self):
+ pass
+
+
+class UndefinedAggClient(BaseClient):
+ def __init__(self, driver=None):
+ super().__init__()
+ self.driver = AggClientImp()
+
+ def _get(self, id) -> ocloudModel.StxGenericModel:
+ return self.driver.getUndefinedList(res_pool=self._pool_id)[0]
+
+ def _list(self, **filters):
+ return self.driver.getUndefinedList(res_pool=self._pool_id)
+
+ def _set_stx_client(self):
+ pass
+
+
+class AggClientImp(object):
+ def __init__(self):
+ super().__init__()
+
+ def getComputeList(self, **filters) -> List[ocloudModel.StxGenericModel]:
+ compute = ComputeAggregate(filters['res_pool'])
+ return [ocloudModel.StxGenericModel(
+ ResourceTypeEnum.COMPUTE_AGGREGATE, compute)]
+
+ def getNetworkList(self, **filters) -> List[ocloudModel.StxGenericModel]:
+ network = NetworkAggregate(filters['res_pool'])
+ return [ocloudModel.StxGenericModel(
+ ResourceTypeEnum.NETWORK_AGGREGATE, network)]
+
+ def getStorageList(self, **filters) -> List[ocloudModel.StxGenericModel]:
+ storage = StorageAggregate(filters['res_pool'])
+ return [ocloudModel.StxGenericModel(
+ ResourceTypeEnum.STORAGE_AGGREGATE, storage)]
+
+ def getUndefinedList(self, **filters) -> List[ocloudModel.StxGenericModel]:
+ undefined = UndefinedAggregate(filters['res_pool'])
+ return [ocloudModel.StxGenericModel(
+ ResourceTypeEnum.UNDEFINED_AGGREGATE, undefined)]
+
+
+class Aggregate:
+ def __init__(self, res_pool_id: str, name: str) -> None:
+ self.name = name
+ setattr(self, 'name', self.name)
+ setattr(self, 'uuid',
+ uuid.uuid3(uuid.NAMESPACE_URL, res_pool_id + self.name))
+ setattr(self, 'updated_at', None)
+ setattr(self, 'created_at', None)
+
+ def to_dict(self):
+ return {}
+
+
+class ComputeAggregate(Aggregate):
+ def __init__(self, res_pool_id: str) -> None:
+ super().__init__(res_pool_id, 'compute_aggregate')
+
+
+class NetworkAggregate(Aggregate):
+ def __init__(self, res_pool_id: str) -> None:
+ super().__init__(res_pool_id, 'network_aggregate')
+
+
+class StorageAggregate(Aggregate):
+ def __init__(self, res_pool_id: str) -> None:
+ super().__init__(res_pool_id, 'storage_aggregate')
+
+
+class UndefinedAggregate(Aggregate):
+ def __init__(self, res_pool_id: str) -> None:
+ super().__init__(res_pool_id, 'undefined_aggregate')
parentid: str
+@dataclass
+class UpdateComputeAgg(UpdateResource):
+ pass
+
+
+@dataclass
+class UpdateNetworkAgg(UpdateResource):
+ pass
+
+
+@dataclass
+class UpdateStorageAgg(UpdateResource):
+ pass
+
+
+@dataclass
+class UpdateUndefinedAgg(UpdateResource):
+ pass
+
+
@dataclass
class UpdatePserver(UpdateResource):
pass
PSERVER_ETH = 16
PSERVER_PCI_DEV = 17
PSERVER_ACC = 18
+ COMPUTE_AGGREGATE = 70
+ NETWORK_AGGREGATE = 80
+ STORAGE_AGGREGATE = 90
+ UNDEFINED_AGGREGATE = 100
class ResourceKindEnum(Enum):
--- /dev/null
+# Copyright (C) 2022 Wind River Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# pylint: disable=unused-argument
+from __future__ import annotations
+import uuid
+# import json
+from typing import Callable
+
+from o2ims.domain import commands, events
+from o2ims.domain.stx_object import StxGenericModel
+from o2ims.domain.subscription_obj import NotificationEventEnum
+from o2common.service.unit_of_work import AbstractUnitOfWork
+from o2ims.domain.resource_type import MismatchedModel
+from o2ims.domain.ocloud import Resource, ResourceType
+
+from o2common.helper import o2logging
+logger = o2logging.get_logger(__name__)
+
+
+class InvalidResourceType(Exception):
+ pass
+
+
+def update_compute_aggregate(
+ cmd: commands.UpdateComputeAgg,
+ uow: AbstractUnitOfWork,
+ publish: Callable
+):
+ stxobj = cmd.data
+ with uow:
+ res = uow.session.execute(
+ '''
+ SELECT "resourceTypeId", "oCloudId", "name"
+ FROM "resourceType"
+ WHERE "resourceTypeEnum" = :resource_type_enum
+ ''',
+ dict(resource_type_enum=stxobj.type.name)
+ )
+ first = res.first()
+ if first is None:
+ resourcepool = uow.resource_pools.get(cmd.parentid)
+ res_type_name = 'compute_aggregate'
+ resourcetype_id = str(uuid.uuid3(
+ uuid.NAMESPACE_URL, res_type_name))
+ uow.resource_types.add(ResourceType(
+ resourcetype_id,
+ res_type_name, stxobj.type,
+ resourcepool.oCloudId,
+ description='The compute Aggregate resource type'))
+ else:
+ resourcetype_id = first['resourceTypeId']
+
+ resource = uow.resources.get(stxobj.id)
+ if not resource:
+ logger.info("Add compute aggregate:" + stxobj.name
+ + " update_at: " + str(stxobj.updatetime)
+ + " id: " + str(stxobj.id)
+ + " hash: " + str(stxobj.hash))
+ localmodel = create_by(stxobj, cmd.parentid, resourcetype_id)
+ uow.resources.add(localmodel)
+
+ logger.info("Add the compute aggregate: " + stxobj.id
+ + ", name: " + stxobj.name)
+ else:
+ localmodel = resource
+ if is_outdated(localmodel, stxobj):
+ logger.info("update compute aggregate:" + stxobj.name
+ + " update_at: " + str(stxobj.updatetime)
+ + " id: " + str(stxobj.id)
+ + " hash: " + str(stxobj.hash))
+ update_by(localmodel, stxobj, cmd.parentid)
+ uow.resources.update(localmodel)
+
+ logger.info("Update the compute aggregate: " + stxobj.id
+ + ", name: " + stxobj.name)
+ uow.commit()
+
+
+def is_outdated(resource: Resource, stxobj: StxGenericModel):
+ return True if resource.hash != stxobj.hash else False
+
+
+def create_by(stxobj: StxGenericModel, parentid: str, resourcetype_id: str) \
+ -> Resource:
+ # content = json.loads(stxobj.content)
+ resourcetype_id = resourcetype_id
+ resourcepool_id = parentid
+ parent_id = None # the root of the resource has no parent id
+ gAssetId = '' # TODO: global ID
+ description = "%s : A Compute Aggregate server resource" % stxobj.name
+ resource = Resource(stxobj.id, resourcetype_id, resourcepool_id,
+ stxobj.name, parent_id, gAssetId, stxobj.content,
+ description)
+ resource.createtime = stxobj.createtime
+ resource.updatetime = stxobj.updatetime
+ resource.hash = stxobj.hash
+
+ return resource
+
+
+def update_by(target: Resource, stxobj: StxGenericModel,
+ parentid: str) -> None:
+ if target.resourceId != stxobj.id:
+ raise MismatchedModel("Mismatched Id")
+ target.createtime = stxobj.createtime
+ target.updatetime = stxobj.updatetime
+ target.hash = stxobj.hash
+ target.version_number = target.version_number + 1
+ target.events.append(events.ResourceChanged(
+ id=stxobj.id,
+ resourcePoolId=target.resourcePoolId,
+ notificationEventType=NotificationEventEnum.MODIFY,
+ updatetime=stxobj.updatetime
+ ))
--- /dev/null
+# Copyright (C) 2022 Wind River Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# pylint: disable=unused-argument
+from __future__ import annotations
+import uuid
+# import json
+from typing import Callable
+
+from o2ims.domain import commands, events
+from o2ims.domain.stx_object import StxGenericModel
+from o2ims.domain.subscription_obj import NotificationEventEnum
+from o2common.service.unit_of_work import AbstractUnitOfWork
+from o2ims.domain.resource_type import MismatchedModel
+from o2ims.domain.ocloud import Resource, ResourceType
+
+from o2common.helper import o2logging
+logger = o2logging.get_logger(__name__)
+
+
+class InvalidResourceType(Exception):
+ pass
+
+
+def update_network_aggregate(
+ cmd: commands.UpdateNetworkAgg,
+ uow: AbstractUnitOfWork,
+ publish: Callable
+):
+ stxobj = cmd.data
+ with uow:
+ res = uow.session.execute(
+ '''
+ SELECT "resourceTypeId", "oCloudId", "name"
+ FROM "resourceType"
+ WHERE "resourceTypeEnum" = :resource_type_enum
+ ''',
+ dict(resource_type_enum=stxobj.type.name)
+ )
+ first = res.first()
+ if first is None:
+ resourcepool = uow.resource_pools.get(cmd.parentid)
+ res_type_name = 'network_aggregate'
+ resourcetype_id = str(uuid.uuid3(
+ uuid.NAMESPACE_URL, res_type_name))
+ uow.resource_types.add(ResourceType(
+ resourcetype_id,
+ res_type_name, stxobj.type,
+ resourcepool.oCloudId,
+ description='The network Aggregate resource type'))
+ else:
+ resourcetype_id = first['resourceTypeId']
+
+ resource = uow.resources.get(stxobj.id)
+ if not resource:
+ logger.info("Add network aggregate:" + stxobj.name
+ + " update_at: " + str(stxobj.updatetime)
+ + " id: " + str(stxobj.id)
+ + " hash: " + str(stxobj.hash))
+ localmodel = create_by(stxobj, cmd.parentid, resourcetype_id)
+ uow.resources.add(localmodel)
+
+ logger.info("Add the network aggregate: " + stxobj.id
+ + ", name: " + stxobj.name)
+ else:
+ localmodel = resource
+ if is_outdated(localmodel, stxobj):
+ logger.info("update network aggregate:" + stxobj.name
+ + " update_at: " + str(stxobj.updatetime)
+ + " id: " + str(stxobj.id)
+ + " hash: " + str(stxobj.hash))
+ update_by(localmodel, stxobj, cmd.parentid)
+ uow.resources.update(localmodel)
+
+ logger.info("Update the network aggregate: " + stxobj.id
+ + ", name: " + stxobj.name)
+ uow.commit()
+
+
+def is_outdated(resource: Resource, stxobj: StxGenericModel):
+ return True if resource.hash != stxobj.hash else False
+
+
+def create_by(stxobj: StxGenericModel, parentid: str, resourcetype_id: str) \
+ -> Resource:
+ # content = json.loads(stxobj.content)
+ resourcetype_id = resourcetype_id
+ resourcepool_id = parentid
+ parent_id = None # the root of the resource has no parent id
+ gAssetId = '' # TODO: global ID
+ description = "%s : A Network Aggregate server resource" % stxobj.name
+ resource = Resource(stxobj.id, resourcetype_id, resourcepool_id,
+ stxobj.name, parent_id, gAssetId, stxobj.content,
+ description)
+ resource.createtime = stxobj.createtime
+ resource.updatetime = stxobj.updatetime
+ resource.hash = stxobj.hash
+
+ return resource
+
+
+def update_by(target: Resource, stxobj: StxGenericModel,
+ parentid: str) -> None:
+ if target.resourceId != stxobj.id:
+ raise MismatchedModel("Mismatched Id")
+ target.createtime = stxobj.createtime
+ target.updatetime = stxobj.updatetime
+ target.hash = stxobj.hash
+ target.version_number = target.version_number + 1
+ target.events.append(events.ResourceChanged(
+ id=stxobj.id,
+ resourcePoolId=target.resourcePoolId,
+ notificationEventType=NotificationEventEnum.MODIFY,
+ updatetime=stxobj.updatetime
+ ))
--- /dev/null
+# Copyright (C) 2022 Wind River Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# pylint: disable=unused-argument
+from __future__ import annotations
+import uuid
+# import json
+from typing import Callable
+
+from o2ims.domain import commands, events
+from o2ims.domain.stx_object import StxGenericModel
+from o2ims.domain.subscription_obj import NotificationEventEnum
+from o2common.service.unit_of_work import AbstractUnitOfWork
+from o2ims.domain.resource_type import MismatchedModel
+from o2ims.domain.ocloud import Resource, ResourceType
+
+from o2common.helper import o2logging
+logger = o2logging.get_logger(__name__)
+
+
+class InvalidResourceType(Exception):
+ pass
+
+
+def update_storage_aggregate(
+ cmd: commands.UpdateStorageAgg,
+ uow: AbstractUnitOfWork,
+ publish: Callable
+):
+ stxobj = cmd.data
+ with uow:
+ res = uow.session.execute(
+ '''
+ SELECT "resourceTypeId", "oCloudId", "name"
+ FROM "resourceType"
+ WHERE "resourceTypeEnum" = :resource_type_enum
+ ''',
+ dict(resource_type_enum=stxobj.type.name)
+ )
+ first = res.first()
+ if first is None:
+ resourcepool = uow.resource_pools.get(cmd.parentid)
+ res_type_name = 'storage_aggregate'
+ resourcetype_id = str(uuid.uuid3(
+ uuid.NAMESPACE_URL, res_type_name))
+ uow.resource_types.add(ResourceType(
+ resourcetype_id,
+ res_type_name, stxobj.type,
+ resourcepool.oCloudId,
+ description='The storage Aggregate resource type'))
+ else:
+ resourcetype_id = first['resourceTypeId']
+
+ resource = uow.resources.get(stxobj.id)
+ if not resource:
+ logger.info("Add storage aggregate:" + stxobj.name
+ + " update_at: " + str(stxobj.updatetime)
+ + " id: " + str(stxobj.id)
+ + " hash: " + str(stxobj.hash))
+ localmodel = create_by(stxobj, cmd.parentid, resourcetype_id)
+ uow.resources.add(localmodel)
+
+ logger.info("Add the storage aggregate: " + stxobj.id
+ + ", name: " + stxobj.name)
+ else:
+ localmodel = resource
+ if is_outdated(localmodel, stxobj):
+ logger.info("update storage aggregate:" + stxobj.name
+ + " update_at: " + str(stxobj.updatetime)
+ + " id: " + str(stxobj.id)
+ + " hash: " + str(stxobj.hash))
+ update_by(localmodel, stxobj, cmd.parentid)
+ uow.resources.update(localmodel)
+
+ logger.info("Update the storage aggregate: " + stxobj.id
+ + ", name: " + stxobj.name)
+ uow.commit()
+
+
+def is_outdated(resource: Resource, stxobj: StxGenericModel):
+ return True if resource.hash != stxobj.hash else False
+
+
+def create_by(stxobj: StxGenericModel, parentid: str, resourcetype_id: str) \
+ -> Resource:
+ # content = json.loads(stxobj.content)
+ resourcetype_id = resourcetype_id
+ resourcepool_id = parentid
+ parent_id = None # the root of the resource has no parent id
+ gAssetId = '' # TODO: global ID
+ description = "%s : A Storage Aggregate server resource" % stxobj.name
+ resource = Resource(stxobj.id, resourcetype_id, resourcepool_id,
+ stxobj.name, parent_id, gAssetId, stxobj.content,
+ description)
+ resource.createtime = stxobj.createtime
+ resource.updatetime = stxobj.updatetime
+ resource.hash = stxobj.hash
+
+ return resource
+
+
+def update_by(target: Resource, stxobj: StxGenericModel,
+ parentid: str) -> None:
+ if target.resourceId != stxobj.id:
+ raise MismatchedModel("Mismatched Id")
+ target.createtime = stxobj.createtime
+ target.updatetime = stxobj.updatetime
+ target.hash = stxobj.hash
+ target.version_number = target.version_number + 1
+ target.events.append(events.ResourceChanged(
+ id=stxobj.id,
+ resourcePoolId=target.resourcePoolId,
+ notificationEventType=NotificationEventEnum.MODIFY,
+ updatetime=stxobj.updatetime
+ ))
--- /dev/null
+# Copyright (C) 2022 Wind River Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# pylint: disable=unused-argument
+from __future__ import annotations
+import uuid
+# import json
+from typing import Callable
+
+from o2ims.domain import commands, events
+from o2ims.domain.stx_object import StxGenericModel
+from o2ims.domain.subscription_obj import NotificationEventEnum
+from o2common.service.unit_of_work import AbstractUnitOfWork
+from o2ims.domain.resource_type import MismatchedModel
+from o2ims.domain.ocloud import Resource, ResourceType
+
+from o2common.helper import o2logging
+logger = o2logging.get_logger(__name__)
+
+
+class InvalidResourceType(Exception):
+ pass
+
+
+def update_undefined_aggregate(
+ cmd: commands.UpdateUndefinedAgg,
+ uow: AbstractUnitOfWork,
+ publish: Callable
+):
+ stxobj = cmd.data
+ with uow:
+ res = uow.session.execute(
+ '''
+ SELECT "resourceTypeId", "oCloudId", "name"
+ FROM "resourceType"
+ WHERE "resourceTypeEnum" = :resource_type_enum
+ ''',
+ dict(resource_type_enum=stxobj.type.name)
+ )
+ first = res.first()
+ if first is None:
+ resourcepool = uow.resource_pools.get(cmd.parentid)
+ res_type_name = 'undefined_aggregate'
+ resourcetype_id = str(uuid.uuid3(
+ uuid.NAMESPACE_URL, res_type_name))
+ uow.resource_types.add(ResourceType(
+ resourcetype_id,
+ res_type_name, stxobj.type,
+ resourcepool.oCloudId,
+ description='The undefined Aggregate resource type'))
+ else:
+ resourcetype_id = first['resourceTypeId']
+
+ resource = uow.resources.get(stxobj.id)
+ if not resource:
+ logger.info("Add undefined aggregate:" + stxobj.name
+ + " update_at: " + str(stxobj.updatetime)
+ + " id: " + str(stxobj.id)
+ + " hash: " + str(stxobj.hash))
+ localmodel = create_by(stxobj, cmd.parentid, resourcetype_id)
+ uow.resources.add(localmodel)
+
+ logger.info("Add the undefined aggregate: " + stxobj.id
+ + ", name: " + stxobj.name)
+ else:
+ localmodel = resource
+ if is_outdated(localmodel, stxobj):
+ logger.info("update undefined aggregate:" + stxobj.name
+ + " update_at: " + str(stxobj.updatetime)
+ + " id: " + str(stxobj.id)
+ + " hash: " + str(stxobj.hash))
+ update_by(localmodel, stxobj, cmd.parentid)
+ uow.resources.update(localmodel)
+
+ logger.info("Update the undefined aggregate: " + stxobj.id
+ + ", name: " + stxobj.name)
+ uow.commit()
+
+
+def is_outdated(resource: Resource, stxobj: StxGenericModel):
+ return True if resource.hash != stxobj.hash else False
+
+
+def create_by(stxobj: StxGenericModel, parentid: str, resourcetype_id: str) \
+ -> Resource:
+ # content = json.loads(stxobj.content)
+ resourcetype_id = resourcetype_id
+ resourcepool_id = parentid
+ parent_id = None # the root of the resource has no parent id
+ gAssetId = '' # TODO: global ID
+ description = "%s : An Undefined Aggregate server resource" % stxobj.name
+ resource = Resource(stxobj.id, resourcetype_id, resourcepool_id,
+ stxobj.name, parent_id, gAssetId, stxobj.content,
+ description)
+ resource.createtime = stxobj.createtime
+ resource.updatetime = stxobj.updatetime
+ resource.hash = stxobj.hash
+
+ return resource
+
+
+def update_by(target: Resource, stxobj: StxGenericModel,
+ parentid: str) -> None:
+ if target.resourceId != stxobj.id:
+ raise MismatchedModel("Mismatched Id")
+ target.createtime = stxobj.createtime
+ target.updatetime = stxobj.updatetime
+ target.hash = stxobj.hash
+ target.version_number = target.version_number + 1
+ target.events.append(events.ResourceChanged(
+ id=stxobj.id,
+ resourcePoolId=target.resourcePoolId,
+ notificationEventType=NotificationEventEnum.MODIFY,
+ updatetime=stxobj.updatetime
+ ))
--- /dev/null
+# Copyright (C) 2022 Wind River Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from o2ims.domain.stx_object import StxGenericModel
+from o2common.service.client.base_client import BaseClient
+from o2ims.service.watcher.aggregate_watcher import AggregateWatcher
+from o2ims.domain import commands
+from o2common.service.messagebus import MessageBus
+
+from o2common.helper import o2logging
+logger = o2logging.get_logger(__name__)
+
+
+class ComputeAggWatcher(AggregateWatcher):
+ def __init__(self, client: BaseClient,
+ bus: MessageBus) -> None:
+ super().__init__(client, bus)
+
+ def _targetname(self):
+ return "compute_aggregate"
+
+ def _probe(self, parent: StxGenericModel, tags=None):
+ # Set a tag for children resource
+ self._tags.pool = parent.res_pool_id
+ self._set_respool_client()
+
+ resourcepoolid = parent.id
+ newmodels = self._client.list(resourcepoolid=resourcepoolid)
+ return [commands.UpdateComputeAgg(data=m, parentid=resourcepoolid)
+ for m in newmodels]
--- /dev/null
+# Copyright (C) 2022 Wind River Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from o2ims.domain.stx_object import StxGenericModel
+from o2common.service.client.base_client import BaseClient
+from o2ims.service.watcher.aggregate_watcher import AggregateWatcher
+from o2ims.domain import commands
+from o2common.service.messagebus import MessageBus
+
+from o2common.helper import o2logging
+logger = o2logging.get_logger(__name__)
+
+
+class NetworkAggWatcher(AggregateWatcher):
+ def __init__(self, client: BaseClient,
+ bus: MessageBus) -> None:
+ super().__init__(client, bus)
+
+ def _targetname(self):
+ return "network_aggregate"
+
+ def _probe(self, parent: StxGenericModel, tags=None):
+ # Set a tag for children resource
+ self._tags.pool = parent.res_pool_id
+ self._set_respool_client()
+
+ resourcepoolid = parent.id
+ newmodels = self._client.list(resourcepoolid=resourcepoolid)
+ return [commands.UpdateNetworkAgg(data=m, parentid=resourcepoolid)
+ for m in newmodels]
--- /dev/null
+# Copyright (C) 2022 Wind River Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from o2ims.domain.stx_object import StxGenericModel
+from o2common.service.client.base_client import BaseClient
+from o2ims.service.watcher.aggregate_watcher import AggregateWatcher
+from o2ims.domain import commands
+from o2common.service.messagebus import MessageBus
+
+from o2common.helper import o2logging
+logger = o2logging.get_logger(__name__)
+
+
+class StorageAggWatcher(AggregateWatcher):
+ def __init__(self, client: BaseClient,
+ bus: MessageBus) -> None:
+ super().__init__(client, bus)
+
+ def _targetname(self):
+ return "storage_aggregate"
+
+ def _probe(self, parent: StxGenericModel, tags=None):
+ # Set a tag for children resource
+ self._tags.pool = parent.res_pool_id
+ self._set_respool_client()
+
+ resourcepoolid = parent.id
+ newmodels = self._client.list(resourcepoolid=resourcepoolid)
+ return [commands.UpdateStorageAgg(data=m, parentid=resourcepoolid)
+ for m in newmodels]
--- /dev/null
+# Copyright (C) 2022 Wind River Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from o2ims.domain.stx_object import StxGenericModel
+from o2common.service.client.base_client import BaseClient
+from o2ims.service.watcher.aggregate_watcher import AggregateWatcher
+from o2ims.domain import commands
+from o2common.service.messagebus import MessageBus
+
+from o2common.helper import o2logging
+logger = o2logging.get_logger(__name__)
+
+
+class UndefinedAggWatcher(AggregateWatcher):
+ def __init__(self, client: BaseClient,
+ bus: MessageBus) -> None:
+ super().__init__(client, bus)
+
+ def _targetname(self):
+ return "undefined_aggregate"
+
+ def _probe(self, parent: StxGenericModel, tags=None):
+ # Set a tag for children resource
+ self._tags.pool = parent.res_pool_id
+ self._set_respool_client()
+
+ resourcepoolid = parent.id
+ newmodels = self._client.list(resourcepoolid=resourcepoolid)
+ return [commands.UpdateUndefinedAgg(data=m, parentid=resourcepoolid)
+ for m in newmodels]
--- /dev/null
+# Copyright (C) 2022 Wind River Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from o2ims.domain.stx_object import StxGenericModel
+from o2common.service.client.base_client import BaseClient
+from o2common.service.watcher.base import BaseWatcher
+from o2ims.domain import commands
+from o2common.domain import tags
+from o2common.service.messagebus import MessageBus
+
+from o2common.helper import o2logging
+logger = o2logging.get_logger(__name__)
+
+
+class AggregateWatcher(BaseWatcher):
+ def __init__(self, client: BaseClient,
+ bus: MessageBus) -> None:
+ super().__init__(client, bus)
+ self._tags = tags.Tag()
+ self.poolid = None
+
+ def _targetname(self):
+ return "aggregate"
+
+ def _probe(self, parent: StxGenericModel, tags: object = None):
+ resourcepoolid = parent.id
+ newmodels = self._client.list(resourcepoolid=resourcepoolid)
+ logger.warning(newmodels[0])
+ return [commands.UpdateAggregate(data=m, parentid=resourcepoolid)
+ for m in newmodels]
+
+ def _set_respool_client(self):
+ self.poolid = self._tags.pool
+ self._client.set_pool_driver(self.poolid)