import time\r
from datetime import datetime\r
import json\r
-from typing import List\r
+from typing import Callable, List\r
# from o2common.config import config\r
import uuid\r
from o2common.service.watcher.base import BaseWatcher, WatcherTree\r
ocloud1.update_by(ocloud)\r
\r
\r
-\r
class FakeStxObjRepo(StxObjectRepository):\r
def __init__(self):\r
super().__init__()\r
\r
def create_fake_bus(uow):\r
def update_ocloud(\r
- cmd: commands.UpdateOCloud,\r
- uow: AbstractUnitOfWork):\r
+ cmd: commands.UpdateOCloud,\r
+ uow: AbstractUnitOfWork,\r
+ publish: Callable):\r
return\r
\r
fakeuow = FakeUnitOfWork()\r
def _targetname(self):\r
return "fakeocloudwatcher"\r
\r
- def _probe(self, parent: object=None):\r
+ def _probe(self, parent: object = None):\r
# import pdb; pdb.set_trace()\r
self.fakeOcloudWatcherCounter += 1\r
# hacking to stop the blocking sched task\r
testedworker.stop()\r
return []\r
\r
-\r
# fakeRepo = FakeOcloudRepo()\r
fakeuow = FakeUnitOfWork()\r
bus = create_fake_bus(fakeuow)\r