X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=tests%2Funit%2Ftest_watcher.py;h=b0aded4f67e3f01d5165e33dedd4144f8eb49a4f;hb=refs%2Fchanges%2F56%2F7056%2F1;hp=4cdfb76f9bcca894004bad818c186d5dcc6fb2ff;hpb=bff814cb30e501eb1e54aecb8110a78d41e7bdb0;p=pti%2Fo2.git diff --git a/tests/unit/test_watcher.py b/tests/unit/test_watcher.py index 4cdfb76..b0aded4 100644 --- a/tests/unit/test_watcher.py +++ b/tests/unit/test_watcher.py @@ -16,26 +16,30 @@ import time from datetime import datetime import json from typing import List +from o2ims.domain.resource_type import ResourceTypeEnum from o2ims.service.client.base_client import BaseClient -import pytest from o2ims.domain import ocloud from o2ims import config import uuid -from o2ims.service.watcher.base import BaseWatcher, OcloudWather +from o2ims.service.watcher.base import BaseWatcher, WatcherTree from o2ims.domain import stx_object as ocloudModel from o2ims.adapter.ocloud_repository import OcloudRepository +from o2ims.domain.stx_repo import StxObjectRepository from o2ims.service.watcher import worker -from o2ims.service.watcher.executor import start_watchers +from o2ims.service.unit_of_work import AbstractUnitOfWork +from o2ims.service.watcher.ocloud_watcher import OcloudWatcher + class FakeOcloudClient(BaseClient): def __init__(self): super().__init__() - fakeCloud = ocloudModel.StxGenericModel() + fakeCloud = ocloudModel.StxGenericModel(ResourceTypeEnum.OCLOUD) fakeCloud.id = uuid.uuid4() fakeCloud.name = 'stx1' fakeCloud.content = json.dumps({}) fakeCloud.createtime = datetime.now() - fakeCloud.updatetime = datetime.now + fakeCloud.updatetime = datetime.now() + fakeCloud.hash = str(hash((fakeCloud.id, fakeCloud.updatetime))) self.fakeCloud = fakeCloud def _get(self, id) -> ocloudModel.StxGenericModel: @@ -44,6 +48,7 @@ class FakeOcloudClient(BaseClient): def _list(self): return [self.fakeCloud] + class FakeOcloudRepo(OcloudRepository): def __init__(self): super().__init__() @@ -65,44 +70,100 @@ class FakeOcloudRepo(OcloudRepository): ocloud1 = filtered.pop() ocloud1.update_by(ocloud) + + +class FakeStxObjRepo(StxObjectRepository): + def __init__(self): + super().__init__() + self.oclouds = [] + + def _add(self, ocloud: ocloud.Ocloud): + self.oclouds.append(ocloud) + + def _get(self, ocloudid) -> ocloud.Ocloud: + filtered = [o for o in self.oclouds if o.id == ocloudid] + return filtered.pop() + + def _list(self, type: ResourceTypeEnum) -> List[ocloud.Ocloud]: + return [x for x in self.oclouds] + + def _update(self, ocloud: ocloud.Ocloud): + filtered = [o for o in self.oclouds if o.id == ocloud.id] + assert len(filtered) == 1 + ocloud1 = filtered.pop() + ocloud1.update_by(ocloud) + + +class FakeUnitOfWork(AbstractUnitOfWork): + def __init__(self): + pass + + def __enter__(self): + # self.session = self.session_factory() # type: Session + # self.oclouds = OcloudSqlAlchemyRepository(self.session) + self.stxobjects = FakeStxObjRepo() + return super().__enter__() + + def __exit__(self, *args): + super().__exit__(*args) + # self.session.close() + + def _commit(self): + pass + # self.session.commit() + + def rollback(self): + pass + # self.session.rollback() + + def test_probe_new_ocloud(): - fakeRepo = FakeOcloudRepo() + # fakeRepo = FakeOcloudRepo() + fakeuow = FakeUnitOfWork() fakeClient = FakeOcloudClient() - ocloudwatcher = OcloudWather(fakeClient, fakeRepo) + ocloudwatcher = OcloudWatcher(fakeClient, fakeuow) ocloudwatcher.probe() - assert len(fakeRepo.oclouds) == 1 - assert fakeRepo.oclouds[0].name == "stx1" + assert len(fakeuow.stxobjects.oclouds) == 1 + assert fakeuow.stxobjects.oclouds[0].name == "stx1" -def test_default_worker(): + +def test_watchers_worker(): + testedworker = worker.PollWorker() class FakeOCloudWatcher(BaseWatcher): def __init__(self, client: BaseClient, repo: OcloudRepository) -> None: - super().__init__(client) + super().__init__(client, None) self.fakeOcloudWatcherCounter = 0 self._client = client self._repo = repo def _targetname(self): return "fakeocloudwatcher" - - def _probe(self): + + def _probe(self, parent: object=None): + # import pdb; pdb.set_trace() self.fakeOcloudWatcherCounter += 1 # hacking to stop the blocking sched task if self.fakeOcloudWatcherCounter > 2: - worker.defaultworker.stop() + testedworker.stop() + return [] - fakeRepo = FakeOcloudRepo() + # fakeRepo = FakeOcloudRepo() + fakeuow = FakeUnitOfWork() + fakeClient = FakeOcloudClient() - fakewatcher = FakeOCloudWatcher(fakeClient, fakeRepo) + fakewatcher = FakeOCloudWatcher(fakeClient, fakeuow) + + root = WatcherTree(fakewatcher) - worker.defaultworker.set_interval(1) - worker.defaultworker.add_watcher(fakewatcher) + testedworker.set_interval(1) + testedworker.add_watcher(root) assert fakewatcher.fakeOcloudWatcherCounter == 0 count1 = fakewatcher.fakeOcloudWatcherCounter - worker.defaultworker.start() + testedworker.start() time.sleep(20) assert fakewatcher.fakeOcloudWatcherCounter > count1