Add subscription and notification for resource changes; fix a bug while pserver node...
[pti/o2.git] / tests / unit / test_watcher.py
index ec28519..806d20f 100644 (file)
 #  See the License for the specific language governing permissions and\r
 #  limitations under the License.\r
 \r
+import time\r
 from datetime import datetime\r
 import json\r
-from typing import List\r
-from o2ims.service.client.base_client import BaseClient\r
-import pytest\r
-from o2ims.domain import ocloud\r
-from o2ims import config\r
+from typing import Callable, List\r
+# from o2common.config import config\r
 import uuid\r
-from o2ims.service.watcher.base import OcloudWather\r
+from o2common.service.watcher.base import BaseWatcher, WatcherTree\r
+from o2common.service.watcher import worker\r
+from o2common.service.unit_of_work import AbstractUnitOfWork\r
+from o2common.service import messagebus\r
+\r
+from o2ims.domain.resource_type import ResourceTypeEnum\r
 from o2ims.domain import stx_object as ocloudModel\r
 from o2ims.adapter.ocloud_repository import OcloudRepository\r
+from o2ims.domain.stx_repo import StxObjectRepository\r
+from o2ims.service.watcher.ocloud_watcher import OcloudWatcher\r
+from o2ims.domain import commands\r
+from o2common.service.client.base_client import BaseClient\r
+from o2ims.domain import ocloud\r
+\r
+from o2app.service import handlers\r
+from o2app import bootstrap\r
+\r
 \r
 class FakeOcloudClient(BaseClient):\r
     def __init__(self):\r
         super().__init__()\r
-        fakeCloud = ocloudModel.StxGenericModel()\r
+        fakeCloud = ocloudModel.StxGenericModel(ResourceTypeEnum.OCLOUD)\r
         fakeCloud.id = uuid.uuid4()\r
         fakeCloud.name = 'stx1'\r
         fakeCloud.content = json.dumps({})\r
         fakeCloud.createtime = datetime.now()\r
-        fakeCloud.updatetime = datetime.now\r
+        fakeCloud.updatetime = datetime.now()\r
+        fakeCloud.hash = str(hash((fakeCloud.id, fakeCloud.updatetime)))\r
         self.fakeCloud = fakeCloud\r
 \r
     def _get(self, id) -> ocloudModel.StxGenericModel:\r
@@ -41,6 +54,7 @@ class FakeOcloudClient(BaseClient):
     def _list(self):\r
         return [self.fakeCloud]\r
 \r
+\r
 class FakeOcloudRepo(OcloudRepository):\r
     def __init__(self):\r
         super().__init__()\r
@@ -62,10 +76,126 @@ class FakeOcloudRepo(OcloudRepository):
         ocloud1 = filtered.pop()\r
         ocloud1.update_by(ocloud)\r
 \r
