Update watcher worker 05/7005/7
authorBin Yang <bin.yang@windriver.com>
Sat, 6 Nov 2021 01:38:59 +0000 (09:38 +0800)
committerBin Yang <bin.yang@windriver.com>
Sat, 6 Nov 2021 06:36:37 +0000 (14:36 +0800)
Issue-ID: INF-196
Signed-off-by: Bin Yang <bin.yang@windriver.com>
Change-Id: Ifcfea06b3f6263018e1d43d595f8b2f1545a54fe

28 files changed:
.gitignore
Dockerfile
Dockerfile.localtest [new file with mode: 0644]
README.md
docker-compose.yml
o2ims/adapter/clients/ocloud_sa_client.py
o2ims/adapter/clients/orm_stx.py
o2ims/adapter/ocloud_repository.py
o2ims/adapter/stx_repository.py [new file with mode: 0644]
o2ims/adapter/unit_of_work.py [new file with mode: 0644]
o2ims/bootstrap.py
o2ims/domain/ocloud_repo.py [new file with mode: 0644]
o2ims/domain/resource_type.py
o2ims/domain/stx_object.py
o2ims/domain/stx_repo.py [new file with mode: 0644]
o2ims/entrypoints/flask_application.py
o2ims/entrypoints/o2ims-watcher-entry.sh [new file with mode: 0644]
o2ims/entrypoints/resource_watcher.py [moved from o2ims/service/watcher/executor.py with 50% similarity]
o2ims/service/unit_of_work.py
o2ims/service/watcher/base.py
o2ims/service/watcher/worker.py
o2ims/views/ocloud_view.py
requirements-stx.txt [new file with mode: 0644]
requirements.txt
tests/integration-ocloud/test_watcher_w_stx_sa.py [new file with mode: 0644]
tests/integration/test_clientdriver_fake_stx_sa.py
tests/unit/test_ocloud.py
tests/unit/test_watcher.py

index c46d404..91ffe40 100644 (file)
@@ -5,3 +5,4 @@ __pycache__
 *.pyc
 .tox
 *.log
+temp
\ No newline at end of file
index 593e21f..75e2363 100644 (file)
@@ -1,14 +1,32 @@
 FROM python:3.10-slim-buster\r
 \r
 RUN apt-get update; apt-get install -y git gcc\r
+\r
+# in case git repo is not accessable\r
+# RUN mkdir -p /cgtsclient\r
+# COPY temp/config /cgtsclient/\r
+RUN git clone https://opendev.org/starlingx/config.git /cgtsclient\r
+RUN pip install -e /cgtsclient/sysinv/cgts-client/cgts-client/\r
+\r
+# RUN mkdir -p /distcloud-client\r
+# COPY temp/distcloud-client /distcloud-client/\r
+RUN git clone https://opendev.org/starlingx/distcloud-client.git /distcloud-client/\r
+RUN pip install -e /distcloud-client/distributedcloud-client\r
+# in case git repo is not accessable\r
+\r
+\r
 COPY requirements.txt /tmp/\r
+COPY requirements-stx.txt /tmp/\r
 COPY constraints.txt /tmp/\r
 \r
-RUN pip install -r /tmp/requirements.txt -c /tmp/constraints.txt\r
+RUN  pip install -r /tmp/requirements.txt -c /tmp/constraints.txt\r
+\r
+# RUN  pip install -r /tmp/requirements-stx.txt\r
 \r
 COPY requirements-test.txt /tmp/\r
 RUN pip install -r /tmp/requirements-test.txt\r
 \r
+\r
 RUN mkdir -p /src\r
 COPY o2ims/ /src/o2ims/\r
 COPY o2dms/ /src/o2dms/\r
@@ -19,4 +37,6 @@ RUN pip install -e /src
 \r
 COPY tests/ /tests/\r
 \r
+RUN apt-get install -y procps vim\r
+\r
 WORKDIR /src\r
diff --git a/Dockerfile.localtest b/Dockerfile.localtest
new file mode 100644 (file)
index 0000000..c61c1db
--- /dev/null
@@ -0,0 +1,37 @@
+FROM python:3.10-slim-buster\r
+\r
+RUN apt-get update; apt-get install -y git gcc\r
+\r
+# in case git repo is not accessable\r
+RUN mkdir -p /cgtsclient\r
+COPY temp/config /cgtsclient/\r
+RUN pip install -e cgtsclient/sysinv/cgts-client/cgts-client/\r
+\r
+RUN mkdir -p /distcloud-client\r
+COPY temp/distcloud-client /distcloud-client/\r
+RUN pip install -e /distcloud-client/distributedcloud-client\r
+# in case git repo is not accessable\r
+\r
+\r
+COPY requirements.txt /tmp/\r
+COPY constraints.txt /tmp/\r
+\r
+RUN  pip install -r /tmp/requirements.txt -c /tmp/constraints.txt\r
+\r
+COPY requirements-test.txt /tmp/\r
+RUN pip install -r /tmp/requirements-test.txt\r
+\r
+\r
+RUN mkdir -p /src\r
+COPY o2ims/ /src/o2ims/\r
+COPY o2dms/ /src/o2dms/\r
+COPY o2common/ /src/o2common/\r
+COPY setup.py /src/\r
+\r
+# RUN pip install -e /src\r
+\r
+COPY tests/ /tests/\r
+\r
+RUN apt-get install -y procps vim\r
+\r
+WORKDIR /src\r
index fdc1d39..1612e8e 100644 (file)
--- a/README.md
+++ b/README.md
@@ -1,5 +1,15 @@
 ## Building containers\r
 \r
