Refactor watchers 95/7095/3
authorBin Yang <bin.yang@windriver.com>
Mon, 22 Nov 2021 03:20:10 +0000 (11:20 +0800)
committerBin Yang <bin.yang@windriver.com>
Mon, 22 Nov 2021 07:15:53 +0000 (07:15 +0000)
integrate watcher with message bus and handler

Signed-off-by: Bin Yang <bin.yang@windriver.com>
Change-Id: I93f98a88526c866b142ab5da17e5e30839d300a0

20 files changed:
configs/log.yaml
o2ims/adapter/clients/orm_stx.py
o2ims/adapter/orm.py
o2ims/domain/base.py
o2ims/domain/commands.py
o2ims/domain/ocloud.py
o2ims/domain/resource_type.py
o2ims/domain/stx_object.py
o2ims/entrypoints/resource_watcher.py
o2ims/service/auditor/base.py [deleted file]
o2ims/service/auditor/ocloud_handler.py [new file with mode: 0644]
o2ims/service/handlers.py
o2ims/service/messagebus.py
o2ims/service/watcher/base.py
o2ims/service/watcher/ocloud_watcher.py
o2ims/service/watcher/pserver_cpu_watcher.py
o2ims/service/watcher/pserver_watcher.py
o2ims/service/watcher/resource_watcher.py
o2ims/service/watcher/resourcepool_watcher.py
tests/unit/test_watcher.py

index 5eaf85b..62d1a92 100644 (file)
@@ -26,11 +26,11 @@ loggers:
       propagate: False\r
     o2ims:\r
       handlers: [console_handler, file_handler]\r
-      level: "WARNING"\r
+      level: "DEBUG"\r
       propagate: False\r
     o2dms:\r
       handlers: [console_handler, file_handler]\r
-      level: "WARNING"\r
+      level: "DEBUG"\r
       propagate: False\r
 handlers:\r
     console_handler:\r
index c2d0e6f..46ebdb7 100644 (file)
@@ -27,12 +27,10 @@ from sqlalchemy import (
     Enum\r
 )\r
 \r
-from sqlalchemy.orm import mapper\r
-# from sqlalchemy.sql.sqltypes import Integer\r
-# from sqlalchemy.sql.expression import true\r
+from sqlalchemy.orm import mapper\r
+\r
+# from o2ims.domain import stx_object as ocloudModel\r
 \r
-from o2ims.domain import stx_object as ocloudModel\r
-# from o2ims.adapter.orm import metadata\r
 from o2ims.service.unit_of_work import AbstractUnitOfWork\r
 from o2ims.adapter.unit_of_work import SqlAlchemyUnitOfWork\r
 from o2ims.domain.resource_type import ResourceTypeEnum\r
