import time\r
from datetime import datetime\r
import json\r
-from typing import List\r
-from o2ims.service import handlers\r
-from o2ims.domain.resource_type import ResourceTypeEnum\r
-from o2ims.service.client.base_client import BaseClient\r
-from o2ims.domain import ocloud\r
-from o2ims import config\r
+from typing import Callable, List\r
+# from o2common.config import config\r
import uuid\r
-from o2ims.service.watcher.base import BaseWatcher, WatcherTree\r
+from o2common.service.watcher.base import BaseWatcher, WatcherTree\r
+from o2common.service.watcher import worker\r
+from o2common.service.unit_of_work import AbstractUnitOfWork\r
+from o2common.service import messagebus\r
+\r
+from o2ims.domain.resource_type import ResourceTypeEnum\r
from o2ims.domain import stx_object as ocloudModel\r
from o2ims.adapter.ocloud_repository import OcloudRepository\r
from o2ims.domain.stx_repo import StxObjectRepository\r
-from o2ims.service.watcher import worker\r
-from o2ims.service.unit_of_work import AbstractUnitOfWork\r
from o2ims.service.watcher.ocloud_watcher import OcloudWatcher\r
-from o2ims.service import messagebus\r
-from o2ims import bootstrap\r
from o2ims.domain import commands\r
+from o2common.service.client.base_client import BaseClient\r
+from o2ims.domain import ocloud\r
+\r
+from o2app.service import handlers\r
+from o2app import bootstrap\r
+\r
\r
class FakeOcloudClient(BaseClient):\r
def __init__(self):\r
ocloud1.update_by(ocloud)\r
\r
\r
-\r
class FakeStxObjRepo(StxObjectRepository):\r
def __init__(self):\r
super().__init__()\r
\r
def create_fake_bus(uow):\r
def update_ocloud(\r
- cmd: commands.UpdateOCloud,\r
- uow: AbstractUnitOfWork):\r
+ cmd: commands.UpdateOCloud,\r
+ uow: AbstractUnitOfWork,\r
+ publish: Callable):\r
return\r
\r
fakeuow = FakeUnitOfWork()\r
def _targetname(self):\r
return "fakeocloudwatcher"\r
\r
- def _probe(self, parent: object=None):\r
+ def _probe(self, parent: object = None):\r
# import pdb; pdb.set_trace()\r
self.fakeOcloudWatcherCounter += 1\r
# hacking to stop the blocking sched task\r
testedworker.stop()\r
return []\r
\r
-\r
# fakeRepo = FakeOcloudRepo()\r
fakeuow = FakeUnitOfWork()\r
bus = create_fake_bus(fakeuow)\r