Add dms handler 96/7096/1
authorBin Yang <bin.yang@windriver.com>
Mon, 22 Nov 2021 11:29:48 +0000 (19:29 +0800)
committerBin Yang <bin.yang@windriver.com>
Mon, 22 Nov 2021 12:17:24 +0000 (20:17 +0800)
Issue: orm cannot restore non persistent properties of object
hence result object restored from orm comes without events properties
which is essential for domain event handling

Signed-off-by: Bin Yang <bin.yang@windriver.com>
Change-Id: I60e455ec0fe6072242de14d4d2e169178269c80f

15 files changed:
o2ims/config.py
o2ims/domain/commands.py
o2ims/domain/ocloud.py
o2ims/domain/stx_object.py
o2ims/entrypoints/redis_eventconsumer.py
o2ims/service/auditor/dms_handler.py [new file with mode: 0644]
o2ims/service/handlers.py
o2ims/service/unit_of_work.py
o2ims/service/watcher/ocloud_watcher.py
o2ims/service/watcher/pserver_cpu_watcher.py
o2ims/service/watcher/pserver_watcher.py
o2ims/service/watcher/resource_watcher.py
o2ims/service/watcher/resourcepool_watcher.py
tests/integration/test_ocloud_repository.py
tests/unit/test_ocloud.py

index 7db2d0a..7283383 100644 (file)
@@ -37,6 +37,10 @@ def get_o2ims_api_base():
     return '/o2ims_infrastructureInventory/v1'
 
 
+def get_o2dms_api_base(dmsid: str):
+    return "/" + dmsid + '/o2dms/v1'
+
+
 def get_redis_host_and_port():
     host = os.environ.get("REDIS_HOST", "localhost")
     port = 63791 if host == "localhost" else 6379
index 60e9db8..8383f47 100644 (file)
@@ -37,24 +37,24 @@ class UpdateOCloud(UpdateStxObject):
 
 @dataclass
 class UpdateDms(UpdateStxObject):
-    pass
+    parentid: str
 
 
 @dataclass
 class UpdateResourcePool(UpdateStxObject):
-    pass
+    parentid: str
 
 
 @dataclass
 class UpdateResource(UpdateStxObject):
-    pass
+    parentid: str
 
 
 @dataclass
-class UpdatePserverCpu(UpdateStxObject):
+class UpdatePserverCpu(UpdateResource):
     pass
 
 
 @dataclass
-class UpdatePserver(UpdateStxObject):
+class UpdatePserver(UpdateResource):
     pass
index 847ce4c..513907a 100644 (file)
@@ -25,6 +25,7 @@ from .resource_type import ResourceTypeEnum
 class Subscription(AgRoot):\r
     def __init__(self, id: str, callback: str, consumersubid: str = '',\r
                  filter: str = '') -> None:\r
+        super().__init__()\r
         self.subscriptionId = id\r
         self.version_number = 0\r
         self.callback = callback\r
@@ -32,11 +33,12 @@ class Subscription(AgRoot):
         self.filter = filter\r
 \r
 \r
-class DeploymentManager:\r
+class DeploymentManager(AgRoot):\r
     def __init__(self, id: str, name: str, ocloudid: str,\r
                  dmsendpoint: str, description: str = '',\r
                  supportedLocations: str = '', capabilities: str = '',\r
                  capacity: str = '') -> None:\r
+        super().__init__()\r
         self.deploymentManagerId = id\r
         self.version_number = 0\r
         self.oCloudId = ocloudid\r
@@ -53,6 +55,7 @@ class ResourcePool(AgRoot):
     def __init__(self, id: str, name: str, location: str,\r
                  ocloudid: str, gLocationId: str = '',\r
                  description: str = '') -> None:\r
+        super().__init__()\r
         self.resourcePoolId = id\r
         self.version_number = 0\r
         self.oCloudId = ocloudid\r
@@ -68,6 +71,7 @@ class ResourceType(AgRoot):
                  ocloudid: str, vender: str = '', model: str = '',\r
                  version: str = '',\r
                  description: str = '') -> None:\r