@@ -56,10 +54,11 @@ stxobject = Table(
 \r
 \r
 def start_o2ims_stx_mappers(uow: AbstractUnitOfWork = SqlAlchemyUnitOfWork()):\r
-    logger.info("Starting O2 IMS Stx mappers")\r
-    mapper(ocloudModel.StxGenericModel, stxobject)\r
+    return\r
+    # logger.info("Starting O2 IMS Stx mappers")\r
+    # mapper(ocloudModel.StxGenericModel, stxobject)\r
 \r
-    with uow:\r
-        engine1 = uow.session.get_bind()\r
-        metadata.create_all(engine1)\r
-        uow.commit()\r
+    with uow:\r
+        engine1 = uow.session.get_bind()\r
+        metadata.create_all(engine1)\r
+        uow.commit()\r
index a9d2cea..61c3157 100644 (file)
@@ -16,15 +16,17 @@ from sqlalchemy import (
     Table,\r
     MetaData,\r
     Column,\r
-    Integer,\r
+    Integer,\r
     String,\r
     # Date,\r
+    DateTime,\r
     ForeignKey,\r
     # engine,\r
     # event,\r
 )\r
 \r
 from sqlalchemy.orm import mapper, relationship\r
+# from sqlalchemy.sql.sqltypes import Integer\r
 \r
 from o2ims.domain import ocloud as ocloudModel\r
 \r
@@ -36,6 +38,11 @@ metadata = MetaData()
 ocloud = Table(\r
     "ocloud",\r
     metadata,\r
+    Column("updatetime", DateTime),\r
+    Column("createtime", DateTime),\r
+    Column("hash", String(255)),\r
+    Column("version_number", Integer),\r
+\r
     Column("oCloudId", String(255), primary_key=True),\r
     Column("globalcloudId", String(255)),\r
     Column("name", String(255)),\r
@@ -47,6 +54,10 @@ ocloud = Table(
 resourcetype = Table(\r
     "resourcetype",\r
     metadata,\r
+    Column("updatetime", DateTime),\r
+    Column("createtime", DateTime),\r
+    Column("hash", String(255)),\r
+\r
     Column("resourceTypeId", String(255), primary_key=True),\r
     Column("oCloudId", ForeignKey("ocloud.oCloudId")),\r
     Column("name", String(255)),\r
@@ -60,6 +71,11 @@ resourcetype = Table(
 resourcepool = Table(\r
     "resourcepool",\r
     metadata,\r
+    Column("updatetime", DateTime),\r
+    Column("createtime", DateTime),\r
+    Column("hash", String(255)),\r
+    Column("version_number", Integer),\r
+\r
     Column("resourcePoolId", String(255), primary_key=True),\r
     Column("oCloudId", ForeignKey("ocloud.oCloudId")),\r
     Column("globalLocationId", String(255)),\r
@@ -73,6 +89,11 @@ resourcepool = Table(
 resource = Table(\r
     "resource",\r
     metadata,\r
+    Column("updatetime", DateTime),\r
+    Column("createtime", DateTime),\r
+    Column("hash", String(255)),\r
+    Column("version_number", Integer),\r
+\r
     Column("resourceId", String(255), primary_key=True),\r
     Column("resourceTypeId", ForeignKey("resourcetype.resourceTypeId")),\r
     Column("resourcePoolId", ForeignKey("resourcepool.resourcePoolId")),\r
@@ -87,6 +108,10 @@ resource = Table(
 deploymentmanager = Table(\r
     "deploymentmanager",\r
     metadata,\r
+    Column("updatetime", DateTime),\r
+    Column("createtime", DateTime),\r
+    Column("hash", String(255)),\r
+\r
     Column("deploymentManagerId", String(255), primary_key=True),\r
     Column("oCloudId", ForeignKey("ocloud.oCloudId")),\r
     Column("name", String(255)),\r
@@ -101,6 +126,11 @@ deploymentmanager = Table(
 subscription = Table(\r
     "subscription",\r
     metadata,\r
+    Column("updatetime", DateTime),\r
+    Column("createtime", DateTime),\r
+    Column("hash", String(255)),\r
+    Column("version_number", Integer),\r
+\r
     Column("subscriptionId", String(255), primary_key=True),\r
     Column("callback", String(255)),\r
     Column("consumerSubscriptionId", String(255)),\r
index f7eceb7..8a673da 100644 (file)
 #  See the License for the specific language governing permissions and\r
 #  limitations under the License.\r
 \r
+from datetime import datetime\r
 from typing import List\r
 from .events import Event\r
 \r
 \r
 class AgRoot:\r
     def __init__(self) -> None:\r
+        self.hash = ""\r
+        # self.id = ""\r
+        self.updatetime = datetime.now()\r
+        self.createtime = datetime.now()\r
         self.events = []  # type: List[Event]\r
index 869bf52..60e9db8 100644 (file)
@@ -16,6 +16,9 @@
 # from datetime import date
 # from typing import Optional
 from dataclasses import dataclass
+# from datetime import datetime
+# from o2ims.domain.resource_type import ResourceTypeEnum
+from o2ims.domain.stx_object import StxGenericModel
 
 
 class Command:
@@ -23,5 +26,35 @@ class Command:
 
 
 @dataclass
-class UpdateDms(Command):
-    ref: str
+class UpdateStxObject(Command):
+    data: StxGenericModel
+
+
+@dataclass
+class UpdateOCloud(UpdateStxObject):
+    pass
+
+
+@dataclass
+class UpdateDms(UpdateStxObject):
+    pass
+
+
+@dataclass
+class UpdateResourcePool(UpdateStxObject):
+    pass
+
+
+@dataclass
+class UpdateResource(UpdateStxObject):
+    pass
+
+
+@dataclass
+class UpdatePserverCpu(UpdateStxObject):
+    pass
+
+
+@dataclass
+class UpdatePserver(UpdateStxObject):
+    pass
index 49b2550..847ce4c 100644 (file)
@@ -26,6 +26,7 @@ class Subscription(AgRoot):
     def __init__(self, id: str, callback: str, consumersubid: str = '',\r
                  filter: str = '') -> None:\r
         self.subscriptionId = id\r
+        self.version_number = 0\r
         self.callback = callback\r
         self.consumerSubscriptionId = consumersubid\r
         self.filter = filter\r
@@ -37,6 +38,7 @@ class DeploymentManager:
                  supportedLocations: str = '', capabilities: str = '',\r
                  capacity: str = '') -> None:\r
         self.deploymentManagerId = id\r
+        self.version_number = 0\r
         self.oCloudId = ocloudid\r
         self.name = name\r
         self.description = description\r
@@ -52,6 +54,7 @@ class ResourcePool(AgRoot):
                  ocloudid: str, gLocationId: str = '',\r
                  description: str = '') -> None:\r
         self.resourcePoolId = id\r
+        self.version_number = 0\r
         self.oCloudId = ocloudid\r
         self.globalLocationId = gLocationId\r
         self.name = name\r
@@ -82,6 +85,7 @@ class Resource(AgRoot):
                  parentId: str = '', elements: list = [],\r
                  description: str = '') -> None:\r
         self.resourceId = resourceId\r
+        self.version_number = 0\r
         self.oCloudId = oCloudId\r
         self.resourceTypeId = resourceTypeId\r
         self.resourcePoolId = resourcePoolId\r
index d95eb18..79b36fd 100644 (file)
@@ -8,3 +8,11 @@ class ResourceTypeEnum(Enum):
     PSERVER = 11\r
     PSERVER_CPU = 12\r
     PSERVER_RAM = 13\r
+\r
+\r
+class InvalidOcloudState(Exception):\r
+    pass\r
+\r
+\r
+class MismatchedModel(Exception):\r
+    pass\r
index 90c166f..773bd3a 100644 (file)
@@ -17,15 +17,11 @@ import datetime
 import json\r
 from o2ims.domain.base import AgRoot\r
 \r
-from o2ims.domain.resource_type import ResourceTypeEnum\r
+from o2ims.domain.resource_type import ResourceTypeEnum, MismatchedModel\r
 from o2common.helper import o2logging\r
 logger = o2logging.get_logger(__name__)\r
 \r
 \r
-class MismatchedModel(Exception):\r
-    pass\r
-\r
-\r
 class StxGenericModel(AgRoot):\r
     def __init__(self, type: ResourceTypeEnum,\r
                  api_response: dict = None, content_hash=None) -> None:\r
index acde2d2..9e17485 100644 (file)
@@ -50,17 +50,17 @@ class WatcherService(cotyledon.Service):
     def run(self):\r
         try:\r
             root = WatcherTree(OcloudWatcher(\r
-                StxSaOcloudClient(), self.bus.uow))\r
+                StxSaOcloudClient(), self.bus))\r
             root.addchild(\r
-                DmsWatcher(StxSaDmsClient(), self.bus.uow))\r
+                DmsWatcher(StxSaDmsClient(), self.bus))\r
 \r
             child_respool = root.addchild(\r
                 ResourcePoolWatcher(StxSaResourcePoolClient(),\r
-                                    self.bus.uow))\r
+                                    self.bus))\r
             child_pserver = child_respool.addchild(\r
-                PServerWatcher(StxPserverClient(), self.bus.uow))\r
+                PServerWatcher(StxPserverClient(), self.bus))\r
             child_pserver.addchild(\r
-                PServerCpuWatcher(StxCpuClient(), self.bus.uow))\r
+                PServerCpuWatcher(StxCpuClient(), self.bus))\r
 \r
             self.worker.add_watcher(root)\r
 \r
diff --git a/o2ims/service/auditor/base.py b/o2ims/service/auditor/base.py
deleted file mode 100644 (file)
index b514342..0000000
+++ /dev/null
@@ -1,13 +0,0 @@
-# Copyright (C) 2021 Wind River Systems, Inc.\r
-#\r
-#  Licensed under the Apache License, Version 2.0 (the "License");\r
-#  you may not use this file except in compliance with the License.\r
-#  You may obtain a copy of the License at\r
-#\r
-#      http://www.apache.org/licenses/LICENSE-2.0\r
-#\r
-#  Unless required by applicable law or agreed to in writing, software\r
-#  distributed under the License is distributed on an "AS IS" BASIS,\r
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
-#  See the License for the specific language governing permissions and\r
-#  limitations under the License.\r
diff --git a/o2ims/service/auditor/ocloud_handler.py b/o2ims/service/auditor/ocloud_handler.py
new file mode 100644 (file)
index 0000000..9dd4f0e
--- /dev/null
@@ -0,0 +1,101 @@
+# Copyright (C) 2021 Wind River Systems, Inc.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+# pylint: disable=unused-argument
+from __future__ import annotations
+
+from o2ims.domain.stx_object import StxGenericModel
+# from dataclasses import asdict
+# from typing import List, Dict, Callable, Type
+# TYPE_CHECKING
+from o2ims.domain import commands
+from o2ims.service.unit_of_work import AbstractUnitOfWork
+from o2ims.domain.resource_type import InvalidOcloudState
+from o2ims.domain.resource_type import MismatchedModel
+from o2ims.domain.ocloud import Ocloud
+from o2ims import config
+# if TYPE_CHECKING:
+#     from . import unit_of_work
+
+from o2common.helper import o2logging
+logger = o2logging.get_logger(__name__)
+
+
+class InvalidResourceType(Exception):
+    pass
+
+
+def update_ocloud(
+    cmd: commands.UpdateOCloud,
+    uow: AbstractUnitOfWork
+):
+    stxobj = cmd.data
+    with uow:
+        oclouds = uow.oclouds.list()
+        if oclouds and len(oclouds) > 1:
+            raise InvalidOcloudState("More than 1 ocloud is found")
+        elif not oclouds or len(oclouds) == 0:
+            logger.info("add ocloud:" + stxobj.name
+                        + " update_at: " + str(stxobj.updatetime)
+                        + " id: " + str(stxobj.id)
+                        + " hash: " + str(stxobj.hash))
+            entry = create_by(stxobj)
+            uow.oclouds.add(entry)
+
+            logger.info("Add the ocloud: " + stxobj.id
+                        + ", name: " + stxobj.name)
+        else:
+            localmodel = oclouds.pop()
+            if is_outdated(localmodel, stxobj):
+                logger.info("update ocloud:" + stxobj.name
+                            + " update_at: " + str(stxobj.updatetime)
+                            + " id: " + str(stxobj.id)
+                            + " hash: " + str(stxobj.hash))
+                update_by(localmodel, stxobj)
+                uow.oclouds.update(localmodel)
+
+            logger.info("Update the ocloud: " + stxobj.id
+                        + ", name: " + stxobj.name)
+        uow.commit()
+
+
+def is_outdated(ocloud: Ocloud, stxobj: StxGenericModel):
+    # if stxobj.updatetime:
+    #     return True if Ocloud.updatetime < stxobj.updatetime else False
+    # else:
+    return True if ocloud.hash != stxobj.hash else False
+
+
+def create_by(stxobj: StxGenericModel) -> Ocloud:
+    imsendpoint = config.get_api_url() + config.get_o2ims_api_base()
+    globalcloudId = stxobj.id  # to be updated
+    description = "An ocloud"
+    ocloud = Ocloud(stxobj.id, stxobj.name, imsendpoint,
+                    globalcloudId, description, 1)
+    ocloud.createtime = stxobj.createtime
+    ocloud.updatetime = stxobj.updatetime
+    ocloud.hash = stxobj.hash
+
+    return ocloud
+
+
+def update_by(ocloud: Ocloud, stxobj: StxGenericModel) -> None:
+    if ocloud.oCloudId != stxobj.id:
+        raise MismatchedModel("More than 1 ocloud found")
+    ocloud.name = stxobj.name
+    ocloud.createtime = stxobj.createtime
+    ocloud.updatetime = stxobj.updatetime
+    # ocloud.content = stxobj.content
+    ocloud.hash = stxobj.hash
+    ocloud.version_number = ocloud.version_number + 1
index c80ea6f..505549c 100644 (file)
@@ -18,7 +18,7 @@ from __future__ import annotations
 from typing import List, Dict, Callable, Type
 # TYPE_CHECKING
 from o2ims.domain import commands, events
-# ocloud
+from o2ims.service.auditor import ocloud_handler
 
 # if TYPE_CHECKING:
 #     from . import unit_of_work
@@ -33,4 +33,5 @@ EVENT_HANDLERS = {
 
 
 COMMAND_HANDLERS = {
+    commands.UpdateOCloud: ocloud_handler.update_ocloud,
 }  # type: Dict[Type[commands.Command], Callable]
index c1970f8..297aa8f 100644 (file)
@@ -41,7 +41,9 @@ class MessageBus:
         self.queue = [message]
         while self.queue:
             message = self.queue.pop(0)
-            if isinstance(message, events.Event):
+            if not message:
+                continue
+            elif isinstance(message, events.Event):
                 self.handle_event(message)
             elif isinstance(message, commands.Command):
                 self.handle_command(message)
@@ -65,6 +67,6 @@ class MessageBus:
             handler = self.command_handlers[type(command)]
             handler(command)
             self.queue.extend(self.uow.collect_new_events())
-        except Exception:
+        except Exception as ex:
             logger.exception("Exception handling command %s", command)
-            raise
+            raise ex
index 6700eb3..0daf8d4 100644 (file)
 #  See the License for the specific language governing permissions and\r
 #  limitations under the License.\r
 \r
+# from logging import exception\r
+# from cgtsclient import exc\r
 from o2ims.service.client.base_client import BaseClient\r
-from o2ims.domain.stx_object import StxGenericModel\r
-from o2ims.service.unit_of_work import AbstractUnitOfWork\r
-\r
+# from o2ims.domain.stx_object import StxGenericModel\r
+# from o2ims.service.unit_of_work import AbstractUnitOfWork\r
+from o2ims.domain import commands\r
+from o2ims.service.messagebus import MessageBus\r
 from o2common.helper import o2logging\r
 logger = o2logging.get_logger(__name__)\r
 \r
 \r
 class BaseWatcher(object):\r
     def __init__(self, client: BaseClient,\r
-                 uow: AbstractUnitOfWork) -> None:\r
+                 bus: MessageBus) -> None:\r
         super().__init__()\r
         self._client = client\r
-        self._uow = uow\r
+        self._bus = bus\r
+        # self._uow = bus.uow\r
 \r
     def targetname(self) -> str:\r
         return self._targetname()\r
 \r
-    def probe(self, parent: object = None):\r
-        return self._probe(parent)\r
+    def probe(self, parent: commands.UpdateStxObject = None):\r
+        try:\r
+            cmds = self._probe(parent.data if parent else None)\r
+            for cmd in cmds:\r
+                self._bus.handle(cmd)\r
+\r
+            # return self._probe(parent)\r
+            return cmds\r
+        except Exception as ex:\r
+            logger.warning("Failed to probe resource due to: " + str(ex))\r
+            return []\r
 \r
-    def _probe(self, parent: object = None):\r
+    def _probe(self, parent: object = None) -> commands.UpdateStxObject:\r
         raise NotImplementedError\r
 \r
     def _targetname(self):\r
         raise NotImplementedError\r
 \r
-    def _compare_and_update(self, newmodel: StxGenericModel) -> bool:\r
-        with self._uow:\r
-            # localmodel = self._uow.stxobjects.get(ocloudmodel.id)\r
-            localmodel = self._uow.stxobjects.get(str(newmodel.id))\r
-            if not localmodel:\r
-                logger.info("add entry:" + newmodel.name)\r
-                self._uow.stxobjects.add(newmodel)\r
-            elif localmodel.is_outdated(newmodel):\r
-                logger.info("update entry:" + newmodel.name)\r
-                localmodel.update_by(newmodel)\r
-                self._uow.stxobjects.update(localmodel)\r
-            self._uow.commit()\r
+    def _compare_and_update(self, newmodel: StxGenericModel) -> bool:\r
+        with self._uow:\r
+            # localmodel = self._uow.stxobjects.get(ocloudmodel.id)\r
+            localmodel = self._uow.stxobjects.get(str(newmodel.id))\r
+            if not localmodel:\r
+                logger.info("add entry:" + newmodel.name)\r
+                self._uow.stxobjects.add(newmodel)\r
+            elif localmodel.is_outdated(newmodel):\r
+                logger.info("update entry:" + newmodel.name)\r
+                localmodel.update_by(newmodel)\r
+                self._uow.stxobjects.update(localmodel)\r
+            self._uow.commit()\r
 \r
 \r
 # node to organize watchers in tree hierachy\r
@@ -74,7 +87,7 @@ class WatcherTree(object):
                      + self.watcher.targetname())\r
         childdepth = depth - 1 if depth > 0 else 0\r
         resources = self.watcher.probe(parentresource)\r
-        logger.debug("probe returns " + str(len(resources)) + "resources")\r
+        logger.debug("probe returns " + str(len(resources)) + " resources")\r
 \r
         if depth == 1:\r
             # stop recursive\r
index 6a73f48..bf4ce43 100644 (file)
 #  See the License for the specific language governing permissions and\r
 #  limitations under the License.\r
 \r
-from o2ims.domain.resource_type import ResourceTypeEnum\r
+from o2ims.domain.resource_type import ResourceTypeEnum\r
 from o2ims.service.client.base_client import BaseClient\r
-from o2ims.domain.stx_object import StxGenericModel\r
-from o2ims.service.unit_of_work import AbstractUnitOfWork\r
+from o2ims.domain.stx_object import StxGenericModel\r
+from o2ims.service.unit_of_work import AbstractUnitOfWork\r
 from o2ims.service.watcher.base import BaseWatcher\r
+from o2ims.domain import commands\r
+from o2ims.service.messagebus import MessageBus\r
 \r
 from o2common.helper import o2logging\r
 logger = o2logging.get_logger(__name__)\r
 \r
 \r
-class InvalidOcloudState(Exception):\r
-    pass\r
-\r
-\r
 class OcloudWatcher(BaseWatcher):\r
     def __init__(self, ocloud_client: BaseClient,\r
-                 uow: AbstractUnitOfWork) -> None:\r
-        super().__init__(ocloud_client, uow)\r
+                 bus: MessageBus) -> None:\r
+        super().__init__(ocloud_client, bus)\r
 \r
     def _targetname(self):\r
         return "ocloud"\r
 \r
     def _probe(self, parent: object = None):\r
