1 # Copyright (C) 2021 Wind River Systems, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
import json
import time
import uuid
from datetime import datetime
from typing import Callable, List

# from o2common.config import config
from o2common.service.watcher.base import BaseWatcher, WatcherTree
from o2common.service.watcher import worker
from o2common.service.unit_of_work import AbstractUnitOfWork
from o2common.service import messagebus
from o2ims.domain.resource_type import ResourceTypeEnum
from o2ims.domain import stx_object as ocloudModel
from o2ims.adapter.ocloud_repository import OcloudRepository
from o2ims.domain.stx_repo import StxObjectRepository
from o2ims.service.watcher.ocloud_watcher import OcloudWatcher
from o2ims.domain import commands
from o2common.service.client.base_client import BaseClient
from o2ims.domain import ocloud
from o2app.service import handlers
from o2app import bootstrap
class FakeOcloudClient(BaseClient):
    """Test double for an O-Cloud client that serves one fixed record.

    Each instance builds a single ``StxGenericModel`` of type
    ``ResourceTypeEnum.OCLOUD`` named ``'stx1'`` with empty JSON content and
    returns that same object from both lookups.
    """

    def __init__(self):
        # NOTE(review): assumes BaseClient.__init__ takes no required
        # arguments -- confirm against o2common.service.client.base_client.
        super().__init__()
        fakeCloud = ocloudModel.StxGenericModel(ResourceTypeEnum.OCLOUD)
        fakeCloud.id = uuid.uuid4()
        fakeCloud.name = 'stx1'
        fakeCloud.content = json.dumps({})
        fakeCloud.createtime = datetime.now()
        fakeCloud.updatetime = datetime.now()
        # The hash is derived from id + update time, so every instance of
        # this fake gets a different value.
        fakeCloud.hash = str(hash((fakeCloud.id, fakeCloud.updatetime)))
        self.fakeCloud = fakeCloud

    def _get(self, id) -> ocloudModel.StxGenericModel:
        """Return the single fake record; ``id`` is ignored."""
        return self.fakeCloud

    def _list(self, **filters) -> List[ocloudModel.StxGenericModel]:
        """Return a one-element list holding the fake record.

        ``filters`` is accepted and ignored so the method works whether or
        not the caller forwards filter keywords.
        """
        return [self.fakeCloud]

    def _set_stx_client(self):
        """Do nothing: the fake never talks to a real backend."""
        pass
class FakeOcloudRepo(OcloudRepository):
    """List-backed ``OcloudRepository`` used by the tests; nothing is persisted."""

    def __init__(self):
        # NOTE(review): assumes OcloudRepository.__init__ takes no required
        # arguments -- confirm against o2ims.adapter.ocloud_repository.
        super().__init__()
        # Backing store read and written by every method below.
        self.oclouds = []

    def _add(self, ocloud: ocloud.Ocloud):
        """Append ``ocloud`` to the in-memory store."""
        self.oclouds.append(ocloud)

    def _get(self, ocloudid) -> ocloud.Ocloud:
        """Return the first stored item whose ``id`` equals ``ocloudid``.

        Returns ``None`` when nothing matches.
        """
        return next((o for o in self.oclouds if o.id == ocloudid), None)

    def _list(self) -> List[ocloud.Ocloud]:
        """Return a shallow copy of the store."""
        return list(self.oclouds)

    def _update(self, ocloud: ocloud.Ocloud):
        """Merge ``ocloud`` into the stored item that has the same ``id``.

        Asserts that exactly one stored item matches before calling its
        ``update_by``.
        """
        filtered = [o for o in self.oclouds if o.id == ocloud.id]
        assert len(filtered) == 1
        filtered[0].update_by(ocloud)
class FakeStxObjRepo(StxObjectRepository):
    """List-backed ``StxObjectRepository`` used by the tests; nothing is persisted."""

    def __init__(self):
        # NOTE(review): assumes StxObjectRepository.__init__ takes no
        # required arguments -- confirm against o2ims.domain.stx_repo.
        super().__init__()
        # Backing store read and written by every method below.
        self.oclouds = []

    def _add(self, ocloud: ocloud.Ocloud):
        """Append ``ocloud`` to the in-memory store."""
        self.oclouds.append(ocloud)

    def _get(self, ocloudid) -> ocloud.Ocloud:
        """Return the first stored item whose ``id`` equals ``ocloudid``.

        Returns ``None`` when nothing matches.
        """
        return next((o for o in self.oclouds if o.id == ocloudid), None)

    def _list(self, type: ResourceTypeEnum) -> List[ocloud.Ocloud]:
        """Return a shallow copy of the store.

        ``type`` is accepted for interface compatibility but this fake does
        not filter on it.
        """
        return list(self.oclouds)

    def _update(self, ocloud: ocloud.Ocloud):
        """Merge ``ocloud`` into the stored item that has the same ``id``.

        Asserts that exactly one stored item matches before calling its
        ``update_by``.
        """
        filtered = [o for o in self.oclouds if o.id == ocloud.id]
        assert len(filtered) == 1
        filtered[0].update_by(ocloud)