+To accommodate the git repo access issue, the cgts-client and distributed client are\r
+cloned into temp before docker building\r
+\r
+```sh\r
+mkdir -p temp\r
+cd temp\r
+git clone https://opendev.org/starlingx/config.git\r
+git clone https://opendev.org/starlingx/distcloud-client.git\r
+cd -\r
+```\r
 \r
 ```sh\r
 docker-compose build\r
index 4ffc2f1..c7ca8b7 100644 (file)
@@ -5,7 +5,7 @@ services:
   redis_pubsub:
     build:
       context: .
-      dockerfile: Dockerfile
+      dockerfile: Dockerfile.localtest
     image: o2imsdms-image
     depends_on:
       - postgres
@@ -56,6 +56,31 @@ services:
     ports:
       - "5005:80"
 
+  watcher:
+    build:
+      context: .
+      dockerfile: Dockerfile.localtest
+    image: o2imsdms-image
+    depends_on:
+      - postgres
+      - redis
+    environment:
+      - DB_HOST=postgres
+      - DB_PASSWORD=o2ims123
+      - REDIS_HOST=redis
+      - PYTHONDONTWRITEBYTECODE=1
+      - OS_AUTH_URL=${OS_AUTH_URL}
+      - OS_USERNAME=${OS_USERNAME}
+      - OS_PASSWORD=${OS_PASSWORD}
+    volumes:
+      - ./o2ims:/o2ims
+      - ./o2dms:/o2dms
+      - ./o2common:/o2common
+      - ./tests:/tests
+    entrypoint:
+      - /bin/sh
+      - /o2ims/entrypoints/o2ims-watcher-entry.sh
+
   postgres:
     image: postgres:9.6
     environment:
index 5e9e64c..e8c48a3 100644 (file)
@@ -20,6 +20,7 @@ from typing import List
 # Optional,  Set\r
 from o2ims.domain import stx_object as ocloudModel\r
 from o2ims import config\r
+from o2ims.domain.resource_type import ResourceTypeEnum\r
 \r
 # from dcmanagerclient.api import client\r
 from cgtsclient.client import get_client\r
@@ -101,7 +102,7 @@ class StxSaClientImp(object):
         super().__init__()\r
         self.stxclient = stx_client if stx_client else self.getStxClient()\r
 \r
-    def getStxClient():\r
+    def getStxClient(self):\r
         os_client_args = config.get_stx_access_info()\r
         config_client = get_client(**os_client_args)\r
         return config_client\r
@@ -109,39 +110,55 @@ class StxSaClientImp(object):
     def getInstanceInfo(self) -> ocloudModel.StxGenericModel:\r
         systems = self.stxclient.isystem.list()\r
         logger.debug("systems:" + str(systems[0].to_dict()))\r
-        return ocloudModel.StxGenericModel(systems[0]) if systems else None\r
+        return ocloudModel.StxGenericModel(\r
+            ResourceTypeEnum.OCLOUD, systems[0]) if systems else None\r
 \r
     def getPserverList(self) -> List[ocloudModel.StxGenericModel]:\r
         hosts = self.stxclient.ihost.list()\r
         logger.debug("host 1:" + str(hosts[0].to_dict()))\r
-        return [ocloudModel.StxGenericModel(self._hostconverter(host))\r
+        return [ocloudModel.StxGenericModel(\r
+            ResourceTypeEnum.PSERVER, self._hostconverter(host))\r
                 for host in hosts if host]\r
 \r
     def getPserver(self, id) -> ocloudModel.StxGenericModel:\r
         host = self.stxclient.ihost.get(id)\r
         logger.debug("host:" + str(host.to_dict()))\r
-        return ocloudModel.StxGenericModel(self._hostconverter(host))\r
+        return ocloudModel.StxGenericModel(\r
+            ResourceTypeEnum.PSERVER, self._hostconverter(host))\r
 \r
     def getK8sList(self) -> List[ocloudModel.StxGenericModel]:\r
         k8sclusters = self.stxclient.kube_cluster.list()\r
-        logger.debug("k8sresources:" + str(k8sclusters[0].to_dict()))\r
-\r
-        return [ocloudModel.StxGenericModel(self._k8sconverter(k8sres))\r
-                for k8sres in k8sclusters if k8sres]\r
+        logger.debug("k8sresources[0]:" + str(k8sclusters[0].to_dict()))\r
+        return [ocloudModel.StxGenericModel(\r
+            ResourceTypeEnum.DMS,\r
+            self._k8sconverter(k8sres), self._k8shasher(k8sres))\r
+            for k8sres in k8sclusters if k8sres]\r
 \r
     def getK8sDetail(self, name) -> ocloudModel.StxGenericModel:\r
-        k8scluster = self.stxclient.kube_cluster.get(name)\r
+        if not name:\r
+            k8sclusters = self.stxclient.kube_cluster.list()\r
+            # logger.debug("k8sresources[0]:" + str(k8sclusters[0].to_dict()))\r
+            k8scluster = k8sclusters.pop()\r
+        else:\r
+            k8scluster = self.stxclient.kube_cluster.get(name)\r
+\r
+        if not k8scluster:\r
+            return None\r
         logger.debug("k8sresource:" + str(k8scluster.to_dict()))\r
-        return ocloudModel.StxGenericModel(self._k8sconverter(k8scluster))\r
+        return ocloudModel.StxGenericModel(\r
+            ResourceTypeEnum.DMS,\r
+            self._k8sconverter(k8scluster), self._k8shasher(k8scluster))\r
 \r
     def getCpuList(self, hostid) -> List[ocloudModel.StxGenericModel]:\r
         cpulist = self.stxclient.icpu.list(hostid)\r
-        return [ocloudModel.StxGenericModel(self._cpuconverter(cpures))\r
-                for cpures in cpulist if cpures]\r
+        return [ocloudModel.StxGenericModel(\r
+            ResourceTypeEnum.OCLOUD,\r
+            self._cpuconverter(cpures)) for cpures in cpulist if cpures]\r
 \r
     def getCpu(self, id) -> ocloudModel.StxGenericModel:\r
         cpuinfo = self.stxclient.icpu.get(id)\r
-        return ocloudModel.StxGenericModel(self._cpuconverter(cpuinfo))\r
+        return ocloudModel.StxGenericModel(\r
+            ResourceTypeEnum.OCLOUD, self._cpuconverter(cpuinfo))\r
 \r
     def _getIsystems(self):\r
         return self.stxclient.isystem.list()\r
@@ -167,12 +184,17 @@ class StxSaClientImp(object):
         return cpu\r
 \r
     @staticmethod\r
-    def _k8sconverter(host):\r
-        setattr(host, "name", host.cluster_name)\r
-        setattr(host, "uuid",\r
-                uuid.uuid3(uuid.NAMESPACE_URL, host.cluster_name))\r
-        setattr(host, 'updated_at', None)\r
-        setattr(host, 'created_at', None)\r
+    def _k8sconverter(cluster):\r
+        setattr(cluster, "name", cluster.cluster_name)\r
+        setattr(cluster, "uuid",\r
+                uuid.uuid3(uuid.NAMESPACE_URL, cluster.cluster_name))\r
+        setattr(cluster, 'updated_at', None)\r
+        setattr(cluster, 'created_at', None)\r
         logger.debug("k8s cluster name/uuid:" +\r
-                     host.name + "/" + str(host.uuid))\r
-        return host\r
+                     cluster.name + "/" + str(cluster.uuid))\r
+        return cluster\r
+\r
+    @staticmethod\r
+    def _k8shasher(cluster):\r
+        return str(hash((cluster.cluster_name,\r
+                         cluster.cluster_api_endpoint, cluster.admin_user)))\r
index 4b825c4..b8c1523 100644 (file)
@@ -17,37 +17,50 @@ import logging
 \r
 from sqlalchemy import (\r
     Table,\r
-    MetaData,\r
+    MetaData,\r
     Column,\r
     # Integer,\r
     String,\r
     # Date,\r
     DateTime,\r
+    # engine,\r
     # ForeignKey,\r
     # event,\r
+    Enum\r
 )\r
 \r
 from sqlalchemy.orm import mapper\r
+# from sqlalchemy.sql.sqltypes import Integer\r
 # from sqlalchemy.sql.expression import true\r
 \r
 from o2ims.domain import stx_object as ocloudModel\r
-from o2ims.adapter.orm import metadata\r
+# from o2ims.adapter.orm import metadata\r
+from o2ims.service.unit_of_work import AbstractUnitOfWork\r
+from o2ims.adapter.unit_of_work import SqlAlchemyUnitOfWork\r
+from o2ims.domain.resource_type import ResourceTypeEnum\r
 \r
 logger = logging.getLogger(__name__)\r
 \r
-metadata = MetaData()\r
+metadata = MetaData()\r
 \r
 stxobject = Table(\r
     "stxcache",\r
     metadata,\r
     Column("id", String(255), primary_key=True),\r
+    Column("type", Enum(ResourceTypeEnum)),\r
     Column("name", String(255)),\r
     Column("updatetime", DateTime),\r
     Column("createtime", DateTime),\r
-    Column("content", String(255))\r
+    Column("hash", String(255)),\r
+    Column("content", String)\r
 )\r
 \r
 \r
-def start_o2ims_stx_mappers():\r
+def start_o2ims_stx_mappers(uow: AbstractUnitOfWork = SqlAlchemyUnitOfWork()):\r
     logger.info("Starting O2 IMS Stx mappers")\r
     mapper(ocloudModel.StxGenericModel, stxobject)\r
+\r
+    with uow:\r
+        engine1 = uow.session.get_bind()\r
+        metadata.create_all(engine1)\r
+        uow.commit()\r
index ca90209..32dced9 100644 (file)
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 
-import abc
-from typing import List, Set
+from typing import List
 # from o2ims.adapter import orm
 from o2ims.domain import ocloud
-
-
-class OcloudRepository(abc.ABC):
-    def __init__(self):
-        self.seen = set()  # type: Set[ocloud.Ocloud]
-
-    def add(self, ocloud: ocloud.Ocloud):
-        self._add(ocloud)
-        self.seen.add(ocloud)
-
-    def get(self, ocloudid) -> ocloud.Ocloud:
-        ocloud = self._get(ocloudid)
-        if ocloud:
-            self.seen.add(ocloud)
-        return ocloud
-
-    def list(self) -> List[ocloud.Ocloud]:
-        return self._list()
-
-    def update(self, ocloud: ocloud.Ocloud):
-        self._update(ocloud)
-
-    # def update_fields(self, ocloudid: str, updatefields: dict):
-    #     self._update(ocloudid, updatefields)
-
-    @abc.abstractmethod
-    def _add(self, ocloud: ocloud.Ocloud):
-        raise NotImplementedError
-
-    @abc.abstractmethod
-    def _get(self, ocloudid) -> ocloud.Ocloud:
-        raise NotImplementedError
-
-    @abc.abstractmethod
-    def _update(self, ocloud: ocloud.Ocloud):
-        raise NotImplementedError
+from o2ims.domain.ocloud_repo import OcloudRepository
 
 
 class OcloudSqlAlchemyRepository(OcloudRepository):
@@ -68,7 +32,8 @@ class OcloudSqlAlchemyRepository(OcloudRepository):
             oCloudId=ocloudid).first()
 
     def _list(self) -> List[ocloud.Ocloud]:
-        return self.session.query()
+        return self.session.query(ocloud.Ocloud).order_by(
+            ocloud.Ocloud.name).all()
 
     def _update(self, ocloud: ocloud.Ocloud):
         self.session.add(ocloud)
diff --git a/o2ims/adapter/stx_repository.py b/o2ims/adapter/stx_repository.py
new file mode 100644 (file)
index 0000000..741aa11
--- /dev/null
@@ -0,0 +1,39 @@
+# Copyright (C) 2021 Wind River Systems, Inc.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+from typing import List
+# from o2ims.adapter import orm
+from o2ims.domain.stx_object import StxGenericModel
+from o2ims.domain.stx_repo import StxObjectRepository
+from o2ims.domain.resource_type import ResourceTypeEnum
+
+
+class StxObjectSqlAlchemyRepository(StxObjectRepository):
+    def __init__(self, session):
+        super().__init__()
+        self.session = session
+
+    def _add(self, stx_obj: StxGenericModel):
+        self.session.add(stx_obj)
+
+    def _get(self, stx_obj_id) -> StxGenericModel:
+        return self.session.query(StxGenericModel).filter_by(
+            id=stx_obj_id).first()
+
+    def _list(self, type: ResourceTypeEnum) -> List[StxGenericModel]:
+        return self.session.query(StxGenericModel).filter_by(
+            type=type).order_by(StxGenericModel.updatetime.desc()).all()
+
+    def _update(self, stx_obj: StxGenericModel):
+        self.session.add(stx_obj)
diff --git a/o2ims/adapter/unit_of_work.py b/o2ims/adapter/unit_of_work.py
new file mode 100644 (file)
index 0000000..c958ce2
--- /dev/null
@@ -0,0 +1,53 @@
+# Copyright (C) 2021 Wind River Systems, Inc.\r
+#\r
+#  Licensed under the Apache License, Version 2.0 (the "License");\r
+#  you may not use this file except in compliance with the License.\r
+#  You may obtain a copy of the License at\r
+#\r
+#      http://www.apache.org/licenses/LICENSE-2.0\r
+#\r
+#  Unless required by applicable law or agreed to in writing, software\r
+#  distributed under the License is distributed on an "AS IS" BASIS,\r
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+#  See the License for the specific language governing permissions and\r
+#  limitations under the License.\r
+\r
+# pylint: disable=attribute-defined-outside-init\r
+from __future__ import annotations\r
+from sqlalchemy import create_engine\r
+from sqlalchemy.orm import sessionmaker\r
+from sqlalchemy.orm.session import Session\r
+\r
+from o2ims import config\r
+from o2ims.adapter.ocloud_repository import OcloudSqlAlchemyRepository\r
+from o2ims.adapter.stx_repository import StxObjectSqlAlchemyRepository\r
+from o2ims.service.unit_of_work import AbstractUnitOfWork\r
+\r
+\r
+DEFAULT_SESSION_FACTORY = sessionmaker(\r
+    bind=create_engine(\r
+        config.get_postgres_uri(),\r
+        isolation_level="REPEATABLE READ",\r
+    )\r
+)\r
+\r
+\r
+class SqlAlchemyUnitOfWork(AbstractUnitOfWork):\r
+    def __init__(self, session_factory=DEFAULT_SESSION_FACTORY):\r
+        self.session_factory = session_factory\r
+\r
+    def __enter__(self):\r
+        self.session = self.session_factory()  # type: Session\r
+        self.oclouds = OcloudSqlAlchemyRepository(self.session)\r
+        self.stxobjects = StxObjectSqlAlchemyRepository(self.session)\r
+        return super().__enter__()\r
+\r
+    def __exit__(self, *args):\r
+        super().__exit__(*args)\r
+        self.session.close()\r
+\r
+    def _commit(self):\r
+        self.session.commit()\r
+\r
+    def rollback(self):\r
+        self.session.rollback()\r
index 524f325..55fc99d 100644 (file)
@@ -19,11 +19,13 @@ from o2ims.adapter.notifications import AbstractNotifications,\
     SmoO2Notifications
 
 from o2ims.service import handlers, messagebus, unit_of_work
+from o2ims.adapter.unit_of_work import SqlAlchemyUnitOfWork
+from o2ims.adapter.clients import orm_stx
 
 
 def bootstrap(
     start_orm: bool = True,
-    uow: unit_of_work.AbstractUnitOfWork = unit_of_work.SqlAlchemyUnitOfWork(),
+    uow: unit_of_work.AbstractUnitOfWork = SqlAlchemyUnitOfWork(),
     notifications: AbstractNotifications = None,
     publish: Callable = redis_eventpublisher.publish,
 ) -> messagebus.MessageBus:
@@ -33,6 +35,7 @@ def bootstrap(
 
     if start_orm:
         orm.start_o2ims_mappers()
+        orm_stx.start_o2ims_stx_mappers(uow)
 
     dependencies = {"uow": uow, "notifications": notifications,
                     "publish": publish}
diff --git a/o2ims/domain/ocloud_repo.py b/o2ims/domain/ocloud_repo.py
new file mode 100644 (file)
index 0000000..2c486bd
--- /dev/null
@@ -0,0 +1,53 @@
+# Copyright (C) 2021 Wind River Systems, Inc.\r
+#\r
+#  Licensed under the Apache License, Version 2.0 (the "License");\r
+#  you may not use this file except in compliance with the License.\r
+#  You may obtain a copy of the License at\r
+#\r
+#      http://www.apache.org/licenses/LICENSE-2.0\r
+#\r
+#  Unless required by applicable law or agreed to in writing, software\r
+#  distributed under the License is distributed on an "AS IS" BASIS,\r
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+#  See the License for the specific language governing permissions and\r
+#  limitations under the License.\r
+\r
+import abc\r
+from typing import List, Set\r
+from o2ims.domain import ocloud\r
+\r
+\r
+class OcloudRepository(abc.ABC):\r
+    def __init__(self):\r
+        self.seen = set()  # type: Set[ocloud.Ocloud]\r
+\r
+    def add(self, ocloud: ocloud.Ocloud):\r
+        self._add(ocloud)\r
+        self.seen.add(ocloud)\r
+\r
+    def get(self, ocloudid) -> ocloud.Ocloud:\r
+        ocloud = self._get(ocloudid)\r
+        if ocloud:\r
+            self.seen.add(ocloud)\r
+        return ocloud\r
+\r
+    def list(self) -> List[ocloud.Ocloud]:\r
+        return self._list()\r
+\r
+    def update(self, ocloud: ocloud.Ocloud):\r
+        self._update(ocloud)\r
+\r
+    # def update_fields(self, ocloudid: str, updatefields: dict):\r
+    #     self._update(ocloudid, updatefields)\r
+\r
+    @abc.abstractmethod\r
+    def _add(self, ocloud: ocloud.Ocloud):\r
+        raise NotImplementedError\r
+\r
+    @abc.abstractmethod\r
+    def _get(self, ocloudid) -> ocloud.Ocloud:\r
+        raise NotImplementedError\r
+\r
+    @abc.abstractmethod\r
+    def _update(self, ocloud: ocloud.Ocloud):\r
+        raise NotImplementedError\r
index 72f0db0..d95eb18 100644 (file)
@@ -2,6 +2,9 @@ from enum import Enum
 \r
 \r
 class ResourceTypeEnum(Enum):\r
-    PSERVER = 1\r
-    PSERVER_CPU = 2\r
-    PSERVER_RAM = 3\r
+    OCLOUD = 1,\r
+    RESOURCE_POOL = 2,\r
+    DMS = 3,\r
+    PSERVER = 11\r
+    PSERVER_CPU = 12\r
+    PSERVER_RAM = 13\r
index 09d1a16..36ce349 100644 (file)
 #  limitations under the License.\r
 \r
 # from dataclasses import dataclass\r
-import datetime\r
+import datetime\r
 import json\r
 \r
+from o2ims.domain.resource_type import ResourceTypeEnum\r
+import logging\r
+logger = logging.getLogger(__name__)\r
+\r
 \r
 class MismatchedModel(Exception):\r
     pass\r
 \r
 \r
 class StxGenericModel:\r
-    def __init__(self, api_response: dict = None) -> None:\r
+    def __init__(self, type: ResourceTypeEnum,\r
+                 api_response: dict = None, content_hash=None) -> None:\r
         if api_response:\r
             self.id = api_response.uuid\r
-            self.content = json.dumps(api_response.to_dict())\r
-            self.updatetime = api_response.updated_at\r
-            self.createtime = api_response.created_at\r
+            self.type = type\r
+            self.updatetime = datetime.datetime.strptime(\r
+                api_response.updated_at.split('.')[0], "%Y-%m-%dT%H:%M:%S") \\r
+                if api_response.updated_at else None\r
+            self.createtime = datetime.datetime.strptime(\r
+                api_response.created_at.split('.')[0], "%Y-%m-%dT%H:%M:%S") \\r
+                if api_response.created_at else None\r
             self.name = api_response.name\r
+            self.hash = content_hash if content_hash \\r
+                else str(hash((self.id, self.updatetime)))\r
+            self.content = json.dumps(api_response.to_dict())\r
 \r
     def is_outdated(self, newmodel) -> bool:\r
-        return self.updatetime < newmodel.updatetime\r
+        # return self.updatetime < newmodel.updatetime\r
+        # logger.warning("hash1: " + self.hash + " vs hash2: " + newmodel.hash)\r
+        return self.hash != newmodel.hash\r
 \r
     def update_by(self, newmodel) -> None:\r
         if self.id != newmodel.id:\r
             raise MismatchedModel("Mismatched model")\r
         self.name = newmodel.name\r
-\r
-        self.content = newmodel.content\r
         self.createtime = newmodel.createtime\r
         self.updatetime = newmodel.updatetime\r
-\r
-\r
-class StxK8sClusterModel(StxGenericModel):\r
-    def __init__(self, api_response: dict = None) -> None:\r
-        super().__init__(api_response=api_response)\r
-\r
-    def is_outdated(self, newmodel) -> bool:\r
-        # never outdated since lack of such evidence\r
-        return False\r
+        self.content = newmodel.content\r
diff --git a/o2ims/domain/stx_repo.py b/o2ims/domain/stx_repo.py
new file mode 100644 (file)
index 0000000..cb74480
--- /dev/null
@@ -0,0 +1,58 @@
+# Copyright (C) 2021 Wind River Systems, Inc.\r
+#\r
+#  Licensed under the Apache License, Version 2.0 (the "License");\r
+#  you may not use this file except in compliance with the License.\r
+#  You may obtain a copy of the License at\r
+#\r
+#      http://www.apache.org/licenses/LICENSE-2.0\r
+#\r
+#  Unless required by applicable law or agreed to in writing, software\r
+#  distributed under the License is distributed on an "AS IS" BASIS,\r
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+#  See the License for the specific language governing permissions and\r
+#  limitations under the License.\r
+\r
+import abc\r
+from typing import List, Set\r
+from o2ims.domain.stx_object import StxGenericModel\r
+from o2ims.domain.resource_type import ResourceTypeEnum\r
+\r
+\r
+class StxObjectRepository(abc.ABC):\r
+    def __init__(self):\r
+        self.seen = set()  # type: Set[StxGenericModel]\r
+\r
+    def add(self, stx_obj: StxGenericModel):\r
+        self._add(stx_obj)\r
+        self.seen.add(stx_obj)\r
+\r
+    def get(self, stx_obj_id) -> StxGenericModel:\r
+        stx_obj = self._get(stx_obj_id)\r
+        if stx_obj:\r
+            self.seen.add(stx_obj)\r
+        return stx_obj\r
+\r
+    def list(self, type: ResourceTypeEnum) -> List[StxGenericModel]:\r
+        return self._list(type)\r
+\r
+    def update(self, stx_obj: StxGenericModel):\r
+        self._update(stx_obj)\r
+\r
+    # def update_fields(self, stx_obj_id: str, updatefields: dict):\r
+    #     self._update(stx_obj_id, updatefields)\r
+\r
+    @abc.abstractmethod\r
+    def _add(self, stx_obj: StxGenericModel):\r
+        raise NotImplementedError\r
+\r
+    @abc.abstractmethod\r
+    def _get(self, stx_obj_id) -> StxGenericModel:\r
+        raise NotImplementedError\r
+\r
+    @abc.abstractmethod\r
+    def _update(self, stx_obj: StxGenericModel):\r
+        raise NotImplementedError\r
+\r
+    @abc.abstractmethod\r
+    def _list(self, type: ResourceTypeEnum):\r
+        raise NotImplementedError\r
index bb791bc..8965a30 100644 (file)
@@ -19,12 +19,10 @@ from flask import Flask, jsonify
 # from o2ims.service.handlers import InvalidResourceType\r
 from o2ims import bootstrap, config\r
 from o2ims.views import ocloud_view\r
-from o2ims.service.watcher.executor import start_watchers\r
 \r
 app = Flask(__name__)\r
 bus = bootstrap.bootstrap()\r
 apibase = config.get_o2ims_api_base()\r
-start_watchers()\r
 \r
 \r
 @app.route(apibase, methods=["GET"])\r
diff --git a/o2ims/entrypoints/o2ims-watcher-entry.sh b/o2ims/entrypoints/o2ims-watcher-entry.sh
new file mode 100644 (file)
index 0000000..f942908
--- /dev/null
@@ -0,0 +1,8 @@
+#!/bin/sh
+
+# pip install -e /src
+# python /o2ims/entrypoints/resource_watcher.py
+
+cp -r /o2ims/* /src/o2ims
+pip install -e /src
+python /o2ims/entrypoints/resource_watcher.py
similarity index 50%
rename from o2ims/service/watcher/executor.py
rename to o2ims/entrypoints/resource_watcher.py
index 607bdc9..a3292df 100644 (file)
 import cotyledon\r
 \r
 from o2ims.service.watcher.worker import PollWorker\r
-from o2ims.service.watcher.base import OcloudWather\r
+from o2ims.service.watcher.base import OcloudWatcher\r
 from o2ims.service.watcher.base import DmsWatcher\r
+# from o2ims.service.client.base_client import BaseClient\r
+from o2ims.adapter.clients.ocloud_sa_client import StxSaDmsClient\r
+from o2ims.adapter.clients.ocloud_sa_client import StxSaOcloudClient\r
+\r
+from o2ims import bootstrap\r
+# from o2ims import config\r
+# import redis\r
 \r
 import logging\r
 logger = logging.getLogger(__name__)\r
 \r
+# r = redis.Redis(**config.get_redis_host_and_port())\r
+\r
 \r
 class WatcherService(cotyledon.Service):\r
-    def __init__(self, worker_id, args) -> None:\r
+    def __init__(self, worker_id, args=None) -> None:\r
         super().__init__(worker_id)\r
         self.args = args\r
+        self.bus = bootstrap.bootstrap()\r
         self.worker = PollWorker()\r
+        # self.stxrepo = self.bus.uow.stxobjects\r
+        # tbd: 1 client per resource pool\r
+        # self.client = StxSaOcloudClient()\r
 \r
     def run(self):\r
         try:\r
-            self.worker.add_watcher(OcloudWather())\r
-            self.worker.add_watcher(DmsWatcher())\r
+            self.worker.add_watcher(OcloudWatcher(StxSaOcloudClient(),\r
+                                    self.bus.uow))\r
+            self.worker.add_watcher(DmsWatcher(StxSaDmsClient(),\r
+                                    self.bus.uow))\r
             self.worker.start()\r
         except Exception as ex:\r
-            logger.warning(ex.message)\r
+            logger.warning("WorkerService Exception:" + str(ex))\r
         finally:\r
             self.worker.stop()\r
 \r
 \r
-def start_watchers(sm=None):\r
+def start_watchers(sm: cotyledon.ServiceManager = None):\r
     watchersm = sm if sm else cotyledon.ServiceManager()\r
     watchersm.add(WatcherService, workers=1, args=())\r
-    return watchersm\r
+    watchersm.run()\r
+\r
+\r
+def main():\r
+    logger.info("Resource watcher starting")\r
+    start_watchers()\r
+\r
+\r
+if __name__ == "__main__":\r
+    main()\r
index 40e0f76..a06e9e0 100644 (file)
 # pylint: disable=attribute-defined-outside-init
 from __future__ import annotations
 import abc
-from sqlalchemy import create_engine
-from sqlalchemy.orm import sessionmaker
-from sqlalchemy.orm.session import Session
 
-from o2ims import config
-from o2ims.adapter import ocloud_repository
+from o2ims.domain.ocloud_repo import OcloudRepository
+from o2ims.domain.stx_repo import StxObjectRepository
 
 
 class AbstractUnitOfWork(abc.ABC):
-    oclouds: ocloud_repository.OcloudRepository
+    oclouds: OcloudRepository
+    stxobjects: StxObjectRepository
 
     def __enter__(self):
         return self
@@ -47,32 +45,3 @@ class AbstractUnitOfWork(abc.ABC):
     @abc.abstractmethod
     def rollback(self):
         raise NotImplementedError
-
-
-DEFAULT_SESSION_FACTORY = sessionmaker(
-    bind=create_engine(
-        config.get_postgres_uri(),
-        isolation_level="REPEATABLE READ",
-    )
-)
-
-
-class SqlAlchemyUnitOfWork(AbstractUnitOfWork):
-    def __init__(self, session_factory=DEFAULT_SESSION_FACTORY):
-        self.session_factory = session_factory
-
-    def __enter__(self):
-        self.session = self.session_factory()  # type: Session
-        self.oclouds = ocloud_repository\
-            .OcloudSqlAlchemyRepository(self.session)
-        return super().__enter__()
-
-    def __exit__(self, *args):
-        super().__exit__(*args)
-        self.session.close()
-
-    def _commit(self):
-        self.session.commit()
-
-    def rollback(self):
-        self.session.rollback()
index 26c67da..c3ff3d4 100644 (file)
 #  See the License for the specific language governing permissions and\r
 #  limitations under the License.\r
 \r
+from o2ims.domain.resource_type import ResourceTypeEnum\r
 from o2ims.service.client.base_client import BaseClient\r
 from o2ims.domain.stx_object import StxGenericModel\r
-from o2ims.adapter.ocloud_repository import OcloudRepository\r
+from o2ims.service.unit_of_work import AbstractUnitOfWork\r
+\r
+import logging\r
+logger = logging.getLogger(__name__)\r
 \r
 \r
 class InvalidOcloudState(Exception):\r
@@ -39,11 +43,11 @@ class BaseWatcher(object):
         raise NotImplementedError\r
 \r
 \r
-class OcloudWather(BaseWatcher):\r
+class OcloudWatcher(BaseWatcher):\r
     def __init__(self, ocloud_client: BaseClient,\r
-                 repo: OcloudRepository) -> None:\r
+                 uow: AbstractUnitOfWork) -> None:\r
         super().__init__(ocloud_client)\r
-        self._repo = repo\r
+        self._uow = uow\r
 \r
     def _targetname(self):\r
         return "ocloud"\r
@@ -54,38 +58,105 @@ class OcloudWather(BaseWatcher):
             self._compare_and_update(ocloudmodel)\r
 \r
     def _compare_and_update(self, ocloudmodel: StxGenericModel) -> bool:\r
-        # localmodel = self._repo.get(ocloudmodel.id)\r
-        oclouds = self._repo.list()\r
-        if len(oclouds) > 1:\r
-            raise InvalidOcloudState("More than 1 ocloud is found")\r
-        if len(oclouds) == 0:\r
-            self._repo.add(ocloudmodel)\r
-        else:\r
-            localmodel = oclouds.pop()\r
-            if localmodel.is_outdated(ocloudmodel):\r
-                localmodel.update_by(ocloudmodel)\r
-                self._repo.update(localmodel)\r
+        with self._uow:\r
+            # localmodel = self._uow.stxobjects.get(str(ocloudmodel.id))\r
+            oclouds = self._uow.stxobjects.list(ResourceTypeEnum.OCLOUD)\r
+            if len(oclouds) > 1:\r
+                raise InvalidOcloudState("More than 1 ocloud is found")\r
+            if len(oclouds) == 0:\r
+                logger.info("add ocloud:" + ocloudmodel.name\r
+                            + " update_at: " + str(ocloudmodel.updatetime)\r
+                            + " id: " + str(ocloudmodel.id)\r
+                            + " hash: " + str(ocloudmodel.hash))\r
+                self._uow.stxobjects.add(ocloudmodel)\r
+            else:\r
+                localmodel = oclouds.pop()\r
+                if localmodel.is_outdated(ocloudmodel):\r
+                    logger.info("update ocloud:" + ocloudmodel.name\r
+                                + " update_at: " + str(ocloudmodel.updatetime)\r
+                                + " id: " + str(ocloudmodel.id)\r
+                                + " hash: " + str(ocloudmodel.hash))\r
+                    localmodel.update_by(ocloudmodel)\r
+                    self._uow.stxobjects.update(localmodel)\r
+            self._uow.commit()\r
 \r
 \r
 class DmsWatcher(BaseWatcher):\r
-    def __init__(self, client: BaseClient) -> None:\r
+    def __init__(self, client: BaseClient,\r
+                 uow: AbstractUnitOfWork) -> None:\r
         super().__init__(client)\r
+        self._uow = uow\r
 \r
     def _targetname(self):\r
         return "dms"\r
 \r
+    def _probe(self):\r
+        ocloudmodel = self._client.get(None)\r
+        if ocloudmodel:\r
+            self._compare_and_update(ocloudmodel)\r
+\r
+    def _compare_and_update(self, newmodel: StxGenericModel) -> bool:\r
+        with self._uow:\r
+            # localmodel = self._uow.stxobjects.get(ocloudmodel.id)\r
+            localmodel = self._uow.stxobjects.get(str(newmodel.id))\r
+            if not localmodel:\r
+                logger.info("add dms:" + newmodel.name)\r
+                self._uow.stxobjects.add(newmodel)\r
+            elif localmodel.is_outdated(newmodel):\r
+                logger.info("update dms:" + newmodel.name)\r
+                localmodel.update_by(newmodel)\r
+                self._uow.stxobjects.update(newmodel)\r
+            self._uow.commit()\r
+\r
 \r
 class ResourcePoolWatcher(BaseWatcher):\r
-    def __init__(self) -> None:\r
+    def __init__(self, client: BaseClient,\r
+                 uow: AbstractUnitOfWork) -> None:\r
         super().__init__()\r
+        self._uow = uow\r
 \r
     def _targetname(self):\r
-        return "ocloud"\r
+        return "resourcepool"\r
+\r
+    def _probe(self):\r
+        ocloudmodel = self._client.get(None)\r
+        if ocloudmodel:\r
+            logger.info("detect ocloudmodel:" + ocloudmodel.name)\r
+            self._compare_and_update(ocloudmodel)\r
+\r
+    def _compare_and_update(self, newmodel: StxGenericModel) -> bool:\r
+        with self._uow:\r
+            # localmodel = self._uow.stxobjects.get(ocloudmodel.id)\r
+            localmodel = self._uow.stxobjects.get(str(newmodel.id))\r
+            if not localmodel:\r
+                self._uow.stxobjects.add(newmodel)\r
+            elif localmodel.is_outdated(newmodel):\r
+                localmodel.update_by(newmodel)\r
+                self._uow.stxobjects.update(newmodel)\r
+            self._uow.commit()\r
 \r
 \r
 class ResourceWatcher(BaseWatcher):\r
-    def __init__(self) -> None:\r
+    def __init__(self, client: BaseClient,\r
+                 uow: AbstractUnitOfWork) -> None:\r
         super().__init__()\r
+        self._uow = uow\r
 \r
     def _targetname(self):\r
         return "resource"\r
+\r
+    def _probe(self):\r
+        ocloudmodel = self._client.get(None)\r
+        if ocloudmodel:\r
+            self._compare_and_update(ocloudmodel)\r
+\r
+    def _compare_and_update(self, newmodel: StxGenericModel) -> bool:\r
+        with self._uow:\r
+            # localmodel = self._repo.get(ocloudmodel.id)\r
+            localmodel = self._uow.stxobjects.get(str(newmodel.id))\r
+            if not localmodel:\r
+                self._uow.stxobjects.add(newmodel)\r
+            elif localmodel.is_outdated(newmodel):\r
+                localmodel.update_by(newmodel)\r
+                self._uow.stxobjects.update(newmodel)\r
+            self._uow.commit()\r
index a348074..11bdfdc 100644 (file)
@@ -46,7 +46,7 @@ class PollWorker(object):
                 logger.debug("about to probe:"+w)\r
                 self.watchers[w].probe()\r
             except Exception as ex:\r
-                logger.warning(ex.message)\r
+                logger.warning("Worker:" + w + " raises exception:" + str(ex))\r
                 continue\r
         self.schedinstance.enter(self.schedinterval, 1, self._repeat)\r
 \r
@@ -59,6 +59,3 @@ class PollWorker(object):
 \r
     def stop(self):\r
         self._stopped = True\r
-\r
-\r
-defaultworker = PollWorker()\r
index 7005b5a..a42946d 100644 (file)
@@ -15,7 +15,7 @@
 from o2ims.service import unit_of_work\r
 \r
 \r
-def ocloud_one(ocloudid: str, uow: unit_of_work.SqlAlchemyUnitOfWork):\r
+def ocloud_one(ocloudid: str, uow: unit_of_work.AbstractUnitOfWork):\r
     with uow:\r
         results = uow.session.execute(\r
             """\r
@@ -26,7 +26,7 @@ def ocloud_one(ocloudid: str, uow: unit_of_work.SqlAlchemyUnitOfWork):
     return dict(results[0]) if len(results) > 0 else None\r
 \r
 \r
-def oclouds(uow: unit_of_work.SqlAlchemyUnitOfWork):\r
+def oclouds(uow: unit_of_work.AbstractUnitOfWork):\r
     with uow:\r
         results = uow.session.execute(\r
             """\r
diff --git a/requirements-stx.txt b/requirements-stx.txt
new file mode 100644 (file)
index 0000000..6364e51
--- /dev/null
@@ -0,0 +1,2 @@
+-e git+https://opendev.org/starlingx/distcloud-client.git@master#egg=distributedcloud-client&subdirectory=distributedcloud-client\r
+-e git+https://opendev.org/starlingx/config.git@master#egg=cgtsclient&subdirectory=sysinv/cgts-client/cgts-client#\r
index ca71bc2..3a14faa 100644 (file)
@@ -6,5 +6,9 @@ cotyledon
 \r
 Cython>=3.0a1\r
 \r
+\r
+httplib2\r
+babel\r
+PrettyTable<0.8,>=0.7.2\r
 # -e git+https://opendev.org/starlingx/distcloud-client.git@master#egg=distributedcloud-client&subdirectory=distributedcloud-client\r
 # -e git+https://opendev.org/starlingx/config.git@master#egg=cgtsclient&subdirectory=sysinv/cgts-client/cgts-client#\r
diff --git a/tests/integration-ocloud/test_watcher_w_stx_sa.py b/tests/integration-ocloud/test_watcher_w_stx_sa.py
new file mode 100644 (file)
index 0000000..d722ee7
--- /dev/null
@@ -0,0 +1,30 @@
+# Copyright (C) 2021 Wind River Systems, Inc.\r
+#\r
+#  Licensed under the Apache License, Version 2.0 (the "License");\r
+#  you may not use this file except in compliance with the License.\r
+#  You may obtain a copy of the License at\r
+#\r
+#      http://www.apache.org/licenses/LICENSE-2.0\r
+#\r
+#  Unless required by applicable law or agreed to in writing, software\r
+#  distributed under the License is distributed on an "AS IS" BASIS,\r
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+#  See the License for the specific language governing permissions and\r
+#  limitations under the License.\r
+\r
+from multiprocessing.queues import Queue\r
+import pytest\r
+from o2ims.entrypoints.resource_watcher import start_watchers\r
+from multiprocessing import Process\r
+from multiprocessing import Pipe\r
+# pipe = Pipe()\r
+# q = Queue()\r
+import time\r
+# pytestmark = pytest.mark.usefixtures("mappers")\r
+\r
+\r
+def test_watcher_service():\r
+    testedprocess = Process(target=start_watchers, args=())\r
+    testedprocess.start()\r
+    time.sleep(10)\r
+    testedprocess.terminate()\r
index 8f262b8..25bc3ad 100644 (file)
@@ -23,6 +23,7 @@ import uuid
 import json\r
 from o2ims.adapter.clients.ocloud_sa_client import StxSaOcloudClient\r
 from o2ims.domain import stx_object as ocloudModel\r
+from o2ims.domain.resource_type import ResourceTypeEnum\r
 \r
 # pytestmark = pytest.mark.usefixtures("mappers")\r
 \r
@@ -32,7 +33,7 @@ class FakeStxSaClientImp(object):
         super().__init__()\r
 \r
     def getInstanceInfo(self) -> ocloudModel.StxGenericModel:\r
-        model = ocloudModel.StxGenericModel()\r
+        model = ocloudModel.StxGenericModel(ResourceTypeEnum.OCLOUD)\r
         model.id = uuid.uuid4()\r
         model.name = "stx1"\r
         model.updatetime = datetime.now\r
index baff737..e70d188 100644 (file)
@@ -12,7 +12,6 @@
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 
-import pytest
 from o2ims.domain import ocloud
 from o2ims import config
 import uuid
index 4cdfb76..76f5d78 100644 (file)
@@ -16,26 +16,29 @@ import time
 from datetime import datetime\r
 import json\r
 from typing import List\r
+from o2ims.domain.resource_type import ResourceTypeEnum\r
 from o2ims.service.client.base_client import BaseClient\r
-import pytest\r
 from o2ims.domain import ocloud\r
 from o2ims import config\r
 import uuid\r
-from o2ims.service.watcher.base import BaseWatcher, OcloudWather\r
+from o2ims.service.watcher.base import BaseWatcher, OcloudWatcher\r
 from o2ims.domain import stx_object as ocloudModel\r
 from o2ims.adapter.ocloud_repository import OcloudRepository\r
+from o2ims.domain.stx_repo import StxObjectRepository\r
 from o2ims.service.watcher import worker\r
-from o2ims.service.watcher.executor import start_watchers\r
+from o2ims.service.unit_of_work import AbstractUnitOfWork\r
+\r
 \r
 class FakeOcloudClient(BaseClient):\r
     def __init__(self):\r
         super().__init__()\r
-        fakeCloud = ocloudModel.StxGenericModel()\r
+        fakeCloud = ocloudModel.StxGenericModel(ResourceTypeEnum.OCLOUD)\r
         fakeCloud.id = uuid.uuid4()\r
         fakeCloud.name = 'stx1'\r
         fakeCloud.content = json.dumps({})\r
         fakeCloud.createtime = datetime.now()\r
-        fakeCloud.updatetime = datetime.now\r
+        fakeCloud.updatetime = datetime.now()\r
+        fakeCloud.hash = str(hash((fakeCloud.id, fakeCloud.updatetime)))\r
         self.fakeCloud = fakeCloud\r
 \r
     def _get(self, id) -> ocloudModel.StxGenericModel:\r
@@ -44,6 +47,7 @@ class FakeOcloudClient(BaseClient):
     def _list(self):\r
         return [self.fakeCloud]\r
 \r
+\r
 class FakeOcloudRepo(OcloudRepository):\r
     def __init__(self):\r
         super().__init__()\r
@@ -65,15 +69,65 @@ class FakeOcloudRepo(OcloudRepository):
         ocloud1 = filtered.pop()\r
         ocloud1.update_by(ocloud)\r
 \r
+\r
+\r
+class FakeStxObjRepo(StxObjectRepository):\r
+    def __init__(self):\r
+        super().__init__()\r
+        self.oclouds = []\r
+\r
+    def _add(self, ocloud: ocloud.Ocloud):\r
+        self.oclouds.append(ocloud)\r
+\r
+    def _get(self, ocloudid) -> ocloud.Ocloud:\r
+        filtered = [o for o in self.oclouds if o.id == ocloudid]\r
+        return filtered.pop()\r
+\r
+    def _list(self, type: ResourceTypeEnum) -> List[ocloud.Ocloud]:\r
+        return [x for x in self.oclouds]\r
+\r
+    def _update(self, ocloud: ocloud.Ocloud):\r
+        filtered = [o for o in self.oclouds if o.id == ocloud.id]\r
+        assert len(filtered) == 1\r
+        ocloud1 = filtered.pop()\r
+        ocloud1.update_by(ocloud)\r
+\r
+\r
+class FakeUnitOfWork(AbstractUnitOfWork):\r
+    def __init__(self):\r
+        pass\r
+\r
+    def __enter__(self):\r
+        # self.session = self.session_factory()  # type: Session\r
+        # self.oclouds = OcloudSqlAlchemyRepository(self.session)\r
+        self.stxobjects = FakeStxObjRepo()\r
+        return super().__enter__()\r
+\r
+    def __exit__(self, *args):\r
+        super().__exit__(*args)\r
+        # self.session.close()\r
+\r
+    def _commit(self):\r
+        pass\r
+        # self.session.commit()\r
+\r
+    def rollback(self):\r
+        pass\r
+        # self.session.rollback()\r
+\r
+\r
 def test_probe_new_ocloud():\r
-    fakeRepo = FakeOcloudRepo()\r
+    # fakeRepo = FakeOcloudRepo()\r
+    fakeuow = FakeUnitOfWork()\r
     fakeClient = FakeOcloudClient()\r
-    ocloudwatcher = OcloudWather(fakeClient, fakeRepo)\r
+    ocloudwatcher = OcloudWatcher(fakeClient, fakeuow)\r
     ocloudwatcher.probe()\r
-    assert len(fakeRepo.oclouds) == 1\r
-    assert fakeRepo.oclouds[0].name == "stx1"\r
+    assert len(fakeuow.stxobjects.oclouds) == 1\r
+    assert fakeuow.stxobjects.oclouds[0].name == "stx1"\r
+\r
 \r
-def test_default_worker():\r
+def test_watchers_worker():\r
+    testedworker = worker.PollWorker()\r
 \r
     class FakeOCloudWatcher(BaseWatcher):\r
         def __init__(self, client: BaseClient,\r
@@ -85,24 +139,26 @@ def test_default_worker():
 \r
         def _targetname(self):\r
             return "fakeocloudwatcher"\r
-        \r
+\r
         def _probe(self):\r
             self.fakeOcloudWatcherCounter += 1\r
             # hacking to stop the blocking sched task\r
             if self.fakeOcloudWatcherCounter > 2:\r
-                worker.defaultworker.stop()\r
+                testedworker.stop()\r
+\r
 \r
+    # fakeRepo = FakeOcloudRepo()\r
+    fakeuow = FakeUnitOfWork()\r
 \r
-    fakeRepo = FakeOcloudRepo()\r
     fakeClient = FakeOcloudClient()\r
-    fakewatcher = FakeOCloudWatcher(fakeClient, fakeRepo)\r
+    fakewatcher = FakeOCloudWatcher(fakeClient, fakeuow)\r
 \r
-    worker.defaultworker.set_interval(1)\r
-    worker.defaultworker.add_watcher(fakewatcher)\r
+    testedworker.set_interval(1)\r
+    testedworker.add_watcher(fakewatcher)\r
     assert fakewatcher.fakeOcloudWatcherCounter == 0\r
 \r
     count1 = fakewatcher.fakeOcloudWatcherCounter\r
-    worker.defaultworker.start()\r
+    testedworker.start()\r
     time.sleep(20)\r
     assert fakewatcher.fakeOcloudWatcherCounter > count1\r
 \r