-        ocloudmodel = self._client.get(None)\r
-        if ocloudmodel:\r
-            self._compare_and_update(ocloudmodel)\r
-        return [ocloudmodel]\r
+        newmodel = self._client.get(None)\r
+        if newmodel:\r
+            logger.debug("found ocloud: " + newmodel.name)\r
+        else:\r
+            logger.warning("Failed to find out any ocloud")\r
+        #     self._compare_and_update(ocloudmodel)\r
+        return [commands.UpdateOCloud(newmodel)] if newmodel else []\r
 \r
-    def _compare_and_update(self, ocloudmodel: StxGenericModel) -> bool:\r
-        with self._uow:\r
-            # localmodel = self._uow.stxobjects.get(str(ocloudmodel.id))\r
-            oclouds = self._uow.stxobjects.list(ResourceTypeEnum.OCLOUD)\r
-            if len(oclouds) > 1:\r
-                raise InvalidOcloudState("More than 1 ocloud is found")\r
-            if len(oclouds) == 0:\r
-                logger.info("add ocloud:" + ocloudmodel.name\r
-                            + " update_at: " + str(ocloudmodel.updatetime)\r
-                            + " id: " + str(ocloudmodel.id)\r
-                            + " hash: " + str(ocloudmodel.hash))\r
-                self._uow.stxobjects.add(ocloudmodel)\r
-            else:\r
-                localmodel = oclouds.pop()\r
-                if localmodel.is_outdated(ocloudmodel):\r
-                    logger.info("update ocloud:" + ocloudmodel.name\r
-                                + " update_at: " + str(ocloudmodel.updatetime)\r
-                                + " id: " + str(ocloudmodel.id)\r
-                                + " hash: " + str(ocloudmodel.hash))\r
-                    localmodel.update_by(ocloudmodel)\r
-                    self._uow.stxobjects.update(localmodel)\r
-            self._uow.commit()\r
+# def _compare_and_update(self, ocloudmodel: StxGenericModel) -> bool:\r
+#     with self._uow:\r
+#         # localmodel = self._uow.stxobjects.get(str(ocloudmodel.id))\r
+#         oclouds = self._uow.stxobjects.list(ResourceTypeEnum.OCLOUD)\r
+#         if len(oclouds) > 1:\r
+#             raise InvalidOcloudState("More than 1 ocloud is found")\r
+#         if len(oclouds) == 0:\r
+#             logger.info("add ocloud:" + ocloudmodel.name\r
+#                         + " update_at: " + str(ocloudmodel.updatetime)\r
+#                         + " id: " + str(ocloudmodel.id)\r
+#                         + " hash: " + str(ocloudmodel.hash))\r
+#             self._uow.stxobjects.add(ocloudmodel)\r
+#         else:\r
+#             localmodel = oclouds.pop()\r
+#             if localmodel.is_outdated(ocloudmodel):\r
+#                 logger.info("update ocloud:" + ocloudmodel.name\r
+#                             + " update_at: " + str(ocloudmodel.updatetime)\r
+#                             + " id: " + str(ocloudmodel.id)\r
+#                             + " hash: " + str(ocloudmodel.hash))\r
+#                 localmodel.update_by(ocloudmodel)\r
+#                 self._uow.stxobjects.update(localmodel)\r
+#         self._uow.commit()\r
 \r
 \r
 class DmsWatcher(BaseWatcher):\r
     def __init__(self, client: BaseClient,\r
-                 uow: AbstractUnitOfWork) -> None:\r
-        super().__init__(client, uow)\r
+                 bus: MessageBus) -> None:\r
+        super().__init__(client, bus)\r
 \r
     def _targetname(self):\r
         return "dms"\r
