From: Bin Yang Date: Mon, 22 Nov 2021 03:20:10 +0000 (+0800) Subject: Refactor watchers X-Git-Tag: 1.0.0~25 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=62f8863960ebd439c714b0ceed204731d9b31266;p=pti%2Fo2.git Refactor watchers integrate watcher with message bus and handler Signed-off-by: Bin Yang Change-Id: I93f98a88526c866b142ab5da17e5e30839d300a0 --- diff --git a/configs/log.yaml b/configs/log.yaml index 5eaf85b..62d1a92 100644 --- a/configs/log.yaml +++ b/configs/log.yaml @@ -26,11 +26,11 @@ loggers: propagate: False o2ims: handlers: [console_handler, file_handler] - level: "WARNING" + level: "DEBUG" propagate: False o2dms: handlers: [console_handler, file_handler] - level: "WARNING" + level: "DEBUG" propagate: False handlers: console_handler: diff --git a/o2ims/adapter/clients/orm_stx.py b/o2ims/adapter/clients/orm_stx.py index c2d0e6f..46ebdb7 100644 --- a/o2ims/adapter/clients/orm_stx.py +++ b/o2ims/adapter/clients/orm_stx.py @@ -27,12 +27,10 @@ from sqlalchemy import ( Enum ) -from sqlalchemy.orm import mapper -# from sqlalchemy.sql.sqltypes import Integer -# from sqlalchemy.sql.expression import true +# from sqlalchemy.orm import mapper + +# from o2ims.domain import stx_object as ocloudModel -from o2ims.domain import stx_object as ocloudModel -# from o2ims.adapter.orm import metadata from o2ims.service.unit_of_work import AbstractUnitOfWork from o2ims.adapter.unit_of_work import SqlAlchemyUnitOfWork from o2ims.domain.resource_type import ResourceTypeEnum @@ -56,10 +54,11 @@ stxobject = Table( def start_o2ims_stx_mappers(uow: AbstractUnitOfWork = SqlAlchemyUnitOfWork()): - logger.info("Starting O2 IMS Stx mappers") - mapper(ocloudModel.StxGenericModel, stxobject) + return + # logger.info("Starting O2 IMS Stx mappers") + # mapper(ocloudModel.StxGenericModel, stxobject) - with uow: - engine1 = uow.session.get_bind() - metadata.create_all(engine1) - uow.commit() + # with uow: + # engine1 = uow.session.get_bind() + # metadata.create_all(engine1) + # uow.commit() diff --git a/o2ims/adapter/orm.py b/o2ims/adapter/orm.py index a9d2cea..61c3157 100644 --- a/o2ims/adapter/orm.py +++ b/o2ims/adapter/orm.py @@ -16,15 +16,17 @@ from sqlalchemy import ( Table, MetaData, Column, - # Integer, + Integer, String, # Date, + DateTime, ForeignKey, # engine, # event, ) from sqlalchemy.orm import mapper, relationship +# from sqlalchemy.sql.sqltypes import Integer from o2ims.domain import ocloud as ocloudModel @@ -36,6 +38,11 @@ metadata = MetaData() ocloud = Table( "ocloud", metadata, + Column("updatetime", DateTime), + Column("createtime", DateTime), + Column("hash", String(255)), + Column("version_number", Integer), + Column("oCloudId", String(255), primary_key=True), Column("globalcloudId", String(255)), Column("name", String(255)), @@ -47,6 +54,10 @@ ocloud = Table( resourcetype = Table( "resourcetype", metadata, + Column("updatetime", DateTime), + Column("createtime", DateTime), + Column("hash", String(255)), + Column("resourceTypeId", String(255), primary_key=True), Column("oCloudId", ForeignKey("ocloud.oCloudId")), Column("name", String(255)), @@ -60,6 +71,11 @@ resourcetype = Table( resourcepool = Table( "resourcepool", metadata, + Column("updatetime", DateTime), + Column("createtime", DateTime), + Column("hash", String(255)), + Column("version_number", Integer), + Column("resourcePoolId", String(255), primary_key=True), Column("oCloudId", ForeignKey("ocloud.oCloudId")), Column("globalLocationId", String(255)), @@ -73,6 +89,11 @@ resourcepool = Table( resource = Table( "resource", metadata, + Column("updatetime", DateTime), + Column("createtime", DateTime), + Column("hash", String(255)), + Column("version_number", Integer), + Column("resourceId", String(255), primary_key=True), Column("resourceTypeId", ForeignKey("resourcetype.resourceTypeId")), Column("resourcePoolId", ForeignKey("resourcepool.resourcePoolId")), @@ -87,6 +108,10 @@ resource = Table( deploymentmanager = Table( "deploymentmanager", metadata, + Column("updatetime", DateTime), + Column("createtime", DateTime), + Column("hash", String(255)), + Column("deploymentManagerId", String(255), primary_key=True), Column("oCloudId", ForeignKey("ocloud.oCloudId")), Column("name", String(255)), @@ -101,6 +126,11 @@ deploymentmanager = Table( subscription = Table( "subscription", metadata, + Column("updatetime", DateTime), + Column("createtime", DateTime), + Column("hash", String(255)), + Column("version_number", Integer), + Column("subscriptionId", String(255), primary_key=True), Column("callback", String(255)), Column("consumerSubscriptionId", String(255)), diff --git a/o2ims/domain/base.py b/o2ims/domain/base.py index f7eceb7..8a673da 100644 --- a/o2ims/domain/base.py +++ b/o2ims/domain/base.py @@ -12,10 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. +from datetime import datetime from typing import List from .events import Event class AgRoot: def __init__(self) -> None: + self.hash = "" + # self.id = "" + self.updatetime = datetime.now() + self.createtime = datetime.now() self.events = [] # type: List[Event] diff --git a/o2ims/domain/commands.py b/o2ims/domain/commands.py index 869bf52..60e9db8 100644 --- a/o2ims/domain/commands.py +++ b/o2ims/domain/commands.py @@ -16,6 +16,9 @@ # from datetime import date # from typing import Optional from dataclasses import dataclass +# from datetime import datetime +# from o2ims.domain.resource_type import ResourceTypeEnum +from o2ims.domain.stx_object import StxGenericModel class Command: @@ -23,5 +26,35 @@ class Command: @dataclass -class UpdateDms(Command): - ref: str +class UpdateStxObject(Command): + data: StxGenericModel + + +@dataclass +class UpdateOCloud(UpdateStxObject): + pass + + +@dataclass +class UpdateDms(UpdateStxObject): + pass + + +@dataclass +class UpdateResourcePool(UpdateStxObject): + pass + + +@dataclass +class UpdateResource(UpdateStxObject): + pass + + +@dataclass +class UpdatePserverCpu(UpdateStxObject): + pass + + +@dataclass +class UpdatePserver(UpdateStxObject): + pass diff --git a/o2ims/domain/ocloud.py b/o2ims/domain/ocloud.py index 49b2550..847ce4c 100644 --- a/o2ims/domain/ocloud.py +++ b/o2ims/domain/ocloud.py @@ -26,6 +26,7 @@ class Subscription(AgRoot): def __init__(self, id: str, callback: str, consumersubid: str = '', filter: str = '') -> None: self.subscriptionId = id + self.version_number = 0 self.callback = callback self.consumerSubscriptionId = consumersubid self.filter = filter @@ -37,6 +38,7 @@ class DeploymentManager: supportedLocations: str = '', capabilities: str = '', capacity: str = '') -> None: self.deploymentManagerId = id + self.version_number = 0 self.oCloudId = ocloudid self.name = name self.description = description @@ -52,6 +54,7 @@ class ResourcePool(AgRoot): ocloudid: str, gLocationId: str = '', description: str = '') -> None: self.resourcePoolId = id + self.version_number = 0 self.oCloudId = ocloudid self.globalLocationId = gLocationId self.name = name @@ -82,6 +85,7 @@ class Resource(AgRoot): parentId: str = '', elements: list = [], description: str = '') -> None: self.resourceId = resourceId + self.version_number = 0 self.oCloudId = oCloudId self.resourceTypeId = resourceTypeId self.resourcePoolId = resourcePoolId diff --git a/o2ims/domain/resource_type.py b/o2ims/domain/resource_type.py index d95eb18..79b36fd 100644 --- a/o2ims/domain/resource_type.py +++ b/o2ims/domain/resource_type.py @@ -8,3 +8,11 @@ class ResourceTypeEnum(Enum): PSERVER = 11 PSERVER_CPU = 12 PSERVER_RAM = 13 + + +class InvalidOcloudState(Exception): + pass + + +class MismatchedModel(Exception): + pass diff --git a/o2ims/domain/stx_object.py b/o2ims/domain/stx_object.py index 90c166f..773bd3a 100644 --- a/o2ims/domain/stx_object.py +++ b/o2ims/domain/stx_object.py @@ -17,15 +17,11 @@ import datetime import json from o2ims.domain.base import AgRoot -from o2ims.domain.resource_type import ResourceTypeEnum +from o2ims.domain.resource_type import ResourceTypeEnum, MismatchedModel from o2common.helper import o2logging logger = o2logging.get_logger(__name__) -class MismatchedModel(Exception): - pass - - class StxGenericModel(AgRoot): def __init__(self, type: ResourceTypeEnum, api_response: dict = None, content_hash=None) -> None: diff --git a/o2ims/entrypoints/resource_watcher.py b/o2ims/entrypoints/resource_watcher.py index acde2d2..9e17485 100644 --- a/o2ims/entrypoints/resource_watcher.py +++ b/o2ims/entrypoints/resource_watcher.py @@ -50,17 +50,17 @@ class WatcherService(cotyledon.Service): def run(self): try: root = WatcherTree(OcloudWatcher( - StxSaOcloudClient(), self.bus.uow)) + StxSaOcloudClient(), self.bus)) root.addchild( - DmsWatcher(StxSaDmsClient(), self.bus.uow)) + DmsWatcher(StxSaDmsClient(), self.bus)) child_respool = root.addchild( ResourcePoolWatcher(StxSaResourcePoolClient(), - self.bus.uow)) + self.bus)) child_pserver = child_respool.addchild( - PServerWatcher(StxPserverClient(), self.bus.uow)) + PServerWatcher(StxPserverClient(), self.bus)) child_pserver.addchild( - PServerCpuWatcher(StxCpuClient(), self.bus.uow)) + PServerCpuWatcher(StxCpuClient(), self.bus)) self.worker.add_watcher(root) diff --git a/o2ims/service/auditor/base.py b/o2ims/service/auditor/base.py deleted file mode 100644 index b514342..0000000 --- a/o2ims/service/auditor/base.py +++ /dev/null @@ -1,13 +0,0 @@ -# Copyright (C) 2021 Wind River Systems, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. diff --git a/o2ims/service/auditor/ocloud_handler.py b/o2ims/service/auditor/ocloud_handler.py new file mode 100644 index 0000000..9dd4f0e --- /dev/null +++ b/o2ims/service/auditor/ocloud_handler.py @@ -0,0 +1,101 @@ +# Copyright (C) 2021 Wind River Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint: disable=unused-argument +from __future__ import annotations + +from o2ims.domain.stx_object import StxGenericModel +# from dataclasses import asdict +# from typing import List, Dict, Callable, Type +# TYPE_CHECKING +from o2ims.domain import commands +from o2ims.service.unit_of_work import AbstractUnitOfWork +from o2ims.domain.resource_type import InvalidOcloudState +from o2ims.domain.resource_type import MismatchedModel +from o2ims.domain.ocloud import Ocloud +from o2ims import config +# if TYPE_CHECKING: +# from . import unit_of_work + +from o2common.helper import o2logging +logger = o2logging.get_logger(__name__) + + +class InvalidResourceType(Exception): + pass + + +def update_ocloud( + cmd: commands.UpdateOCloud, + uow: AbstractUnitOfWork +): + stxobj = cmd.data + with uow: + oclouds = uow.oclouds.list() + if oclouds and len(oclouds) > 1: + raise InvalidOcloudState("More than 1 ocloud is found") + elif not oclouds or len(oclouds) == 0: + logger.info("add ocloud:" + stxobj.name + + " update_at: " + str(stxobj.updatetime) + + " id: " + str(stxobj.id) + + " hash: " + str(stxobj.hash)) + entry = create_by(stxobj) + uow.oclouds.add(entry) + + logger.info("Add the ocloud: " + stxobj.id + + ", name: " + stxobj.name) + else: + localmodel = oclouds.pop() + if is_outdated(localmodel, stxobj): + logger.info("update ocloud:" + stxobj.name + + " update_at: " + str(stxobj.updatetime) + + " id: " + str(stxobj.id) + + " hash: " + str(stxobj.hash)) + update_by(localmodel, stxobj) + uow.oclouds.update(localmodel) + + logger.info("Update the ocloud: " + stxobj.id + + ", name: " + stxobj.name) + uow.commit() + + +def is_outdated(ocloud: Ocloud, stxobj: StxGenericModel): + # if stxobj.updatetime: + # return True if Ocloud.updatetime < stxobj.updatetime else False + # else: + return True if ocloud.hash != stxobj.hash else False + + +def create_by(stxobj: StxGenericModel) -> Ocloud: + imsendpoint = config.get_api_url() + config.get_o2ims_api_base() + globalcloudId = stxobj.id # to be updated + description = "An ocloud" + ocloud = Ocloud(stxobj.id, stxobj.name, imsendpoint, + globalcloudId, description, 1) + ocloud.createtime = stxobj.createtime + ocloud.updatetime = stxobj.updatetime + ocloud.hash = stxobj.hash + + return ocloud + + +def update_by(ocloud: Ocloud, stxobj: StxGenericModel) -> None: + if ocloud.oCloudId != stxobj.id: + raise MismatchedModel("More than 1 ocloud found") + ocloud.name = stxobj.name + ocloud.createtime = stxobj.createtime + ocloud.updatetime = stxobj.updatetime + # ocloud.content = stxobj.content + ocloud.hash = stxobj.hash + ocloud.version_number = ocloud.version_number + 1 diff --git a/o2ims/service/handlers.py b/o2ims/service/handlers.py index c80ea6f..505549c 100644 --- a/o2ims/service/handlers.py +++ b/o2ims/service/handlers.py @@ -18,7 +18,7 @@ from __future__ import annotations from typing import List, Dict, Callable, Type # TYPE_CHECKING from o2ims.domain import commands, events -# ocloud +from o2ims.service.auditor import ocloud_handler # if TYPE_CHECKING: # from . import unit_of_work @@ -33,4 +33,5 @@ EVENT_HANDLERS = { COMMAND_HANDLERS = { + commands.UpdateOCloud: ocloud_handler.update_ocloud, } # type: Dict[Type[commands.Command], Callable] diff --git a/o2ims/service/messagebus.py b/o2ims/service/messagebus.py index c1970f8..297aa8f 100644 --- a/o2ims/service/messagebus.py +++ b/o2ims/service/messagebus.py @@ -41,7 +41,9 @@ class MessageBus: self.queue = [message] while self.queue: message = self.queue.pop(0) - if isinstance(message, events.Event): + if not message: + continue + elif isinstance(message, events.Event): self.handle_event(message) elif isinstance(message, commands.Command): self.handle_command(message) @@ -65,6 +67,6 @@ class MessageBus: handler = self.command_handlers[type(command)] handler(command) self.queue.extend(self.uow.collect_new_events()) - except Exception: + except Exception as ex: logger.exception("Exception handling command %s", command) - raise + raise ex diff --git a/o2ims/service/watcher/base.py b/o2ims/service/watcher/base.py index 6700eb3..0daf8d4 100644 --- a/o2ims/service/watcher/base.py +++ b/o2ims/service/watcher/base.py @@ -12,45 +12,58 @@ # See the License for the specific language governing permissions and # limitations under the License. +# from logging import exception +# from cgtsclient import exc from o2ims.service.client.base_client import BaseClient -from o2ims.domain.stx_object import StxGenericModel -from o2ims.service.unit_of_work import AbstractUnitOfWork - +# from o2ims.domain.stx_object import StxGenericModel +# from o2ims.service.unit_of_work import AbstractUnitOfWork +from o2ims.domain import commands +from o2ims.service.messagebus import MessageBus from o2common.helper import o2logging logger = o2logging.get_logger(__name__) class BaseWatcher(object): def __init__(self, client: BaseClient, - uow: AbstractUnitOfWork) -> None: + bus: MessageBus) -> None: super().__init__() self._client = client - self._uow = uow + self._bus = bus + # self._uow = bus.uow def targetname(self) -> str: return self._targetname() - def probe(self, parent: object = None): - return self._probe(parent) + def probe(self, parent: commands.UpdateStxObject = None): + try: + cmds = self._probe(parent.data if parent else None) + for cmd in cmds: + self._bus.handle(cmd) + + # return self._probe(parent) + return cmds + except Exception as ex: + logger.warning("Failed to probe resource due to: " + str(ex)) + return [] - def _probe(self, parent: object = None): + def _probe(self, parent: object = None) -> commands.UpdateStxObject: raise NotImplementedError def _targetname(self): raise NotImplementedError - def _compare_and_update(self, newmodel: StxGenericModel) -> bool: - with self._uow: - # localmodel = self._uow.stxobjects.get(ocloudmodel.id) - localmodel = self._uow.stxobjects.get(str(newmodel.id)) - if not localmodel: - logger.info("add entry:" + newmodel.name) - self._uow.stxobjects.add(newmodel) - elif localmodel.is_outdated(newmodel): - logger.info("update entry:" + newmodel.name) - localmodel.update_by(newmodel) - self._uow.stxobjects.update(localmodel) - self._uow.commit() + # def _compare_and_update(self, newmodel: StxGenericModel) -> bool: + # with self._uow: + # # localmodel = self._uow.stxobjects.get(ocloudmodel.id) + # localmodel = self._uow.stxobjects.get(str(newmodel.id)) + # if not localmodel: + # logger.info("add entry:" + newmodel.name) + # self._uow.stxobjects.add(newmodel) + # elif localmodel.is_outdated(newmodel): + # logger.info("update entry:" + newmodel.name) + # localmodel.update_by(newmodel) + # self._uow.stxobjects.update(localmodel) + # self._uow.commit() # node to organize watchers in tree hierachy @@ -74,7 +87,7 @@ class WatcherTree(object): + self.watcher.targetname()) childdepth = depth - 1 if depth > 0 else 0 resources = self.watcher.probe(parentresource) - logger.debug("probe returns " + str(len(resources)) + "resources") + logger.debug("probe returns " + str(len(resources)) + " resources") if depth == 1: # stop recursive diff --git a/o2ims/service/watcher/ocloud_watcher.py b/o2ims/service/watcher/ocloud_watcher.py index 6a73f48..bf4ce43 100644 --- a/o2ims/service/watcher/ocloud_watcher.py +++ b/o2ims/service/watcher/ocloud_watcher.py @@ -12,62 +12,63 @@ # See the License for the specific language governing permissions and # limitations under the License. -from o2ims.domain.resource_type import ResourceTypeEnum +# from o2ims.domain.resource_type import ResourceTypeEnum from o2ims.service.client.base_client import BaseClient -from o2ims.domain.stx_object import StxGenericModel -from o2ims.service.unit_of_work import AbstractUnitOfWork +# from o2ims.domain.stx_object import StxGenericModel +# from o2ims.service.unit_of_work import AbstractUnitOfWork from o2ims.service.watcher.base import BaseWatcher +from o2ims.domain import commands +from o2ims.service.messagebus import MessageBus from o2common.helper import o2logging logger = o2logging.get_logger(__name__) -class InvalidOcloudState(Exception): - pass - - class OcloudWatcher(BaseWatcher): def __init__(self, ocloud_client: BaseClient, - uow: AbstractUnitOfWork) -> None: - super().__init__(ocloud_client, uow) + bus: MessageBus) -> None: + super().__init__(ocloud_client, bus) def _targetname(self): return "ocloud" def _probe(self, parent: object = None): - ocloudmodel = self._client.get(None) - if ocloudmodel: - self._compare_and_update(ocloudmodel) - return [ocloudmodel] + newmodel = self._client.get(None) + if newmodel: + logger.debug("found ocloud: " + newmodel.name) + else: + logger.warning("Failed to find out any ocloud") + # self._compare_and_update(ocloudmodel) + return [commands.UpdateOCloud(newmodel)] if newmodel else [] - def _compare_and_update(self, ocloudmodel: StxGenericModel) -> bool: - with self._uow: - # localmodel = self._uow.stxobjects.get(str(ocloudmodel.id)) - oclouds = self._uow.stxobjects.list(ResourceTypeEnum.OCLOUD) - if len(oclouds) > 1: - raise InvalidOcloudState("More than 1 ocloud is found") - if len(oclouds) == 0: - logger.info("add ocloud:" + ocloudmodel.name - + " update_at: " + str(ocloudmodel.updatetime) - + " id: " + str(ocloudmodel.id) - + " hash: " + str(ocloudmodel.hash)) - self._uow.stxobjects.add(ocloudmodel) - else: - localmodel = oclouds.pop() - if localmodel.is_outdated(ocloudmodel): - logger.info("update ocloud:" + ocloudmodel.name - + " update_at: " + str(ocloudmodel.updatetime) - + " id: " + str(ocloudmodel.id) - + " hash: " + str(ocloudmodel.hash)) - localmodel.update_by(ocloudmodel) - self._uow.stxobjects.update(localmodel) - self._uow.commit() +# def _compare_and_update(self, ocloudmodel: StxGenericModel) -> bool: +# with self._uow: +# # localmodel = self._uow.stxobjects.get(str(ocloudmodel.id)) +# oclouds = self._uow.stxobjects.list(ResourceTypeEnum.OCLOUD) +# if len(oclouds) > 1: +# raise InvalidOcloudState("More than 1 ocloud is found") +# if len(oclouds) == 0: +# logger.info("add ocloud:" + ocloudmodel.name +# + " update_at: " + str(ocloudmodel.updatetime) +# + " id: " + str(ocloudmodel.id) +# + " hash: " + str(ocloudmodel.hash)) +# self._uow.stxobjects.add(ocloudmodel) +# else: +# localmodel = oclouds.pop() +# if localmodel.is_outdated(ocloudmodel): +# logger.info("update ocloud:" + ocloudmodel.name +# + " update_at: " + str(ocloudmodel.updatetime) +# + " id: " + str(ocloudmodel.id) +# + " hash: " + str(ocloudmodel.hash)) +# localmodel.update_by(ocloudmodel) +# self._uow.stxobjects.update(localmodel) +# self._uow.commit() class DmsWatcher(BaseWatcher): def __init__(self, client: BaseClient, - uow: AbstractUnitOfWork) -> None: - super().__init__(client, uow) + bus: MessageBus) -> None: + super().__init__(client, bus) def _targetname(self): return "dms" @@ -75,6 +76,7 @@ class DmsWatcher(BaseWatcher): def _probe(self, parent: object = None): ocloudid = parent.id if parent else None newmodels = self._client.list(ocloudid=ocloudid) - for newmodel in newmodels: - super()._compare_and_update(newmodel) - return newmodels + # for newmodel in newmodels: + # super()._compare_and_update(newmodel) + # return newmodels + return [commands.UpdateDms(m) for m in newmodels] diff --git a/o2ims/service/watcher/pserver_cpu_watcher.py b/o2ims/service/watcher/pserver_cpu_watcher.py index 3add20b..b18ead6 100644 --- a/o2ims/service/watcher/pserver_cpu_watcher.py +++ b/o2ims/service/watcher/pserver_cpu_watcher.py @@ -13,8 +13,10 @@ # limitations under the License. from o2ims.service.client.base_client import BaseClient -from o2ims.service.unit_of_work import AbstractUnitOfWork +# from o2ims.service.unit_of_work import AbstractUnitOfWork from o2ims.service.watcher.resource_watcher import ResourceWatcher +from o2ims.domain import commands +from o2ims.service.messagebus import MessageBus from o2common.helper import o2logging logger = o2logging.get_logger(__name__) @@ -22,8 +24,8 @@ logger = o2logging.get_logger(__name__) class PServerCpuWatcher(ResourceWatcher): def __init__(self, client: BaseClient, - uow: AbstractUnitOfWork) -> None: - super().__init__(client, uow) + bus: MessageBus) -> None: + super().__init__(client, bus) def _targetname(self): return "pserver_cpu" @@ -31,6 +33,7 @@ class PServerCpuWatcher(ResourceWatcher): def _probe(self, parent: object = None): hostid = parent.id if parent else None newmodels = self._client.list(hostid=hostid) - for newmodel in newmodels: - super()._compare_and_update(newmodel) - return newmodels + # for newmodel in newmodels: + # super()._compare_and_update(newmodel) + # return newmodels + return [commands.UpdatePserverCpu(m) for m in newmodels] diff --git a/o2ims/service/watcher/pserver_watcher.py b/o2ims/service/watcher/pserver_watcher.py index 4e2f330..c4ac72c 100644 --- a/o2ims/service/watcher/pserver_watcher.py +++ b/o2ims/service/watcher/pserver_watcher.py @@ -13,8 +13,10 @@ # limitations under the License. from o2ims.service.client.base_client import BaseClient -from o2ims.service.unit_of_work import AbstractUnitOfWork +# from o2ims.service.unit_of_work import AbstractUnitOfWork from o2ims.service.watcher.resource_watcher import ResourceWatcher +from o2ims.domain import commands +from o2ims.service.messagebus import MessageBus from o2common.helper import o2logging logger = o2logging.get_logger(__name__) @@ -22,8 +24,8 @@ logger = o2logging.get_logger(__name__) class PServerWatcher(ResourceWatcher): def __init__(self, client: BaseClient, - uow: AbstractUnitOfWork) -> None: - super().__init__(client, uow) + bus: MessageBus) -> None: + super().__init__(client, bus) def _targetname(self): return "pserver" @@ -31,6 +33,7 @@ class PServerWatcher(ResourceWatcher): def _probe(self, parent: object = None): resourcepoolid = parent.id if parent else None newmodels = self._client.list(resourcepoolid=resourcepoolid) - for newmodel in newmodels: - super()._compare_and_update(newmodel) - return newmodels + # for newmodel in newmodels: + # super()._compare_and_update(newmodel) + # return newmodels + return [commands.UpdatePserverCpu(m) for m in newmodels] diff --git a/o2ims/service/watcher/resource_watcher.py b/o2ims/service/watcher/resource_watcher.py index a424dfb..4d2555b 100644 --- a/o2ims/service/watcher/resource_watcher.py +++ b/o2ims/service/watcher/resource_watcher.py @@ -13,8 +13,10 @@ # limitations under the License. from o2ims.service.client.base_client import BaseClient -from o2ims.service.unit_of_work import AbstractUnitOfWork +# from o2ims.service.unit_of_work import AbstractUnitOfWork from o2ims.service.watcher.base import BaseWatcher +from o2ims.domain import commands +from o2ims.service.messagebus import MessageBus from o2common.helper import o2logging logger = o2logging.get_logger(__name__) @@ -22,8 +24,8 @@ logger = o2logging.get_logger(__name__) class ResourceWatcher(BaseWatcher): def __init__(self, client: BaseClient, - uow: AbstractUnitOfWork) -> None: - super().__init__(client, uow) + bus: MessageBus) -> None: + super().__init__(client, bus) def _targetname(self): return "resource" @@ -31,6 +33,7 @@ class ResourceWatcher(BaseWatcher): def _probe(self, parent: object = None): parentid = parent.id if parent else None newmodels = self._client.get(parentid=parentid) - for newmodel in newmodels: - super()._compare_and_update(newmodel) - return newmodels + # for newmodel in newmodels: + # super()._compare_and_update(newmodel) + # return newmodels + return [commands.UpdateResource(m) for m in newmodels] diff --git a/o2ims/service/watcher/resourcepool_watcher.py b/o2ims/service/watcher/resourcepool_watcher.py index 6343292..dc7745a 100644 --- a/o2ims/service/watcher/resourcepool_watcher.py +++ b/o2ims/service/watcher/resourcepool_watcher.py @@ -13,8 +13,10 @@ # limitations under the License. from o2ims.service.client.base_client import BaseClient -from o2ims.service.unit_of_work import AbstractUnitOfWork +# from o2ims.service.unit_of_work import AbstractUnitOfWork from o2ims.service.watcher.base import BaseWatcher +from o2ims.domain import commands +from o2ims.service.messagebus import MessageBus from o2common.helper import o2logging logger = o2logging.get_logger(__name__) @@ -22,8 +24,8 @@ logger = o2logging.get_logger(__name__) class ResourcePoolWatcher(BaseWatcher): def __init__(self, client: BaseClient, - uow: AbstractUnitOfWork) -> None: - super().__init__(client, uow) + bus: MessageBus) -> None: + super().__init__(client, bus) def _targetname(self): return "resourcepool" @@ -31,7 +33,8 @@ class ResourcePoolWatcher(BaseWatcher): def _probe(self, parent: object = None): ocloudid = parent.id if parent else None newmodels = self._client.list(ocloudid=ocloudid) - for newmodel in newmodels: - logger.info("detect ocloudmodel:" + newmodel.name) - super()._compare_and_update(newmodel) - return newmodels + # for newmodel in newmodels: + # logger.info("detect ocloudmodel:" + newmodel.name) + # super()._compare_and_update(newmodel) + # return newmodels + return [commands.UpdateResourcePool(m) for m in newmodels] diff --git a/tests/unit/test_watcher.py b/tests/unit/test_watcher.py index b0aded4..69c3e91 100644 --- a/tests/unit/test_watcher.py +++ b/tests/unit/test_watcher.py @@ -16,6 +16,7 @@ import time from datetime import datetime import json from typing import List +from o2ims.service import handlers from o2ims.domain.resource_type import ResourceTypeEnum from o2ims.service.client.base_client import BaseClient from o2ims.domain import ocloud @@ -28,7 +29,9 @@ from o2ims.domain.stx_repo import StxObjectRepository from o2ims.service.watcher import worker from o2ims.service.unit_of_work import AbstractUnitOfWork from o2ims.service.watcher.ocloud_watcher import OcloudWatcher - +from o2ims.service import messagebus +from o2ims import bootstrap +from o2ims.domain import commands class FakeOcloudClient(BaseClient): def __init__(self): @@ -116,15 +119,37 @@ class FakeUnitOfWork(AbstractUnitOfWork): pass # self.session.rollback() + def collect_new_events(self): + yield + # return super().collect_new_events() + + +def create_fake_bus(uow): + def update_ocloud( + cmd: commands.UpdateOCloud, + uow: AbstractUnitOfWork): + return + + fakeuow = FakeUnitOfWork() + handlers.EVENT_HANDLERS = {} + handlers.COMMAND_HANDLERS = { + commands.UpdateOCloud: update_ocloud, + } + bus = bootstrap.bootstrap(False, fakeuow) + return bus + def test_probe_new_ocloud(): - # fakeRepo = FakeOcloudRepo() fakeuow = FakeUnitOfWork() + bus = create_fake_bus(fakeuow) fakeClient = FakeOcloudClient() - ocloudwatcher = OcloudWatcher(fakeClient, fakeuow) - ocloudwatcher.probe() - assert len(fakeuow.stxobjects.oclouds) == 1 - assert fakeuow.stxobjects.oclouds[0].name == "stx1" + ocloudwatcher = OcloudWatcher(fakeClient, bus) + cmds = ocloudwatcher.probe() + assert cmds is not None + assert len(cmds) == 1 + assert cmds[0].data.name == "stx1" + # assert len(fakeuow.stxobjects.oclouds) == 1 + # assert fakeuow.stxobjects.oclouds[0].name == "stx1" def test_watchers_worker(): @@ -132,11 +157,11 @@ def test_watchers_worker(): class FakeOCloudWatcher(BaseWatcher): def __init__(self, client: BaseClient, - repo: OcloudRepository) -> None: + bus: messagebus) -> None: super().__init__(client, None) self.fakeOcloudWatcherCounter = 0 self._client = client - self._repo = repo + self._bus = bus def _targetname(self): return "fakeocloudwatcher" @@ -152,9 +177,10 @@ def test_watchers_worker(): # fakeRepo = FakeOcloudRepo() fakeuow = FakeUnitOfWork() + bus = create_fake_bus(fakeuow) fakeClient = FakeOcloudClient() - fakewatcher = FakeOCloudWatcher(fakeClient, fakeuow) + fakewatcher = FakeOCloudWatcher(fakeClient, bus) root = WatcherTree(fakewatcher)