# See the License for the specific language governing permissions and\r
# limitations under the License.\r
\r
+import time\r
from datetime import datetime\r
import json\r
from typing import List\r
-from o2ims.service.client.base_client import BaseClient\r
-import pytest\r
-from o2ims.domain import ocloud\r
-from o2ims import config\r
+# from o2common.config import config\r
import uuid\r
-from o2ims.service.watcher.base import OcloudWather\r
+from o2common.service.watcher.base import BaseWatcher, WatcherTree\r
+from o2common.service.watcher import worker\r
+from o2common.service.unit_of_work import AbstractUnitOfWork\r
+from o2common.service import messagebus\r
+\r
+from o2ims.domain.resource_type import ResourceTypeEnum\r
from o2ims.domain import stx_object as ocloudModel\r
from o2ims.adapter.ocloud_repository import OcloudRepository\r
+from o2ims.domain.stx_repo import StxObjectRepository\r
+from o2ims.service.watcher.ocloud_watcher import OcloudWatcher\r
+from o2ims.domain import commands\r
+from o2common.service.client.base_client import BaseClient\r
+from o2ims.domain import ocloud\r
+\r
+from o2app.service import handlers\r
+from o2app import bootstrap\r
+\r
\r
class FakeOcloudClient(BaseClient):\r
def __init__(self):\r
super().__init__()\r
- fakeCloud = ocloudModel.StxGenericModel()\r
+ fakeCloud = ocloudModel.StxGenericModel(ResourceTypeEnum.OCLOUD)\r
fakeCloud.id = uuid.uuid4()\r
fakeCloud.name = 'stx1'\r
fakeCloud.content = json.dumps({})\r
fakeCloud.createtime = datetime.now()\r
- fakeCloud.updatetime = datetime.now\r
+ fakeCloud.updatetime = datetime.now()\r
+ fakeCloud.hash = str(hash((fakeCloud.id, fakeCloud.updatetime)))\r
self.fakeCloud = fakeCloud\r
\r
def _get(self, id) -> ocloudModel.StxGenericModel:\r
def _list(self):\r
return [self.fakeCloud]\r
\r
+\r
class FakeOcloudRepo(OcloudRepository):\r
def __init__(self):\r
super().__init__()\r
ocloud1 = filtered.pop()\r
ocloud1.update_by(ocloud)\r
\r
+\r
+class FakeStxObjRepo(StxObjectRepository):\r
+ def __init__(self):\r
+ super().__init__()\r
+ self.oclouds = []\r
+\r
+ def _add(self, ocloud: ocloud.Ocloud):\r
+ self.oclouds.append(ocloud)\r
+\r
+ def _get(self, ocloudid) -> ocloud.Ocloud:\r
+ filtered = [o for o in self.oclouds if o.id == ocloudid]\r
+ return filtered.pop()\r
+\r
+ def _list(self, type: ResourceTypeEnum) -> List[ocloud.Ocloud]:\r
+ return [x for x in self.oclouds]\r
+\r
+ def _update(self, ocloud: ocloud.Ocloud):\r
+ filtered = [o for o in self.oclouds if o.id == ocloud.id]\r
+ assert len(filtered) == 1\r
+ ocloud1 = filtered.pop()\r
+ ocloud1.update_by(ocloud)\r
+\r
+\r
+class FakeUnitOfWork(AbstractUnitOfWork):\r
+ def __init__(self):\r
+ pass\r
+\r
+ def __enter__(self):\r
+ # self.session = self.session_factory() # type: Session\r
+ # self.oclouds = OcloudSqlAlchemyRepository(self.session)\r
+ self.stxobjects = FakeStxObjRepo()\r
+ return super().__enter__()\r
+\r
+ def __exit__(self, *args):\r
+ super().__exit__(*args)\r
+ # self.session.close()\r
+\r
+ def _commit(self):\r
+ pass\r
+ # self.session.commit()\r
+\r
+ def rollback(self):\r
+ pass\r
+ # self.session.rollback()\r
+\r
+ def collect_new_events(self):\r
+ yield\r
+ # return super().collect_new_events()\r
+\r
+\r
+def create_fake_bus(uow):\r
+ def update_ocloud(\r
+ cmd: commands.UpdateOCloud,\r
+ uow: AbstractUnitOfWork):\r
+ return\r
+\r
+ fakeuow = FakeUnitOfWork()\r
+ handlers.EVENT_HANDLERS = {}\r
+ handlers.COMMAND_HANDLERS = {\r
+ commands.UpdateOCloud: update_ocloud,\r
+ }\r
+ bus = bootstrap.bootstrap(False, fakeuow)\r
+ return bus\r
+\r
+\r
def test_probe_new_ocloud():\r
- fakeRepo = FakeOcloudRepo()\r
+ fakeuow = FakeUnitOfWork()\r
+ bus = create_fake_bus(fakeuow)\r
fakeClient = FakeOcloudClient()\r
- ocloudwatcher = OcloudWather(fakeClient, fakeRepo)\r
- ocloudwatcher.probe()\r
- assert len(fakeRepo.oclouds) == 1\r
- assert fakeRepo.oclouds[0].name == "stx1"\r
+ ocloudwatcher = OcloudWatcher(fakeClient, bus)\r
+ cmds = ocloudwatcher.probe()\r
+ assert cmds is not None\r
+ assert len(cmds) == 1\r
+ assert cmds[0].data.name == "stx1"\r
+ # assert len(fakeuow.stxobjects.oclouds) == 1\r
+ # assert fakeuow.stxobjects.oclouds[0].name == "stx1"\r
+\r
+\r
+def test_watchers_worker():\r
+ testedworker = worker.PollWorker()\r
+\r
+ class FakeOCloudWatcher(BaseWatcher):\r
+ def __init__(self, client: BaseClient,\r
+ bus: messagebus) -> None:\r
+ super().__init__(client, None)\r
+ self.fakeOcloudWatcherCounter = 0\r
+ self._client = client\r
+ self._bus = bus\r
+\r
+ def _targetname(self):\r
+ return "fakeocloudwatcher"\r
+\r
+ def _probe(self, parent: object = None):\r
+ # import pdb; pdb.set_trace()\r
+ self.fakeOcloudWatcherCounter += 1\r
+ # hacking to stop the blocking sched task\r
+ if self.fakeOcloudWatcherCounter > 2:\r
+ testedworker.stop()\r
+ return []\r
+\r
+ # fakeRepo = FakeOcloudRepo()\r
+ fakeuow = FakeUnitOfWork()\r
+ bus = create_fake_bus(fakeuow)\r
+\r
+ fakeClient = FakeOcloudClient()\r
+ fakewatcher = FakeOCloudWatcher(fakeClient, bus)\r
+\r
+ root = WatcherTree(fakewatcher)\r
+\r
+ testedworker.set_interval(1)\r
+ testedworker.add_watcher(root)\r
+ assert fakewatcher.fakeOcloudWatcherCounter == 0\r
+\r
+ count1 = fakewatcher.fakeOcloudWatcherCounter\r
+ testedworker.start()\r
+ time.sleep(20)\r
+ assert fakewatcher.fakeOcloudWatcherCounter > count1\r
+\r
+ # assumed hacking: probe has stopped the sched task\r
+ count3 = fakewatcher.fakeOcloudWatcherCounter\r
+ time.sleep(3)\r
+ assert fakewatcher.fakeOcloudWatcherCounter == count3\r