@@ -75,6 +76,7 @@ class DmsWatcher(BaseWatcher):
     def _probe(self, parent: object = None):\r
         ocloudid = parent.id if parent else None\r
         newmodels = self._client.list(ocloudid=ocloudid)\r
-        for newmodel in newmodels:\r
-            super()._compare_and_update(newmodel)\r
-        return newmodels\r
+        # for newmodel in newmodels:\r
+        #     super()._compare_and_update(newmodel)\r
+        # return newmodels\r
+        return [commands.UpdateDms(m) for m in newmodels]\r
index 3add20b..b18ead6 100644 (file)
 #  limitations under the License.\r
 \r
 from o2ims.service.client.base_client import BaseClient\r
-from o2ims.service.unit_of_work import AbstractUnitOfWork\r
+from o2ims.service.unit_of_work import AbstractUnitOfWork\r
 from o2ims.service.watcher.resource_watcher import ResourceWatcher\r
+from o2ims.domain import commands\r
+from o2ims.service.messagebus import MessageBus\r
 \r
 from o2common.helper import o2logging\r
 logger = o2logging.get_logger(__name__)\r
@@ -22,8 +24,8 @@ logger = o2logging.get_logger(__name__)
 \r
 class PServerCpuWatcher(ResourceWatcher):\r
     def __init__(self, client: BaseClient,\r
-                 uow: AbstractUnitOfWork) -> None:\r
-        super().__init__(client, uow)\r
+                 bus: MessageBus) -> None:\r
+        super().__init__(client, bus)\r
 \r
     def _targetname(self):\r
         return "pserver_cpu"\r
