from datetime import datetime\r
import json\r
from typing import List\r
+from o2ims.service import handlers\r
from o2ims.domain.resource_type import ResourceTypeEnum\r
from o2ims.service.client.base_client import BaseClient\r
from o2ims.domain import ocloud\r
from o2ims import config\r
import uuid\r
-from o2ims.service.watcher.base import BaseWatcher, OcloudWatcher\r
+from o2ims.service.watcher.base import BaseWatcher, WatcherTree\r
from o2ims.domain import stx_object as ocloudModel\r
from o2ims.adapter.ocloud_repository import OcloudRepository\r
from o2ims.domain.stx_repo import StxObjectRepository\r
from o2ims.service.watcher import worker\r
from o2ims.service.unit_of_work import AbstractUnitOfWork\r
-\r
+from o2ims.service.watcher.ocloud_watcher import OcloudWatcher\r
+from o2ims.service import messagebus\r
+from o2ims import bootstrap\r
+from o2ims.domain import commands\r
\r
class FakeOcloudClient(BaseClient):\r
def __init__(self):\r
pass\r
# self.session.rollback()\r
\r
+ def collect_new_events(self):\r
+ yield\r
+ # return super().collect_new_events()\r
+\r
+\r
+def create_fake_bus(uow):\r
+ def update_ocloud(\r
+ cmd: commands.UpdateOCloud,\r
+ uow: AbstractUnitOfWork):\r
+ return\r
+\r
+ fakeuow = FakeUnitOfWork()\r
+ handlers.EVENT_HANDLERS = {}\r
+ handlers.COMMAND_HANDLERS = {\r
+ commands.UpdateOCloud: update_ocloud,\r
+ }\r
+ bus = bootstrap.bootstrap(False, fakeuow)\r
+ return bus\r
+\r
\r
def test_probe_new_ocloud():\r
- # fakeRepo = FakeOcloudRepo()\r
fakeuow = FakeUnitOfWork()\r
+ bus = create_fake_bus(fakeuow)\r
fakeClient = FakeOcloudClient()\r
- ocloudwatcher = OcloudWatcher(fakeClient, fakeuow)\r
- ocloudwatcher.probe()\r
- assert len(fakeuow.stxobjects.oclouds) == 1\r
- assert fakeuow.stxobjects.oclouds[0].name == "stx1"\r
+ ocloudwatcher = OcloudWatcher(fakeClient, bus)\r
+ cmds = ocloudwatcher.probe()\r
+ assert cmds is not None\r
+ assert len(cmds) == 1\r
+ assert cmds[0].data.name == "stx1"\r
+ # assert len(fakeuow.stxobjects.oclouds) == 1\r
+ # assert fakeuow.stxobjects.oclouds[0].name == "stx1"\r
\r
\r
def test_watchers_worker():\r
\r
class FakeOCloudWatcher(BaseWatcher):\r
def __init__(self, client: BaseClient,\r
- repo: OcloudRepository) -> None:\r
- super().__init__(client)\r
+ bus: messagebus) -> None:\r
+ super().__init__(client, None)\r
self.fakeOcloudWatcherCounter = 0\r
self._client = client\r
- self._repo = repo\r
+ self._bus = bus\r
\r
def _targetname(self):\r
return "fakeocloudwatcher"\r
\r
- def _probe(self):\r
+ def _probe(self, parent: object=None):\r
+ # import pdb; pdb.set_trace()\r
self.fakeOcloudWatcherCounter += 1\r
# hacking to stop the blocking sched task\r
if self.fakeOcloudWatcherCounter > 2:\r
testedworker.stop()\r
+ return []\r
\r
\r
# fakeRepo = FakeOcloudRepo()\r
fakeuow = FakeUnitOfWork()\r
+ bus = create_fake_bus(fakeuow)\r
\r
fakeClient = FakeOcloudClient()\r
- fakewatcher = FakeOCloudWatcher(fakeClient, fakeuow)\r
+ fakewatcher = FakeOCloudWatcher(fakeClient, bus)\r
+\r
+ root = WatcherTree(fakewatcher)\r
\r
testedworker.set_interval(1)\r
- testedworker.add_watcher(fakewatcher)\r
+ testedworker.add_watcher(root)\r
assert fakewatcher.fakeOcloudWatcherCounter == 0\r
\r
count1 = fakewatcher.fakeOcloudWatcherCounter\r