Convert file endlines to Unix (LF)
[pti/o2.git] / tests / unit / test_watcher.py
index 806d20f..2fe82ab 100644 (file)
-# Copyright (C) 2021 Wind River Systems, Inc.\r
-#\r
-#  Licensed under the Apache License, Version 2.0 (the "License");\r
-#  you may not use this file except in compliance with the License.\r
-#  You may obtain a copy of the License at\r
-#\r
-#      http://www.apache.org/licenses/LICENSE-2.0\r
-#\r
-#  Unless required by applicable law or agreed to in writing, software\r
-#  distributed under the License is distributed on an "AS IS" BASIS,\r
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
-#  See the License for the specific language governing permissions and\r
-#  limitations under the License.\r
-\r
-import time\r
-from datetime import datetime\r
-import json\r
-from typing import Callable, List\r
-# from o2common.config import config\r
-import uuid\r
-from o2common.service.watcher.base import BaseWatcher, WatcherTree\r
-from o2common.service.watcher import worker\r
-from o2common.service.unit_of_work import AbstractUnitOfWork\r
-from o2common.service import messagebus\r
-\r
-from o2ims.domain.resource_type import ResourceTypeEnum\r
-from o2ims.domain import stx_object as ocloudModel\r
-from o2ims.adapter.ocloud_repository import OcloudRepository\r
-from o2ims.domain.stx_repo import StxObjectRepository\r
-from o2ims.service.watcher.ocloud_watcher import OcloudWatcher\r
-from o2ims.domain import commands\r
-from o2common.service.client.base_client import BaseClient\r
-from o2ims.domain import ocloud\r
-\r
-from o2app.service import handlers\r
-from o2app import bootstrap\r
-\r
-\r
-class FakeOcloudClient(BaseClient):\r
-    def __init__(self):\r
-        super().__init__()\r
-        fakeCloud = ocloudModel.StxGenericModel(ResourceTypeEnum.OCLOUD)\r
-        fakeCloud.id = uuid.uuid4()\r
-        fakeCloud.name = 'stx1'\r
-        fakeCloud.content = json.dumps({})\r
-        fakeCloud.createtime = datetime.now()\r
-        fakeCloud.updatetime = datetime.now()\r
-        fakeCloud.hash = str(hash((fakeCloud.id, fakeCloud.updatetime)))\r
-        self.fakeCloud = fakeCloud\r
-\r
-    def _get(self, id) -> ocloudModel.StxGenericModel:\r
-        return self.fakeCloud\r
-\r
-    def _list(self):\r
-        return [self.fakeCloud]\r
-\r
-\r
-class FakeOcloudRepo(OcloudRepository):\r
-    def __init__(self):\r
-        super().__init__()\r
-        self.oclouds = []\r
-\r
-    def _add(self, ocloud: ocloud.Ocloud):\r
-        self.oclouds.append(ocloud)\r
-\r
-    def _get(self, ocloudid) -> ocloud.Ocloud:\r
-        filtered = [o for o in self.oclouds if o.id == ocloudid]\r
-        return filtered.pop()\r
-\r
-    def _list(self) -> List[ocloud.Ocloud]:\r
-        return [x for x in self.oclouds]\r
-\r
-    def _update(self, ocloud: ocloud.Ocloud):\r
-        filtered = [o for o in self.oclouds if o.id == ocloud.id]\r
-        assert len(filtered) == 1\r
-        ocloud1 = filtered.pop()\r
-        ocloud1.update_by(ocloud)\r
-\r
-\r
-class FakeStxObjRepo(StxObjectRepository):\r
-    def __init__(self):\r
-        super().__init__()\r
-        self.oclouds = []\r
-\r
-    def _add(self, ocloud: ocloud.Ocloud):\r
-        self.oclouds.append(ocloud)\r
-\r
-    def _get(self, ocloudid) -> ocloud.Ocloud:\r
-        filtered = [o for o in self.oclouds if o.id == ocloudid]\r
-        return filtered.pop()\r
-\r
-    def _list(self, type: ResourceTypeEnum) -> List[ocloud.Ocloud]:\r
-        return [x for x in self.oclouds]\r
-\r
-    def _update(self, ocloud: ocloud.Ocloud):\r
-        filtered = [o for o in self.oclouds if o.id == ocloud.id]\r
-        assert len(filtered) == 1\r
-        ocloud1 = filtered.pop()\r
-        ocloud1.update_by(ocloud)\r
-\r
-\r
-class FakeUnitOfWork(AbstractUnitOfWork):\r
-    def __init__(self):\r
-        pass\r
-\r
-    def __enter__(self):\r
-        # self.session = self.session_factory()  # type: Session\r
-        # self.oclouds = OcloudSqlAlchemyRepository(self.session)\r
-        self.stxobjects = FakeStxObjRepo()\r
-        return super().__enter__()\r
-\r
-    def __exit__(self, *args):\r
-        super().__exit__(*args)\r
-        # self.session.close()\r
-\r
-    def _commit(self):\r
-        pass\r
-        # self.session.commit()\r
-\r
-    def rollback(self):\r
-        pass\r
-        # self.session.rollback()\r
-\r
-    def collect_new_events(self):\r
-        yield\r
-        # return super().collect_new_events()\r
-\r
-\r
-def create_fake_bus(uow):\r
-    def update_ocloud(\r
-            cmd: commands.UpdateOCloud,\r
-            uow: AbstractUnitOfWork,\r
-            publish: Callable):\r
-        return\r
-\r
-    fakeuow = FakeUnitOfWork()\r
-    handlers.EVENT_HANDLERS = {}\r
-    handlers.COMMAND_HANDLERS = {\r
-        commands.UpdateOCloud: update_ocloud,\r
-    }\r
-    bus = bootstrap.bootstrap(False, fakeuow)\r
-    return bus\r
-\r
-\r
-def test_probe_new_ocloud():\r
-    fakeuow = FakeUnitOfWork()\r
-    bus = create_fake_bus(fakeuow)\r
-    fakeClient = FakeOcloudClient()\r
-    ocloudwatcher = OcloudWatcher(fakeClient, bus)\r
-    cmds = ocloudwatcher.probe()\r
-    assert cmds is not None\r
-    assert len(cmds) == 1\r
-    assert cmds[0].data.name == "stx1"\r
-    # assert len(fakeuow.stxobjects.oclouds) == 1\r
-    # assert fakeuow.stxobjects.oclouds[0].name == "stx1"\r
-\r
-\r
-def test_watchers_worker():\r
-    testedworker = worker.PollWorker()\r
-\r
-    class FakeOCloudWatcher(BaseWatcher):\r
-        def __init__(self, client: BaseClient,\r
-                     bus: messagebus) -> None:\r
-            super().__init__(client, None)\r
-            self.fakeOcloudWatcherCounter = 0\r
-            self._client = client\r
-            self._bus = bus\r
-\r
-        def _targetname(self):\r
-            return "fakeocloudwatcher"\r
-\r
-        def _probe(self, parent: object = None):\r
-            # import pdb; pdb.set_trace()\r
-            self.fakeOcloudWatcherCounter += 1\r
-            # hacking to stop the blocking sched task\r
-            if self.fakeOcloudWatcherCounter > 2:\r
-                testedworker.stop()\r
-            return []\r
-\r
-    # fakeRepo = FakeOcloudRepo()\r
-    fakeuow = FakeUnitOfWork()\r
-    bus = create_fake_bus(fakeuow)\r
-\r
-    fakeClient = FakeOcloudClient()\r
-    fakewatcher = FakeOCloudWatcher(fakeClient, bus)\r
-\r
-    root = WatcherTree(fakewatcher)\r
-\r
-    testedworker.set_interval(1)\r
-    testedworker.add_watcher(root)\r
-    assert fakewatcher.fakeOcloudWatcherCounter == 0\r
-\r
-    count1 = fakewatcher.fakeOcloudWatcherCounter\r
-    testedworker.start()\r
-    time.sleep(20)\r
-    assert fakewatcher.fakeOcloudWatcherCounter > count1\r
-\r
-    # assumed hacking: probe has stopped the sched task\r
-    count3 = fakewatcher.fakeOcloudWatcherCounter\r
-    time.sleep(3)\r
-    assert fakewatcher.fakeOcloudWatcherCounter == count3\r
+# Copyright (C) 2021 Wind River Systems, Inc.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+import time
+from datetime import datetime
+import json
+from typing import Callable, List
+# from o2common.config import config
+import uuid
+from o2common.service.watcher.base import BaseWatcher, WatcherTree
+from o2common.service.watcher import worker
+from o2common.service.unit_of_work import AbstractUnitOfWork
+from o2common.service import messagebus
+
+from o2ims.domain.resource_type import ResourceTypeEnum
+from o2ims.domain import stx_object as ocloudModel
+from o2ims.adapter.ocloud_repository import OcloudRepository
+from o2ims.domain.stx_repo import StxObjectRepository
+from o2ims.service.watcher.ocloud_watcher import OcloudWatcher
+from o2ims.domain import commands
+from o2common.service.client.base_client import BaseClient
+from o2ims.domain import ocloud
+
+from o2app.service import handlers
+from o2app import bootstrap
+
+
+class FakeOcloudClient(BaseClient):
+    def __init__(self):
+        super().__init__()
+        fakeCloud = ocloudModel.StxGenericModel(ResourceTypeEnum.OCLOUD)
+        fakeCloud.id = uuid.uuid4()
+        fakeCloud.name = 'stx1'
+        fakeCloud.content = json.dumps({})
+        fakeCloud.createtime = datetime.now()
+        fakeCloud.updatetime = datetime.now()
+        fakeCloud.hash = str(hash((fakeCloud.id, fakeCloud.updatetime)))
+        self.fakeCloud = fakeCloud
+
+    def _get(self, id) -> ocloudModel.StxGenericModel:
+        return self.fakeCloud
+
+    def _list(self):
+        return [self.fakeCloud]
+
+
+class FakeOcloudRepo(OcloudRepository):
+    def __init__(self):
+        super().__init__()
+        self.oclouds = []
+
+    def _add(self, ocloud: ocloud.Ocloud):
+        self.oclouds.append(ocloud)
+
+    def _get(self, ocloudid) -> ocloud.Ocloud:
+        filtered = [o for o in self.oclouds if o.id == ocloudid]
+        return filtered.pop()
+
+    def _list(self) -> List[ocloud.Ocloud]:
+        return [x for x in self.oclouds]
+
+    def _update(self, ocloud: ocloud.Ocloud):
+        filtered = [o for o in self.oclouds if o.id == ocloud.id]
+        assert len(filtered) == 1
+        ocloud1 = filtered.pop()
+        ocloud1.update_by(ocloud)
+
+
+class FakeStxObjRepo(StxObjectRepository):
+    def __init__(self):
+        super().__init__()
+        self.oclouds = []
+
+    def _add(self, ocloud: ocloud.Ocloud):
+        self.oclouds.append(ocloud)
+
+    def _get(self, ocloudid) -> ocloud.Ocloud:
+        filtered = [o for o in self.oclouds if o.id == ocloudid]
+        return filtered.pop()
+
+    def _list(self, type: ResourceTypeEnum) -> List[ocloud.Ocloud]:
+        return [x for x in self.oclouds]
+
+    def _update(self, ocloud: ocloud.Ocloud):
+        filtered = [o for o in self.oclouds if o.id == ocloud.id]
+        assert len(filtered) == 1
+        ocloud1 = filtered.pop()
+        ocloud1.update_by(ocloud)
+
+
+class FakeUnitOfWork(AbstractUnitOfWork):
+    def __init__(self):
+        pass
+
+    def __enter__(self):
+        # self.session = self.session_factory()  # type: Session
+        # self.oclouds = OcloudSqlAlchemyRepository(self.session)
+        self.stxobjects = FakeStxObjRepo()
+        return super().__enter__()
+
+    def __exit__(self, *args):
+        super().__exit__(*args)
+        # self.session.close()
+
+    def _commit(self):
+        pass
+        # self.session.commit()
+
+    def rollback(self):
+        pass
+        # self.session.rollback()
+
+    def collect_new_events(self):
+        yield
+        # return super().collect_new_events()
+
+
+def create_fake_bus(uow):
+    def update_ocloud(
+            cmd: commands.UpdateOCloud,
+            uow: AbstractUnitOfWork,
+            publish: Callable):
+        return
+
+    fakeuow = FakeUnitOfWork()
+    handlers.EVENT_HANDLERS = {}
+    handlers.COMMAND_HANDLERS = {
+        commands.UpdateOCloud: update_ocloud,
+    }
+    bus = bootstrap.bootstrap(False, fakeuow)
+    return bus
+
+
+def test_probe_new_ocloud():
+    fakeuow = FakeUnitOfWork()
+    bus = create_fake_bus(fakeuow)
+    fakeClient = FakeOcloudClient()
+    ocloudwatcher = OcloudWatcher(fakeClient, bus)
+    cmds = ocloudwatcher.probe()
+    assert cmds is not None
+    assert len(cmds) == 1
+    assert cmds[0].data.name == "stx1"
+    # assert len(fakeuow.stxobjects.oclouds) == 1
+    # assert fakeuow.stxobjects.oclouds[0].name == "stx1"
+
+
+def test_watchers_worker():
+    testedworker = worker.PollWorker()
+
+    class FakeOCloudWatcher(BaseWatcher):
+        def __init__(self, client: BaseClient,
+                     bus: messagebus) -> None:
+            super().__init__(client, None)
+            self.fakeOcloudWatcherCounter = 0
+            self._client = client
+            self._bus = bus
+
+        def _targetname(self):
+            return "fakeocloudwatcher"
+
+        def _probe(self, parent: object = None):
+            # import pdb; pdb.set_trace()
+            self.fakeOcloudWatcherCounter += 1
+            # hacking to stop the blocking sched task
+            if self.fakeOcloudWatcherCounter > 2:
+                testedworker.stop()
+            return []
+
+    # fakeRepo = FakeOcloudRepo()
+    fakeuow = FakeUnitOfWork()
+    bus = create_fake_bus(fakeuow)
+
+    fakeClient = FakeOcloudClient()
+    fakewatcher = FakeOCloudWatcher(fakeClient, bus)
+
+    root = WatcherTree(fakewatcher)
+
+    testedworker.set_interval(1)
+    testedworker.add_watcher(root)
+    assert fakewatcher.fakeOcloudWatcherCounter == 0
+
+    count1 = fakewatcher.fakeOcloudWatcherCounter
+    testedworker.start()
+    time.sleep(20)
+    assert fakewatcher.fakeOcloudWatcherCounter > count1
+
+    # assumed hacking: probe has stopped the sched task
+    count3 = fakewatcher.fakeOcloudWatcherCounter
+    time.sleep(3)
+    assert fakewatcher.fakeOcloudWatcherCounter == count3