@@ -31,6 +33,7 @@ class PServerCpuWatcher(ResourceWatcher):
     def _probe(self, parent: object = None):\r
         hostid = parent.id if parent else None\r
         newmodels = self._client.list(hostid=hostid)\r
-        for newmodel in newmodels:\r
-            super()._compare_and_update(newmodel)\r
-        return newmodels\r
+        # for newmodel in newmodels:\r
+        #     super()._compare_and_update(newmodel)\r
+        # return newmodels\r
+        return [commands.UpdatePserverCpu(m) for m in newmodels]\r
index 4e2f330..c4ac72c 100644 (file)
 #  limitations under the License.\r
 \r
 from o2ims.service.client.base_client import BaseClient\r
-from o2ims.service.unit_of_work import AbstractUnitOfWork\r
+from o2ims.service.unit_of_work import AbstractUnitOfWork\r
 from o2ims.service.watcher.resource_watcher import ResourceWatcher\r
+from o2ims.domain import commands\r
+from o2ims.service.messagebus import MessageBus\r
 \r
 from o2common.helper import o2logging\r
 logger = o2logging.get_logger(__name__)\r
@@ -22,8 +24,8 @@ logger = o2logging.get_logger(__name__)
 \r
 class PServerWatcher(ResourceWatcher):\r
     def __init__(self, client: BaseClient,\r
-                 uow: AbstractUnitOfWork) -> None:\r
-        super().__init__(client, uow)\r
+                 bus: MessageBus) -> None:\r
+        super().__init__(client, bus)\r
 \r
     def _targetname(self):\r
         return "pserver"\r
