Add subscription and notification for resource changes; fix a bug while pserver node...
[pti/o2.git] / tests / unit / test_watcher.py
index 8c1afac..806d20f 100644 (file)
@@ -15,7 +15,7 @@
 import time\r
 from datetime import datetime\r
 import json\r
-from typing import List\r
+from typing import Callable, List\r
 # from o2common.config import config\r
 import uuid\r
 from o2common.service.watcher.base import BaseWatcher, WatcherTree\r
@@ -77,7 +77,6 @@ class FakeOcloudRepo(OcloudRepository):
         ocloud1.update_by(ocloud)\r
 \r
 \r
-\r
 class FakeStxObjRepo(StxObjectRepository):\r
     def __init__(self):\r
         super().__init__()\r
@@ -129,8 +128,9 @@ class FakeUnitOfWork(AbstractUnitOfWork):
 \r
 def create_fake_bus(uow):\r
     def update_ocloud(\r
-        cmd: commands.UpdateOCloud,\r
-        uow: AbstractUnitOfWork):\r
+            cmd: commands.UpdateOCloud,\r
+            uow: AbstractUnitOfWork,\r
+            publish: Callable):\r
         return\r
 \r
     fakeuow = FakeUnitOfWork()\r
@@ -169,7 +169,7 @@ def test_watchers_worker():
         def _targetname(self):\r
             return "fakeocloudwatcher"\r
 \r
-        def _probe(self, parent: object=None):\r
+        def _probe(self, parent: object = None):\r
             # import pdb; pdb.set_trace()\r
             self.fakeOcloudWatcherCounter += 1\r
             # hacking to stop the blocking sched task\r
@@ -177,7 +177,6 @@ def test_watchers_worker():
                 testedworker.stop()\r
             return []\r
 \r
-\r
     # fakeRepo = FakeOcloudRepo()\r
     fakeuow = FakeUnitOfWork()\r
     bus = create_fake_bus(fakeuow)\r