+ ocloudwatcher = OcloudWatcher(fakeClient, bus)\r
+ cmds = ocloudwatcher.probe()\r
+ assert cmds is not None\r
+ assert len(cmds) == 1\r
+ assert cmds[0].data.name == "stx1"\r
+ # assert len(fakeuow.stxobjects.oclouds) == 1\r
+ # assert fakeuow.stxobjects.oclouds[0].name == "stx1"\r
+\r
+\r
+def test_watchers_worker():\r
+ testedworker = worker.PollWorker()\r
+\r
+ class FakeOCloudWatcher(BaseWatcher):\r
+ def __init__(self, client: BaseClient,\r
+ bus: messagebus) -> None:\r
+ super().__init__(client, None)\r
+ self.fakeOcloudWatcherCounter = 0\r
+ self._client = client\r
+ self._bus = bus\r
+\r
+ def _targetname(self):\r
+ return "fakeocloudwatcher"\r
+\r
+ def _probe(self, parent: object=None):\r
+ # import pdb; pdb.set_trace()\r
+ self.fakeOcloudWatcherCounter += 1\r
+ # hacking to stop the blocking sched task\r
+ if self.fakeOcloudWatcherCounter > 2:\r
+ testedworker.stop()\r
+ return []\r
+\r
+\r
+ # fakeRepo = FakeOcloudRepo()\r
+ fakeuow = FakeUnitOfWork()\r
+ bus = create_fake_bus(fakeuow)\r
+\r
+ fakeClient = FakeOcloudClient()\r
+ fakewatcher = FakeOCloudWatcher(fakeClient, bus)\r
+\r
+ root = WatcherTree(fakewatcher)\r
+\r
+ testedworker.set_interval(1)\r
+ testedworker.add_watcher(root)\r
+ assert fakewatcher.fakeOcloudWatcherCounter == 0\r
+\r
+ count1 = fakewatcher.fakeOcloudWatcherCounter\r
+ testedworker.start()\r
+ time.sleep(20)\r
+ assert fakewatcher.fakeOcloudWatcherCounter > count1\r
+\r
+ # assumed hacking: probe has stopped the sched task\r
+ count3 = fakewatcher.fakeOcloudWatcherCounter\r
+ time.sleep(3)\r
+ assert fakewatcher.fakeOcloudWatcherCounter == count3\r