X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=tests%2Funit%2Ftest_watcher.py;h=806d20fe10605cac9fea6ffe13a30f31628832a2;hb=refs%2Fchanges%2F24%2F7524%2F1;hp=b0aded4f67e3f01d5165e33dedd4144f8eb49a4f;hpb=e60b74de2ecaccc5f2fef8e75a44649d0e90d362;p=pti%2Fo2.git diff --git a/tests/unit/test_watcher.py b/tests/unit/test_watcher.py index b0aded4..806d20f 100644 --- a/tests/unit/test_watcher.py +++ b/tests/unit/test_watcher.py @@ -15,19 +15,25 @@ import time from datetime import datetime import json -from typing import List -from o2ims.domain.resource_type import ResourceTypeEnum -from o2ims.service.client.base_client import BaseClient -from o2ims.domain import ocloud -from o2ims import config +from typing import Callable, List +# from o2common.config import config import uuid -from o2ims.service.watcher.base import BaseWatcher, WatcherTree +from o2common.service.watcher.base import BaseWatcher, WatcherTree +from o2common.service.watcher import worker +from o2common.service.unit_of_work import AbstractUnitOfWork +from o2common.service import messagebus + +from o2ims.domain.resource_type import ResourceTypeEnum from o2ims.domain import stx_object as ocloudModel from o2ims.adapter.ocloud_repository import OcloudRepository from o2ims.domain.stx_repo import StxObjectRepository -from o2ims.service.watcher import worker -from o2ims.service.unit_of_work import AbstractUnitOfWork from o2ims.service.watcher.ocloud_watcher import OcloudWatcher +from o2ims.domain import commands +from o2common.service.client.base_client import BaseClient +from o2ims.domain import ocloud + +from o2app.service import handlers +from o2app import bootstrap class FakeOcloudClient(BaseClient): @@ -71,7 +77,6 @@ class FakeOcloudRepo(OcloudRepository): ocloud1.update_by(ocloud) - class FakeStxObjRepo(StxObjectRepository): def __init__(self): super().__init__() @@ -116,15 +121,38 @@ class FakeUnitOfWork(AbstractUnitOfWork): pass # self.session.rollback() + def collect_new_events(self): + yield + # return super().collect_new_events() + + +def create_fake_bus(uow): + def update_ocloud( + cmd: commands.UpdateOCloud, + uow: AbstractUnitOfWork, + publish: Callable): + return + + fakeuow = FakeUnitOfWork() + handlers.EVENT_HANDLERS = {} + handlers.COMMAND_HANDLERS = { + commands.UpdateOCloud: update_ocloud, + } + bus = bootstrap.bootstrap(False, fakeuow) + return bus + def test_probe_new_ocloud(): - # fakeRepo = FakeOcloudRepo() fakeuow = FakeUnitOfWork() + bus = create_fake_bus(fakeuow) fakeClient = FakeOcloudClient() - ocloudwatcher = OcloudWatcher(fakeClient, fakeuow) - ocloudwatcher.probe() - assert len(fakeuow.stxobjects.oclouds) == 1 - assert fakeuow.stxobjects.oclouds[0].name == "stx1" + ocloudwatcher = OcloudWatcher(fakeClient, bus) + cmds = ocloudwatcher.probe() + assert cmds is not None + assert len(cmds) == 1 + assert cmds[0].data.name == "stx1" + # assert len(fakeuow.stxobjects.oclouds) == 1 + # assert fakeuow.stxobjects.oclouds[0].name == "stx1" def test_watchers_worker(): @@ -132,16 +160,16 @@ def test_watchers_worker(): class FakeOCloudWatcher(BaseWatcher): def __init__(self, client: BaseClient, - repo: OcloudRepository) -> None: + bus: messagebus) -> None: super().__init__(client, None) self.fakeOcloudWatcherCounter = 0 self._client = client - self._repo = repo + self._bus = bus def _targetname(self): return "fakeocloudwatcher" - def _probe(self, parent: object=None): + def _probe(self, parent: object = None): # import pdb; pdb.set_trace() self.fakeOcloudWatcherCounter += 1 # hacking to stop the blocking sched task @@ -149,12 +177,12 @@ def test_watchers_worker(): testedworker.stop() return [] - # fakeRepo = FakeOcloudRepo() fakeuow = FakeUnitOfWork() + bus = create_fake_bus(fakeuow) fakeClient = FakeOcloudClient() - fakewatcher = FakeOCloudWatcher(fakeClient, fakeuow) + fakewatcher = FakeOCloudWatcher(fakeClient, bus) root = WatcherTree(fakewatcher)