+        super().__init__()\r
         self.resourceTypeId = typeid\r
         self.oCloudId = ocloudid\r
         self.resourceTypeEnum = typeEnum.value\r
@@ -84,6 +88,7 @@ class Resource(AgRoot):
                  resourcePoolId: str, oCloudId: str = '',\r
                  parentId: str = '', elements: list = [],\r
                  description: str = '') -> None:\r
+        super().__init__()\r
         self.resourceId = resourceId\r
         self.version_number = 0\r
         self.oCloudId = oCloudId\r
@@ -112,14 +117,14 @@ class Ocloud(AgRoot):
         self.extensions = []\r
         # self.events = []\r
 \r
-    def addDeploymentManager(self,\r
-                             deploymentManager: DeploymentManager):\r
-\r
-        deploymentManager.oCloudId = self.oCloudId\r
-        old = filter(\r
-            lambda x: x.deploymentManagerId ==\r
-            deploymentManager.deploymentManagerId,\r
-            self.deploymentManagers)\r
-        for o in old or []:\r
-            self.deploymentManagers.remove(o)\r
-        self.deploymentManagers.append(deploymentManager)\r
+    def addDeploymentManager(self,\r
+                             deploymentManager: DeploymentManager):\r
+\r
+        deploymentManager.oCloudId = self.oCloudId\r
+        old = filter(\r
+            lambda x: x.deploymentManagerId ==\r
+            deploymentManager.deploymentManagerId,\r
+            self.deploymentManagers)\r
+        for o in old or []:\r
+            self.deploymentManagers.remove(o)\r
+        self.deploymentManagers.append(deploymentManager)\r
index 773bd3a..9629652 100644 (file)
@@ -26,7 +26,7 @@ class StxGenericModel(AgRoot):
     def __init__(self, type: ResourceTypeEnum,\r
                  api_response: dict = None, content_hash=None) -> None:\r
         if api_response:\r
-            self.id = api_response.uuid\r
+            self.id = str(api_response.uuid)\r
             self.type = type\r
             self.updatetime = datetime.datetime.strptime(\r
                 api_response.updated_at.split('.')[0], "%Y-%m-%dT%H:%M:%S") \\r
index 7ca87d2..15312bf 100644 (file)
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 
-import json
+import json
 import redis
 
 from o2ims import bootstrap, config
-from o2ims.domain import commands
+from o2ims.domain import commands
 
 from o2common.helper import o2logging
 logger = o2logging.get_logger(__name__)
@@ -36,9 +36,9 @@ def main():
 
 def handle_dms_changed(m, bus):
     logger.info("handling %s", m)
-    data = json.loads(m["data"])
-    cmd = commands.UpdateDms(ref=data["dmsid"])
-    bus.handle(cmd)
+    data = json.loads(m["data"])
+    cmd = commands.UpdateDms(ref=data["dmsid"])
+    bus.handle(cmd)
 
 
 if __name__ == "__main__":
