import time\r
from datetime import datetime\r
import json\r
-from typing import List\r
-from o2ims.service.client.base_client import BaseClient\r
-import pytest\r
-from o2ims.domain import ocloud\r
-from o2ims import config\r
+from typing import Callable, List\r
+# from o2common.config import config\r
import uuid\r
-from o2ims.service.watcher.base import BaseWatcher, OcloudWather\r
+from o2common.service.watcher.base import BaseWatcher, WatcherTree\r
+from o2common.service.watcher import worker\r
+from o2common.service.unit_of_work import AbstractUnitOfWork\r
+from o2common.service import messagebus\r
+\r
+from o2ims.domain.resource_type import ResourceTypeEnum\r
from o2ims.domain import stx_object as ocloudModel\r
from o2ims.adapter.ocloud_repository import OcloudRepository\r
-from o2ims.service.watcher import worker\r
-from o2ims.service.watcher.executor import start_watchers\r
+from o2ims.domain.stx_repo import StxObjectRepository\r
+from o2ims.service.watcher.ocloud_watcher import OcloudWatcher\r
+from o2ims.domain import commands\r
+from o2common.service.client.base_client import BaseClient\r
+from o2ims.domain import ocloud\r
+\r
+from o2app.service import handlers\r
+from o2app import bootstrap\r
+\r
\r
class FakeOcloudClient(BaseClient):\r
def __init__(self):\r
super().__init__()\r
- fakeCloud = ocloudModel.StxGenericModel()\r
+ fakeCloud = ocloudModel.StxGenericModel(ResourceTypeEnum.OCLOUD)\r
fakeCloud.id = uuid.uuid4()\r
fakeCloud.name = 'stx1'\r
fakeCloud.content = json.dumps({})\r
fakeCloud.createtime = datetime.now()\r
- fakeCloud.updatetime = datetime.now\r
+ fakeCloud.updatetime = datetime.now()\r
+ fakeCloud.hash = str(hash((fakeCloud.id, fakeCloud.updatetime)))\r
self.fakeCloud = fakeCloud\r
\r
def _get(self, id) -> ocloudModel.StxGenericModel:\r
def _list(self):\r
return [self.fakeCloud]\r
\r
+\r
class FakeOcloudRepo(OcloudRepository):\r
def __init__(self):\r
super().__init__()\r
ocloud1 = filtered.pop()\r
ocloud1.update_by(ocloud)\r
\r
+\r
+class FakeStxObjRepo(StxObjectRepository):\r
+ def __init__(self):\r
+ super().__init__()\r
+ self.oclouds = []\r
+\r
+ def _add(self, ocloud: ocloud.Ocloud):\r
+ self.oclouds.append(ocloud)\r
+\r
+ def _get(self, ocloudid) -> ocloud.Ocloud:\r
+ filtered = [o for o in self.oclouds if o.id == ocloudid]\r
+ return filtered.pop()\r
+\r
+ def _list(self, type: ResourceTypeEnum) -> List[ocloud.Ocloud]:\r
+ return [x for x in self.oclouds]\r
+\r
+ def _update(self, ocloud: ocloud.Ocloud):\r
+ filtered = [o for o in self.oclouds if o.id == ocloud.id]\r
+ assert len(filtered) == 1\r
+ ocloud1 = filtered.pop()\r
+ ocloud1.update_by(ocloud)\r
+\r
+\r
+class FakeUnitOfWork(AbstractUnitOfWork):\r
+ def __init__(self):\r
+ pass\r
+\r
+ def __enter__(self):\r
+ # self.session = self.session_factory() # type: Session\r
+ # self.oclouds = OcloudSqlAlchemyRepository(self.session)\r
+ self.stxobjects = FakeStxObjRepo()\r
+ return super().__enter__()\r
+\r
+ def __exit__(self, *args):\r
+ super().__exit__(*args)\r
+ # self.session.close()\r
+\r
+ def _commit(self):\r
+ pass\r
+ # self.session.commit()\r
+\r
+ def rollback(self):\r
+ pass\r
+ # self.session.rollback()\r
+\r
+ def collect_new_events(self):\r
+ yield\r
+ # return super().collect_new_events()\r
+\r
+\r
+def create_fake_bus(uow):\r
+ def update_ocloud(\r
+ cmd: commands.UpdateOCloud,\r
+ uow: AbstractUnitOfWork,\r
+ publish: Callable):\r
+ return\r
+\r
+ fakeuow = FakeUnitOfWork()\r
+ handlers.EVENT_HANDLERS = {}\r
+ handlers.COMMAND_HANDLERS = {\r
+ commands.UpdateOCloud: update_ocloud,\r
+ }\r
+ bus = bootstrap.bootstrap(False, fakeuow)\r
+ return bus\r
+\r
+\r
def test_probe_new_ocloud():\r
- fakeRepo = FakeOcloudRepo()\r
+ fakeuow = FakeUnitOfWork()\r
+ bus = create_fake_bus(fakeuow)\r
fakeClient = FakeOcloudClient()\r
- ocloudwatcher = OcloudWather(fakeClient, fakeRepo)\r
- ocloudwatcher.probe()\r
- assert len(fakeRepo.oclouds) == 1\r
- assert fakeRepo.oclouds[0].name == "stx1"\r
+ ocloudwatcher = OcloudWatcher(fakeClient, bus)\r
+ cmds = ocloudwatcher.probe()\r
+ assert cmds is not None\r
+ assert len(cmds) == 1\r
+ assert cmds[0].data.name == "stx1"\r
+ # assert len(fakeuow.stxobjects.oclouds) == 1\r
+ # assert fakeuow.stxobjects.oclouds[0].name == "stx1"\r
+\r
\r
-def test_default_worker():\r
+def test_watchers_worker():\r
+ testedworker = worker.PollWorker()\r
\r
class FakeOCloudWatcher(BaseWatcher):\r
def __init__(self, client: BaseClient,\r
- repo: OcloudRepository) -> None:\r
- super().__init__(client)\r
+ bus: messagebus) -> None:\r
+ super().__init__(client, None)\r
self.fakeOcloudWatcherCounter = 0\r
self._client = client\r
- self._repo = repo\r
+ self._bus = bus\r
\r
def _targetname(self):\r
return "fakeocloudwatcher"\r
- \r
- def _probe(self):\r
+\r
+ def _probe(self, parent: object = None):\r
+ # import pdb; pdb.set_trace()\r
self.fakeOcloudWatcherCounter += 1\r
# hacking to stop the blocking sched task\r
if self.fakeOcloudWatcherCounter > 2:\r
- worker.defaultworker.stop()\r
+ testedworker.stop()\r
+ return []\r
\r
+ # fakeRepo = FakeOcloudRepo()\r
+ fakeuow = FakeUnitOfWork()\r
+ bus = create_fake_bus(fakeuow)\r
\r
- fakeRepo = FakeOcloudRepo()\r
fakeClient = FakeOcloudClient()\r
- fakewatcher = FakeOCloudWatcher(fakeClient, fakeRepo)\r
+ fakewatcher = FakeOCloudWatcher(fakeClient, bus)\r
+\r
+ root = WatcherTree(fakewatcher)\r
\r
- worker.defaultworker.set_interval(1)\r
- worker.defaultworker.add_watcher(fakewatcher)\r
+ testedworker.set_interval(1)\r
+ testedworker.add_watcher(root)\r
assert fakewatcher.fakeOcloudWatcherCounter == 0\r
\r
count1 = fakewatcher.fakeOcloudWatcherCounter\r
- worker.defaultworker.start()\r
+ testedworker.start()\r
time.sleep(20)\r
assert fakewatcher.fakeOcloudWatcherCounter > count1\r
\r