class FakeUnitOfWork(AbstractUnitOfWork):
    """Unit of work that uses ``FakeStxObjRepo`` and opens no database session."""

    def __enter__(self):
        # self.session = self.session_factory()  # type: Session
        # self.oclouds = OcloudSqlAlchemyRepository(self.session)
        # A new in-memory repository is created on every entry.
        self.stxobjects = FakeStxObjRepo()
        return super().__enter__()

    def __exit__(self, *args):
        super().__exit__(*args)
        # self.session.close()

    def _commit(self):
        """Do nothing; there is no session to commit.

        NOTE(review): presumably required because AbstractUnitOfWork
        declares this hook abstract -- confirm against unit_of_work.
        """
        # self.session.commit()
        pass

    def rollback(self):
        """Do nothing; there is no session to roll back.

        NOTE(review): presumably required because AbstractUnitOfWork
        declares this hook abstract -- confirm against unit_of_work.
        """
        # self.session.rollback()
        pass

    def collect_new_events(self):
        """Return an empty iterator: the fake repositories record no events."""
        # return super().collect_new_events()
        return iter(())
def create_fake_bus(uow):
    """Build a message bus whose only command handler is a no-op.

    Side effect: replaces the module-level ``handlers.EVENT_HANDLERS`` with
    an empty dict and ``handlers.COMMAND_HANDLERS`` with a single entry that
    maps ``commands.UpdateOCloud`` to a handler that does nothing. These
    assignments stay in place after the call.

    :param uow: unit of work handed to ``bootstrap.bootstrap``.
    :return: whatever ``bootstrap.bootstrap(False, uow)`` returns.
    """
    def update_ocloud(
            cmd: commands.UpdateOCloud,
            uow: AbstractUnitOfWork):
        # Swallow the command; the tests only inspect what the watcher emits.
        return

    # fakeuow = FakeUnitOfWork()
    handlers.EVENT_HANDLERS = {}
    handlers.COMMAND_HANDLERS = {
        commands.UpdateOCloud: update_ocloud,
    }
    # NOTE(review): the meaning of the first positional argument (False) is
    # defined by o2app.bootstrap and is not visible here -- confirm.
    bus = bootstrap.bootstrap(False, uow)
    return bus
def test_probe_new_ocloud():
    """Probing through the fake client yields exactly one result named 'stx1'."""
    message_bus = create_fake_bus(FakeUnitOfWork())
    stub_client = FakeOcloudClient()
    watcher_under_test = OcloudWatcher(stub_client, message_bus)

    probed = watcher_under_test.probe()

    assert probed is not None
    assert len(probed) == 1
    first = probed[0]
    assert first.data.name == "stx1"
    # Repository contents are deliberately not checked here; the fake
    # command handler does not store anything.
def test_watchers_worker():
    """Check that ``PollWorker`` polls a registered watcher and then stops.

    A counting watcher is registered with a one-second interval. Its probe
    stops the worker once it has run more than twice. The test then checks
    that the counter advanced and that it stays put after the stop.
    """
    testedworker = worker.PollWorker()

    class FakeOCloudWatcher(BaseWatcher):
        """Watcher that only counts probes and stops the worker after the third."""

        def __init__(self, client: BaseClient,
                     bus: messagebus) -> None:
            # The bus is not forwarded: the base class receives None.
            super().__init__(client, None)
            self.fakeOcloudWatcherCounter = 0
            self._client = client

        def _targetname(self):
            return "fakeocloudwatcher"

        def _probe(self, parent: object = None, tags=None):
            # import pdb; pdb.set_trace()
            self.fakeOcloudWatcherCounter += 1
            # hacking to stop the blocking sched task
            if self.fakeOcloudWatcherCounter > 2:
                # NOTE(review): assumes PollWorker exposes stop() -- confirm
                # against o2common.service.watcher.worker.
                testedworker.stop()
            return []

    # fakeRepo = FakeOcloudRepo()
    fakeuow = FakeUnitOfWork()
    bus = create_fake_bus(fakeuow)

    fakeClient = FakeOcloudClient()
    fakewatcher = FakeOCloudWatcher(fakeClient, bus)

    root = WatcherTree(fakewatcher)

    testedworker.set_interval(1)
    testedworker.add_watcher(root)
    assert fakewatcher.fakeOcloudWatcherCounter == 0

    count1 = fakewatcher.fakeOcloudWatcherCounter
    # NOTE(review): assumes PollWorker exposes start() -- confirm. The wait
    # loop below is correct whether start() blocks until stop() is called
    # or returns at once and polls in the background.
    testedworker.start()
    deadline = time.monotonic() + 20
    while (fakewatcher.fakeOcloudWatcherCounter <= 2
           and time.monotonic() < deadline):
        time.sleep(0.1)
    assert fakewatcher.fakeOcloudWatcherCounter > count1

    # assumed hacking: probe has stopped the sched task
    count3 = fakewatcher.fakeOcloudWatcherCounter
    # Wait longer than the one-second interval so a still-running worker
    # would be caught by the final assertion.
    time.sleep(3)
    assert fakewatcher.fakeOcloudWatcherCounter == count3