diff --git a/o2ims/service/auditor/dms_handler.py b/o2ims/service/auditor/dms_handler.py
new file mode 100644 (file)
index 0000000..b7d8e83
--- /dev/null
@@ -0,0 +1,105 @@
+# Copyright (C) 2021 Wind River Systems, Inc.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+# pylint: disable=unused-argument
+from __future__ import annotations
+
+from o2ims.domain.stx_object import StxGenericModel
+# from dataclasses import asdict
+# from typing import List, Dict, Callable, Type
+# TYPE_CHECKING
+from o2ims.domain import commands
+from o2ims.service.unit_of_work import AbstractUnitOfWork
+from o2ims.domain.resource_type import MismatchedModel
+from o2ims.domain.ocloud import DeploymentManager
+from o2ims import config
+# if TYPE_CHECKING:
+#     from . import unit_of_work
+
+from o2common.helper import o2logging
+logger = o2logging.get_logger(__name__)
+
+
+class InvalidResourceType(Exception):
+    pass
+
+
+def update_dms(
+    cmd: commands.UpdateDms,
+    uow: AbstractUnitOfWork
+):
+    stxobj = cmd.data
+    with uow:
+        dms = uow.deployment_managers.get(stxobj.id)
+        if not dms:
+            logger.info("add dms:" + stxobj.name
+                        + " update_at: " + str(stxobj.updatetime)
+                        + " id: " + str(stxobj.id)
+                        + " hash: " + str(stxobj.hash))
+            # ocloud = uow.oclouds.get(cmd.parent.oCloudId)
+            localmodel = create_by(stxobj, cmd.parentid)
+            uow.deployment_managers.add(localmodel)
+
+            logger.info("Add a dms: " + stxobj.id
+                        + ", name: " + stxobj.name)
+        else:
+            localmodel = dms
+            if is_outdated(localmodel, stxobj):
+                logger.info("update a dms:" + stxobj.name
+                            + " update_at: " + str(stxobj.updatetime)
+                            + " id: " + str(stxobj.id)
+                            + " hash: " + str(stxobj.hash))
+                update_by(localmodel, stxobj, cmd.parentid)
+                uow.deployment_managers.update(localmodel)
+
+            logger.info("Update a dms: " + stxobj.id
+                        + ", name: " + stxobj.name)
+        uow.commit()
+
+
+def is_outdated(ocloud: DeploymentManager, stxobj: StxGenericModel):
+    # if stxobj.updatetime:
+    #     return True if Ocloud.updatetime < stxobj.updatetime else False
+    # else:
+    return True if ocloud.hash != stxobj.hash else False
+
+
+def create_by(stxobj: StxGenericModel, parentid: str) -> DeploymentManager:
+    dmsendpoint = config.get_api_url() + config.get_o2dms_api_base(stxobj.id)
+    description = "A DMS"
+    ocloudid = parentid
+    supportedLocations = ''
+    capabilities = ''
+    capacity = ''
+    localmodel = DeploymentManager(
+        stxobj.id, stxobj.name, ocloudid, dmsendpoint, description,
+        supportedLocations, capabilities, capacity)
+    localmodel.createtime = stxobj.createtime
+    localmodel.updatetime = stxobj.updatetime
+    localmodel.hash = stxobj.hash
+
+    return localmodel
+
+
+def update_by(target: DeploymentManager, stxobj: StxGenericModel,
+              parentid: str) -> None:
+    if target.deploymentManagerId != stxobj.id:
+        raise MismatchedModel("Mismatched Id")
+    target.name = stxobj.name
+    target.createtime = stxobj.createtime
+    target.updatetime = stxobj.updatetime
+    # ocloud.content = stxobj.content
+    target.hash = stxobj.hash
+    target.oCloudId = parentid
+    target.version_number = target.version_number + 1
index 505549c..830e1ff 100644 (file)
@@ -16,6 +16,7 @@
 from __future__ import annotations
 # from dataclasses import asdict
 from typing import List, Dict, Callable, Type
+from o2ims.service.auditor import dms_handler
 # TYPE_CHECKING
 from o2ims.domain import commands, events
 from o2ims.service.auditor import ocloud_handler