@@ -31,6 +33,7 @@ class PServerWatcher(ResourceWatcher):
     def _probe(self, parent: object = None):\r
         resourcepoolid = parent.id if parent else None\r
         newmodels = self._client.list(resourcepoolid=resourcepoolid)\r
-        for newmodel in newmodels:\r
-            super()._compare_and_update(newmodel)\r
-        return newmodels\r
+        # for newmodel in newmodels:\r
+        #     super()._compare_and_update(newmodel)\r
+        # return newmodels\r
+        return [commands.UpdatePserverCpu(m) for m in newmodels]\r
index a424dfb..4d2555b 100644 (file)
 #  limitations under the License.\r
 \r
 from o2ims.service.client.base_client import BaseClient\r
-from o2ims.service.unit_of_work import AbstractUnitOfWork\r
+from o2ims.service.unit_of_work import AbstractUnitOfWork\r
 from o2ims.service.watcher.base import BaseWatcher\r
+from o2ims.domain import commands\r
+from o2ims.service.messagebus import MessageBus\r
 \r
 from o2common.helper import o2logging\r
 logger = o2logging.get_logger(__name__)\r
@@ -22,8 +24,8 @@ logger = o2logging.get_logger(__name__)
 \r
 class ResourceWatcher(BaseWatcher):\r
     def __init__(self, client: BaseClient,\r
-                 uow: AbstractUnitOfWork) -> None:\r
-        super().__init__(client, uow)\r
+                 bus: MessageBus) -> None:\r
+        super().__init__(client, bus)\r
 \r
     def _targetname(self):\r
         return "resource"\r
