Update watcher worker
[pti/o2.git] / tests / unit / test_watcher.py
index 4cdfb76..76f5d78 100644 (file)
@@ -16,26 +16,29 @@ import time
 from datetime import datetime\r
 import json\r
 from typing import List\r
+from o2ims.domain.resource_type import ResourceTypeEnum\r
 from o2ims.service.client.base_client import BaseClient\r
-import pytest\r
 from o2ims.domain import ocloud\r
 from o2ims import config\r
 import uuid\r
-from o2ims.service.watcher.base import BaseWatcher, OcloudWather\r
+from o2ims.service.watcher.base import BaseWatcher, OcloudWatcher\r
 from o2ims.domain import stx_object as ocloudModel\r
 from o2ims.adapter.ocloud_repository import OcloudRepository\r
+from o2ims.domain.stx_repo import StxObjectRepository\r
 from o2ims.service.watcher import worker\r
-from o2ims.service.watcher.executor import start_watchers\r
+from o2ims.service.unit_of_work import AbstractUnitOfWork\r
+\r
 \r
 class FakeOcloudClient(BaseClient):\r
     def __init__(self):\r
         super().__init__()\r
-        fakeCloud = ocloudModel.StxGenericModel()\r
+        fakeCloud = ocloudModel.StxGenericModel(ResourceTypeEnum.OCLOUD)\r
         fakeCloud.id = uuid.uuid4()\r
         fakeCloud.name = 'stx1'\r
         fakeCloud.content = json.dumps({})\r
         fakeCloud.createtime = datetime.now()\r
-        fakeCloud.updatetime = datetime.now\r
+        fakeCloud.updatetime = datetime.now()\r
+        fakeCloud.hash = str(hash((fakeCloud.id, fakeCloud.updatetime)))\r
         self.fakeCloud = fakeCloud\r
 \r
     def _get(self, id) -> ocloudModel.StxGenericModel:\r
@@ -44,6 +47,7 @@ class FakeOcloudClient(BaseClient):
     def _list(self):\r
         return [self.fakeCloud]\r
 \r
+\r
 class FakeOcloudRepo(OcloudRepository):\r
     def __init__(self):\r
         super().__init__()\r
@@ -65,15 +69,65 @@ class FakeOcloudRepo(OcloudRepository):
         ocloud1 = filtered.pop()\r
         ocloud1.update_by(ocloud)\r
 \r
+\r
+\r
+class FakeStxObjRepo(StxObjectRepository):\r
+    def __init__(self):\r
+        super().__init__()\r
+        self.oclouds = []\r
+\r
+    def _add(self, ocloud: ocloud.Ocloud):\r
+        self.oclouds.append(ocloud)\r
+\r
+    def _get(self, ocloudid) -> ocloud.Ocloud:\r
+        filtered = [o for o in self.oclouds if o.id == ocloudid]\r
+        return filtered.pop()\r
+\r
+    def _list(self, type: ResourceTypeEnum) -> List[ocloud.Ocloud]:\r
+        return [x for x in self.oclouds]\r
+\r
+    def _update(self, ocloud: ocloud.Ocloud):\r
+        filtered = [o for o in self.oclouds if o.id == ocloud.id]\r
+        assert len(filtered) == 1\r
+        ocloud1 = filtered.pop()\r
+        ocloud1.update_by(ocloud)\r
+\r
+\r
+class FakeUnitOfWork(AbstractUnitOfWork):\r
+    def __init__(self):\r
+        pass\r
+\r
+    def __enter__(self):\r
+        # self.session = self.session_factory()  # type: Session\r
+        # self.oclouds = OcloudSqlAlchemyRepository(self.session)\r
+        self.stxobjects = FakeStxObjRepo()\r
+        return super().__enter__()\r
+\r
+    def __exit__(self, *args):\r
+        super().__exit__(*args)\r
+        # self.session.close()\r
+\r
+    def _commit(self):\r
+        pass\r
+        # self.session.commit()\r
+\r
+    def rollback(self):\r
+        pass\r
+        # self.session.rollback()\r
+\r
+\r
 def test_probe_new_ocloud():\r
-    fakeRepo = FakeOcloudRepo()\r
+    # fakeRepo = FakeOcloudRepo()\r
+    fakeuow = FakeUnitOfWork()\r
     fakeClient = FakeOcloudClient()\r
-    ocloudwatcher = OcloudWather(fakeClient, fakeRepo)\r
+    ocloudwatcher = OcloudWatcher(fakeClient, fakeuow)\r
     ocloudwatcher.probe()\r
-    assert len(fakeRepo.oclouds) == 1\r
-    assert fakeRepo.oclouds[0].name == "stx1"\r
+    assert len(fakeuow.stxobjects.oclouds) == 1\r
+    assert fakeuow.stxobjects.oclouds[0].name == "stx1"\r
+\r
 \r
-def test_default_worker():\r
+def test_watchers_worker():\r
+    testedworker = worker.PollWorker()\r
 \r
     class FakeOCloudWatcher(BaseWatcher):\r
         def __init__(self, client: BaseClient,\r
@@ -85,24 +139,26 @@ def test_default_worker():
 \r
         def _targetname(self):\r
             return "fakeocloudwatcher"\r
-        \r
+\r
         def _probe(self):\r
             self.fakeOcloudWatcherCounter += 1\r
             # hacking to stop the blocking sched task\r
             if self.fakeOcloudWatcherCounter > 2:\r
-                worker.defaultworker.stop()\r
+                testedworker.stop()\r
+\r
 \r
+    # fakeRepo = FakeOcloudRepo()\r
+    fakeuow = FakeUnitOfWork()\r
 \r
-    fakeRepo = FakeOcloudRepo()\r
     fakeClient = FakeOcloudClient()\r
-    fakewatcher = FakeOCloudWatcher(fakeClient, fakeRepo)\r
+    fakewatcher = FakeOCloudWatcher(fakeClient, fakeuow)\r
 \r
-    worker.defaultworker.set_interval(1)\r
-    worker.defaultworker.add_watcher(fakewatcher)\r
+    testedworker.set_interval(1)\r
+    testedworker.add_watcher(fakewatcher)\r
     assert fakewatcher.fakeOcloudWatcherCounter == 0\r
 \r
     count1 = fakewatcher.fakeOcloudWatcherCounter\r
-    worker.defaultworker.start()\r
+    testedworker.start()\r
     time.sleep(20)\r
     assert fakewatcher.fakeOcloudWatcherCounter > count1\r
 \r