@@ -34,4 +35,5 @@ EVENT_HANDLERS = {
 
 COMMAND_HANDLERS = {
     commands.UpdateOCloud: ocloud_handler.update_ocloud,
+    commands.UpdateDms: dms_handler.update_dms
 }  # type: Dict[Type[commands.Command], Callable]
index f329ebc..0445996 100644 (file)
@@ -17,7 +17,8 @@ from __future__ import annotations
 import abc
 
 from o2ims.domain.ocloud_repo import OcloudRepository,\
-    ResourcePoolRepository, ResourceRepository, ResourceTypeRepository
+    ResourcePoolRepository, ResourceRepository, ResourceTypeRepository,\
+    DeploymentManagerRepository
 from o2ims.domain.stx_repo import StxObjectRepository
 
 
@@ -26,6 +27,7 @@ class AbstractUnitOfWork(abc.ABC):
     resource_types: ResourceTypeRepository
     resource_pools: ResourcePoolRepository
     resources: ResourceRepository
+    deployment_managers: DeploymentManagerRepository
     stxobjects: StxObjectRepository
 
     def __enter__(self):
@@ -50,7 +52,10 @@ class AbstractUnitOfWork(abc.ABC):
         for entry in self.resource_types.seen:
             while entry.events:
                 yield entry.events.pop(0)
-        for entry in self.stxobjects.seen:
+        # for entry in self.stxobjects.seen:
+        #     while entry.events:
+        #         yield entry.events.pop(0)
+        for entry in self.deployment_managers.seen:
             while entry.events:
                 yield entry.events.pop(0)
 
index bf4ce43..31e2f1b 100644 (file)
@@ -14,7 +14,7 @@
 \r
 # from o2ims.domain.resource_type import ResourceTypeEnum\r
 from o2ims.service.client.base_client import BaseClient\r
-from o2ims.domain.stx_object import StxGenericModel\r
+from o2ims.domain.stx_object import StxGenericModel\r
 # from o2ims.service.unit_of_work import AbstractUnitOfWork\r
 from o2ims.service.watcher.base import BaseWatcher\r
 from o2ims.domain import commands\r
@@ -73,10 +73,11 @@ class DmsWatcher(BaseWatcher):
     def _targetname(self):\r
         return "dms"\r
 \r
-    def _probe(self, parent: object = None):\r
-        ocloudid = parent.id if parent else None\r
+    def _probe(self, parent: StxGenericModel):\r
+        ocloudid = parent.id\r
         newmodels = self._client.list(ocloudid=ocloudid)\r
         # for newmodel in newmodels:\r
         #     super()._compare_and_update(newmodel)\r
         # return newmodels\r
-        return [commands.UpdateDms(m) for m in newmodels]\r
+        return [commands.UpdateDms(data=m, parentid=ocloudid)\r
+                for m in newmodels]\r
index b18ead6..66f2294 100644 (file)
@@ -12,6 +12,7 @@
 #  See the License for the specific language governing permissions and\r
 #  limitations under the License.\r
 \r
+from o2ims.domain.stx_object import StxGenericModel\r
 from o2ims.service.client.base_client import BaseClient\r
 # from o2ims.service.unit_of_work import AbstractUnitOfWork\r
 from o2ims.service.watcher.resource_watcher import ResourceWatcher\r
@@ -30,10 +31,11 @@ class PServerCpuWatcher(ResourceWatcher):
     def _targetname(self):\r
         return "pserver_cpu"\r
 \r
-    def _probe(self, parent: object = None):\r
-        hostid = parent.id if parent else None\r
+    def _probe(self, parent: StxGenericModel):\r
+        hostid = parent.id\r
         newmodels = self._client.list(hostid=hostid)\r
         # for newmodel in newmodels:\r
         #     super()._compare_and_update(newmodel)\r
         # return newmodels\r
-        return [commands.UpdatePserverCpu(m) for m in newmodels]\r
+        return [commands.UpdatePserverCpu(data=m, parentid=hostid)\r
+                for m in newmodels]\r
index c4ac72c..befc2f6 100644 (file)
@@ -12,6 +12,7 @@
 #  See the License for the specific language governing permissions and\r
 #  limitations under the License.\r
 \r
+from o2ims.domain.stx_object import StxGenericModel\r
 from o2ims.service.client.base_client import BaseClient\r
 # from o2ims.service.unit_of_work import AbstractUnitOfWork\r
 from o2ims.service.watcher.resource_watcher import ResourceWatcher\r
@@ -30,10 +31,8 @@ class PServerWatcher(ResourceWatcher):
     def _targetname(self):\r
         return "pserver"\r
 \r
-    def _probe(self, parent: object = None):\r
-        resourcepoolid = parent.id if parent else None\r
+    def _probe(self, parent: StxGenericModel):\r
+        resourcepoolid = parent.id\r
         newmodels = self._client.list(resourcepoolid=resourcepoolid)\r
-        # for newmodel in newmodels:\r
-        #     super()._compare_and_update(newmodel)\r
-        # return newmodels\r
-        return [commands.UpdatePserverCpu(m) for m in newmodels]\r
+        return [commands.UpdatePserverCpu(data=m, parentid=resourcepoolid)\r
+                for m in newmodels]\r
index 4d2555b..7ca4093 100644 (file)
@@ -12,6 +12,7 @@
 #  See the License for the specific language governing permissions and\r
 #  limitations under the License.\r
 \r
+from o2ims.domain.stx_object import StxGenericModel\r
 from o2ims.service.client.base_client import BaseClient\r
 # from o2ims.service.unit_of_work import AbstractUnitOfWork\r
 from o2ims.service.watcher.base import BaseWatcher\r
@@ -30,10 +31,8 @@ class ResourceWatcher(BaseWatcher):
     def _targetname(self):\r
         return "resource"\r
 \r
-    def _probe(self, parent: object = None):\r
-        parentid = parent.id if parent else None\r
+    def _probe(self, parent: StxGenericModel):\r
+        parentid = parent.id\r
         newmodels = self._client.get(parentid=parentid)\r
-        # for newmodel in newmodels:\r
-        #     super()._compare_and_update(newmodel)\r
-        # return newmodels\r
-        return [commands.UpdateResource(m) for m in newmodels]\r
+        return [commands.UpdateResource(data=m, parentid=parentid)\r
+                for m in newmodels]\r
index dc7745a..a8fd4f6 100644 (file)
@@ -12,6 +12,7 @@
 #  See the License for the specific language governing permissions and\r
 #  limitations under the License.\r
 \r
+from o2ims.domain.stx_object import StxGenericModel\r
 from o2ims.service.client.base_client import BaseClient\r
 # from o2ims.service.unit_of_work import AbstractUnitOfWork\r
 from o2ims.service.watcher.base import BaseWatcher\r
@@ -30,11 +31,12 @@ class ResourcePoolWatcher(BaseWatcher):
     def _targetname(self):\r
         return "resourcepool"\r
 \r
-    def _probe(self, parent: object = None):\r
-        ocloudid = parent.id if parent else None\r
+    def _probe(self, parent: StxGenericModel):\r
+        ocloudid = parent.id\r
         newmodels = self._client.list(ocloudid=ocloudid)\r
         # for newmodel in newmodels:\r
         #     logger.info("detect ocloudmodel:" + newmodel.name)\r
         #     super()._compare_and_update(newmodel)\r
         # return newmodels\r
-        return [commands.UpdateResourcePool(m) for m in newmodels]\r
+        return [commands.UpdateResourcePool(data=m, parentid=ocloudid)\r
+                for m in newmodels]\r
index 1adef04..75800a3 100644 (file)
@@ -63,47 +63,47 @@ def test_get_ocloud(sqlite_session_factory):
     assert ocloud2 != ocloud1 and ocloud2.oCloudId == ocloud1.oCloudId
 
 
-def test_add_ocloud_with_dms(sqlite_session_factory):
-    session = sqlite_session_factory()
-    repo = repository.OcloudSqlAlchemyRepository(session)
-    ocloud1 = setup_ocloud()
-    dmsid = str(uuid.uuid4())
-    dms = ocloud.DeploymentManager(
-        dmsid, "k8s1", ocloud1.oCloudId, config.get_api_url()+"/k8s1")
-    ocloud1.addDeploymentManager(dms)
-    repo.add(ocloud1)
-    session.flush()
-    # seperate session to confirm ocloud is updated into repo
-    session2 = sqlite_session_factory()
-    repo2 = repository.OcloudSqlAlchemyRepository(session2)
-    ocloud2 = repo2.get(ocloud1.oCloudId)
-    assert ocloud2 is not None
-    assert ocloud2 != ocloud1 and ocloud2.oCloudId == ocloud1.oCloudId
-    assert len(ocloud2.deploymentManagers) == 1
-
-
-def test_update_ocloud_with_dms(sqlite_session_factory):
-    session = sqlite_session_factory()
-    repo = repository.OcloudSqlAlchemyRepository(session)
-    ocloud1 = setup_ocloud()
-    repo.add(ocloud1)
-    session.flush()
-    dmsid = str(uuid.uuid4())
-    dms = ocloud.DeploymentManager(
-        dmsid, "k8s1", ocloud1.oCloudId, config.get_api_url()+"/k8s1")
-    ocloud1.addDeploymentManager(dms)
-    repo.update(ocloud1)
-    # repo.update(ocloud1.oCloudId, {"deploymentManagers":
-    # ocloud1.deploymentManagers})
-    session.flush()
-
-    # seperate session to confirm ocloud is updated into repo
-    session2 = sqlite_session_factory()
-    repo2 = repository.OcloudSqlAlchemyRepository(session2)
-    ocloud2 = repo2.get(ocloud1.oCloudId)
-    assert ocloud2 is not None
-    assert ocloud2 != ocloud1 and ocloud2.oCloudId == ocloud1.oCloudId
-    assert len(ocloud2.deploymentManagers) == 1
+def test_add_ocloud_with_dms(sqlite_session_factory):
+    session = sqlite_session_factory()
+    repo = repository.OcloudSqlAlchemyRepository(session)
+    ocloud1 = setup_ocloud()
+    dmsid = str(uuid.uuid4())
+    dms = ocloud.DeploymentManager(
+        dmsid, "k8s1", ocloud1.oCloudId, config.get_api_url()+"/k8s1")
+    ocloud1.addDeploymentManager(dms)
+    repo.add(ocloud1)
+    session.flush()
+    # seperate session to confirm ocloud is updated into repo
+    session2 = sqlite_session_factory()
+    repo2 = repository.OcloudSqlAlchemyRepository(session2)
+    ocloud2 = repo2.get(ocloud1.oCloudId)
+    assert ocloud2 is not None
+    assert ocloud2 != ocloud1 and ocloud2.oCloudId == ocloud1.oCloudId
+    assert len(ocloud2.deploymentManagers) == 1
+
+
+def test_update_ocloud_with_dms(sqlite_session_factory):
+    session = sqlite_session_factory()
+    repo = repository.OcloudSqlAlchemyRepository(session)
+    ocloud1 = setup_ocloud()
+    repo.add(ocloud1)
+    session.flush()
+    dmsid = str(uuid.uuid4())
+    dms = ocloud.DeploymentManager(
+        dmsid, "k8s1", ocloud1.oCloudId, config.get_api_url()+"/k8s1")
+    ocloud1.addDeploymentManager(dms)
+    repo.update(ocloud1)
+    # repo.update(ocloud1.oCloudId, {"deploymentManagers":
+    # ocloud1.deploymentManagers})
+    session.flush()
+
+    # seperate session to confirm ocloud is updated into repo
+    session2 = sqlite_session_factory()
+    repo2 = repository.OcloudSqlAlchemyRepository(session2)
+    ocloud2 = repo2.get(ocloud1.oCloudId)
+    assert ocloud2 is not None
+    assert ocloud2 != ocloud1 and ocloud2.oCloudId == ocloud1.oCloudId
+    assert len(ocloud2.deploymentManagers) == 1
 
 
 def test_add_resource_type(sqlite_session_factory):
index a98f0db..5c9a26d 100644 (file)
@@ -34,16 +34,16 @@ def test_new_ocloud():
     assert ocloudid1 is not None and ocloud1.oCloudId == ocloudid1
 
 
-def test_add_ocloud_with_dms():
-    ocloud1 = setup_ocloud()
-    dmsid = str(uuid.uuid4())
-    dms = ocloud.DeploymentManager(
-        dmsid, "k8s1", ocloud1.oCloudId, config.get_api_url()+"/k8s1")
-    ocloud1.addDeploymentManager(dms)
-    ocloud1.addDeploymentManager(dms)
-    assert len(ocloud1.deploymentManagers) == 1
-    # repo.update(ocloud1.oCloudId, {
-    #             "deploymentManagers": ocloud1.deploymentManagers})
+def test_add_ocloud_with_dms():
+    ocloud1 = setup_ocloud()
+    dmsid = str(uuid.uuid4())
+    dms = ocloud.DeploymentManager(
+        dmsid, "k8s1", ocloud1.oCloudId, config.get_api_url()+"/k8s1")
+    ocloud1.addDeploymentManager(dms)
+    ocloud1.addDeploymentManager(dms)
+    assert len(ocloud1.deploymentManagers) == 1
+    # repo.update(ocloud1.oCloudId, {
+    #             "deploymentManagers": ocloud1.deploymentManagers})
 
 
 def test_new_resource_type():