@@ -31,6 +33,7 @@ class ResourceWatcher(BaseWatcher):
     def _probe(self, parent: object = None):\r
         parentid = parent.id if parent else None\r
         newmodels = self._client.get(parentid=parentid)\r
-        for newmodel in newmodels:\r
-            super()._compare_and_update(newmodel)\r
-        return newmodels\r
+        # for newmodel in newmodels:\r
+        #     super()._compare_and_update(newmodel)\r
+        # return newmodels\r
+        return [commands.UpdateResource(m) for m in newmodels]\r
index 6343292..dc7745a 100644 (file)
 #  limitations under the License.\r
 \r
 from o2ims.service.client.base_client import BaseClient\r
-from o2ims.service.unit_of_work import AbstractUnitOfWork\r
+from o2ims.service.unit_of_work import AbstractUnitOfWork\r
 from o2ims.service.watcher.base import BaseWatcher\r
+from o2ims.domain import commands\r
+from o2ims.service.messagebus import MessageBus\r
 \r
 from o2common.helper import o2logging\r
 logger = o2logging.get_logger(__name__)\r
@@ -22,8 +24,8 @@ logger = o2logging.get_logger(__name__)
 \r
 class ResourcePoolWatcher(BaseWatcher):\r
     def __init__(self, client: BaseClient,\r
-                 uow: AbstractUnitOfWork) -> None:\r
-        super().__init__(client, uow)\r
+                 bus: MessageBus) -> None:\r
+        super().__init__(client, bus)\r
 \r
     def _targetname(self):\r
         return "resourcepool"\r