+\r
+class FakeStxObjRepo(StxObjectRepository):\r
+    def __init__(self):\r
+        super().__init__()\r
+        self.oclouds = []\r
+\r
+    def _add(self, ocloud: ocloud.Ocloud):\r
+        self.oclouds.append(ocloud)\r
+\r
+    def _get(self, ocloudid) -> ocloud.Ocloud:\r
+        filtered = [o for o in self.oclouds if o.id == ocloudid]\r
+        return filtered.pop()\r
+\r
+    def _list(self, type: ResourceTypeEnum) -> List[ocloud.Ocloud]:\r
+        return [x for x in self.oclouds]\r
+\r
+    def _update(self, ocloud: ocloud.Ocloud):\r
+        filtered = [o for o in self.oclouds if o.id == ocloud.id]\r
+        assert len(filtered) == 1\r
+        ocloud1 = filtered.pop()\r
+        ocloud1.update_by(ocloud)\r
+\r
+\r
+class FakeUnitOfWork(AbstractUnitOfWork):\r
+    def __init__(self):\r
+        pass\r
+\r
+    def __enter__(self):\r
+        # self.session = self.session_factory()  # type: Session\r
+        # self.oclouds = OcloudSqlAlchemyRepository(self.session)\r
+        self.stxobjects = FakeStxObjRepo()\r
+        return super().__enter__()\r
+\r
+    def __exit__(self, *args):\r
+        super().__exit__(*args)\r
+        # self.session.close()\r
+\r
+    def _commit(self):\r
+        pass\r
+        # self.session.commit()\r
+\r
+    def rollback(self):\r
+        pass\r
+        # self.session.rollback()\r
+\r
+    def collect_new_events(self):\r
+        yield\r
+        # return super().collect_new_events()\r
+\r
+\r
+def create_fake_bus(uow):\r
+    def update_ocloud(\r
+            cmd: commands.UpdateOCloud,\r
+            uow: AbstractUnitOfWork,\r
+            publish: Callable):\r
+        return\r
+\r
+    fakeuow = FakeUnitOfWork()\r
+    handlers.EVENT_HANDLERS = {}\r
+    handlers.COMMAND_HANDLERS = {\r
+        commands.UpdateOCloud: update_ocloud,\r
+    }\r
+    bus = bootstrap.bootstrap(False, fakeuow)\r
+    return bus\r
+\r
+\r
 def test_probe_new_ocloud():\r
-    fakeRepo = FakeOcloudRepo()\r
+    fakeuow = FakeUnitOfWork()\r
+    bus = create_fake_bus(fakeuow)\r
     fakeClient = FakeOcloudClient()\r
-    ocloudwatcher = OcloudWather(fakeClient, fakeRepo)\r
-    ocloudwatcher.probe()\r
-    assert len(fakeRepo.oclouds) == 1\r
-    assert fakeRepo.oclouds[0].name == "stx1"\r
+    ocloudwatcher = OcloudWatcher(fakeClient, bus)\r
+    cmds = ocloudwatcher.probe()\r
+    assert cmds is not None\r
+    assert len(cmds) == 1\r
+    assert cmds[0].data.name == "stx1"\r
+    # assert len(fakeuow.stxobjects.oclouds) == 1\r
+    # assert fakeuow.stxobjects.oclouds[0].name == "stx1"\r
+\r
+\r
+def test_watchers_worker():\r
+    testedworker = worker.PollWorker()\r
+\r
+    class FakeOCloudWatcher(BaseWatcher):\r
+        def __init__(self, client: BaseClient,\r
+                     bus: messagebus) -> None:\r
+            super().__init__(client, None)\r
+            self.fakeOcloudWatcherCounter = 0\r
+            self._client = client\r
+            self._bus = bus\r
+\r
+        def _targetname(self):\r
+            return "fakeocloudwatcher"\r
+\r
+        def _probe(self, parent: object = None):\r
+            # import pdb; pdb.set_trace()\r
+            self.fakeOcloudWatcherCounter += 1\r
+            # hacking to stop the blocking sched task\r
+            if self.fakeOcloudWatcherCounter > 2:\r
+                testedworker.stop()\r
+            return []\r
+\r
+    # fakeRepo = FakeOcloudRepo()\r
+    fakeuow = FakeUnitOfWork()\r
+    bus = create_fake_bus(fakeuow)\r
+\r
+    fakeClient = FakeOcloudClient()\r
+    fakewatcher = FakeOCloudWatcher(fakeClient, bus)\r
+\r
+    root = WatcherTree(fakewatcher)\r
+\r
+    testedworker.set_interval(1)\r
+    testedworker.add_watcher(root)\r
+    assert fakewatcher.fakeOcloudWatcherCounter == 0\r
+\r
+    count1 = fakewatcher.fakeOcloudWatcherCounter\r
+    testedworker.start()\r
+    time.sleep(20)\r
+    assert fakewatcher.fakeOcloudWatcherCounter > count1\r
+\r
+    # assumed hacking: probe has stopped the sched task\r
+    count3 = fakewatcher.fakeOcloudWatcherCounter\r
+    time.sleep(3)\r
+    assert fakewatcher.fakeOcloudWatcherCounter == count3\r