@@ -31,7 +33,8 @@ class ResourcePoolWatcher(BaseWatcher):
     def _probe(self, parent: object = None):\r
         ocloudid = parent.id if parent else None\r
         newmodels = self._client.list(ocloudid=ocloudid)\r
-        for newmodel in newmodels:\r
-            logger.info("detect ocloudmodel:" + newmodel.name)\r
-            super()._compare_and_update(newmodel)\r
-        return newmodels\r
+        # for newmodel in newmodels:\r
+        #     logger.info("detect ocloudmodel:" + newmodel.name)\r
+        #     super()._compare_and_update(newmodel)\r
+        # return newmodels\r
+        return [commands.UpdateResourcePool(m) for m in newmodels]\r
index b0aded4..69c3e91 100644 (file)
@@ -16,6 +16,7 @@ import time
 from datetime import datetime\r
 import json\r
 from typing import List\r
+from o2ims.service import handlers\r
 from o2ims.domain.resource_type import ResourceTypeEnum\r
 from o2ims.service.client.base_client import BaseClient\r
 from o2ims.domain import ocloud\r
@@ -28,7 +29,9 @@ from o2ims.domain.stx_repo import StxObjectRepository
 from o2ims.service.watcher import worker\r
 from o2ims.service.unit_of_work import AbstractUnitOfWork\r
 from o2ims.service.watcher.ocloud_watcher import OcloudWatcher\r
-\r
+from o2ims.service import messagebus\r
+from o2ims import bootstrap\r
+from o2ims.domain import commands\r
 \r
 class FakeOcloudClient(BaseClient):\r
     def __init__(self):\r
@@ -116,15 +119,37 @@ class FakeUnitOfWork(AbstractUnitOfWork):
         pass\r
         # self.session.rollback()\r
 \r
+    def collect_new_events(self):\r
+        yield\r
+        # return super().collect_new_events()\r
+\r
+\r
+def create_fake_bus(uow):\r
+    def update_ocloud(\r
+        cmd: commands.UpdateOCloud,\r
+        uow: AbstractUnitOfWork):\r
+        return\r
+\r
+    fakeuow = FakeUnitOfWork()\r
+    handlers.EVENT_HANDLERS = {}\r
+    handlers.COMMAND_HANDLERS = {\r
+        commands.UpdateOCloud: update_ocloud,\r
+    }\r
+    bus = bootstrap.bootstrap(False, fakeuow)\r
+    return bus\r
+\r
 \r
 def test_probe_new_ocloud():\r
-    # fakeRepo = FakeOcloudRepo()\r
     fakeuow = FakeUnitOfWork()\r
+    bus = create_fake_bus(fakeuow)\r
     fakeClient = FakeOcloudClient()\r
-    ocloudwatcher = OcloudWatcher(fakeClient, fakeuow)\r
-    ocloudwatcher.probe()\r
-    assert len(fakeuow.stxobjects.oclouds) == 1\r
-    assert fakeuow.stxobjects.oclouds[0].name == "stx1"\r
+    ocloudwatcher = OcloudWatcher(fakeClient, bus)\r
+    cmds = ocloudwatcher.probe()\r
+    assert cmds is not None\r
+    assert len(cmds) == 1\r
+    assert cmds[0].data.name == "stx1"\r
+    # assert len(fakeuow.stxobjects.oclouds) == 1\r
+    # assert fakeuow.stxobjects.oclouds[0].name == "stx1"\r
 \r
 \r
 def test_watchers_worker():\r
@@ -132,11 +157,11 @@ def test_watchers_worker():
 \r
     class FakeOCloudWatcher(BaseWatcher):\r
         def __init__(self, client: BaseClient,\r
-                     repo: OcloudRepository) -> None:\r
+                     bus: messagebus) -> None:\r
             super().__init__(client, None)\r
             self.fakeOcloudWatcherCounter = 0\r
             self._client = client\r
-            self._repo = repo\r
+            self._bus = bus\r
 \r
         def _targetname(self):\r
             return "fakeocloudwatcher"\r
@@ -152,9 +177,10 @@ def test_watchers_worker():
 \r
     # fakeRepo = FakeOcloudRepo()\r
     fakeuow = FakeUnitOfWork()\r
+    bus = create_fake_bus(fakeuow)\r
 \r
     fakeClient = FakeOcloudClient()\r
-    fakewatcher = FakeOCloudWatcher(fakeClient, fakeuow)\r
+    fakewatcher = FakeOCloudWatcher(fakeClient, bus)\r
 \r
     root = WatcherTree(fakewatcher)\r
 \r