1 # Copyright (C) 2021 Wind River Systems, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
16 from datetime import datetime
18 from typing import Callable, List
19 # from o2common.config import config
21 from o2common.service.watcher.base import BaseWatcher, WatcherTree
22 from o2common.service.watcher import worker
23 from o2common.service.unit_of_work import AbstractUnitOfWork
24 from o2common.service import messagebus
26 from o2ims.domain.resource_type import ResourceTypeEnum
27 from o2ims.domain import stx_object as ocloudModel
28 from o2ims.adapter.ocloud_repository import OcloudRepository
29 from o2ims.domain.stx_repo import StxObjectRepository
30 from o2ims.service.watcher.ocloud_watcher import OcloudWatcher
31 from o2ims.domain import commands
32 from o2common.service.client.base_client import BaseClient
33 from o2ims.domain import ocloud
35 from o2app.service import handlers
36 from o2app import bootstrap
class FakeOcloudClient(BaseClient):
    """Test double for an ocloud client that serves one fake record.

    The record is a ``StxGenericModel`` of type ``ResourceTypeEnum.OCLOUD``
    named ``'stx1'`` with an empty JSON document as content.
    """

    def __init__(self):
        # NOTE(review): the constructor header was not visible in the
        # reviewed text; reconstructed because the statements below assign
        # to ``self`` -- confirm BaseClient takes no constructor arguments.
        super().__init__()
        fakeCloud = ocloudModel.StxGenericModel(ResourceTypeEnum.OCLOUD)
        fakeCloud.id = uuid.uuid4()
        fakeCloud.name = 'stx1'
        fakeCloud.content = json.dumps({})
        fakeCloud.createtime = datetime.now()
        fakeCloud.updatetime = datetime.now()
        # Hash is derived from identity + update time.
        fakeCloud.hash = str(hash((fakeCloud.id, fakeCloud.updatetime)))
        self.fakeCloud = fakeCloud

    def _get(self, id) -> ocloudModel.StxGenericModel:
        """Return the single fake record; ``id`` is ignored."""
        # The annotation promises one model, so do not wrap it in a list.
        return self.fakeCloud

    def _list(self):
        """Return a one-element list holding the fake record.

        NOTE(review): this method header was missing from the reviewed text;
        the name is reconstructed from the list-returning statement that
        followed ``_get`` -- confirm against BaseClient's interface.
        """
        return [self.fakeCloud]
class FakeOcloudRepo(OcloudRepository):
    """In-memory ``OcloudRepository`` backed by a plain list."""

    def __init__(self):
        # NOTE(review): constructor reconstructed; the reviewed text uses
        # ``self.oclouds`` but never shows where it is initialised.
        super().__init__()
        self.oclouds = []

    def _add(self, ocloud: ocloud.Ocloud):
        """Append ``ocloud`` to the in-memory store."""
        self.oclouds.append(ocloud)

    def _get(self, ocloudid) -> ocloud.Ocloud:
        """Return the stored ocloud whose ``id`` equals ``ocloudid``.

        Returns ``None`` when nothing matches.
        """
        # The lookup result must actually be returned to the caller.
        return next((o for o in self.oclouds if o.id == ocloudid), None)

    def _list(self) -> List[ocloud.Ocloud]:
        """Return a shallow copy of the stored oclouds."""
        return list(self.oclouds)

    def _update(self, ocloud: ocloud.Ocloud):
        """Merge ``ocloud`` into the stored entry that has the same ``id``."""
        filtered = [o for o in self.oclouds if o.id == ocloud.id]
        # Test fake: exactly one stored entry is expected per id.
        assert len(filtered) == 1
        ocloud1 = filtered.pop()
        ocloud1.update_by(ocloud)
class FakeStxObjRepo(StxObjectRepository):
    """In-memory ``StxObjectRepository`` backed by a plain list."""

    def __init__(self):
        # NOTE(review): constructor reconstructed; the reviewed text uses
        # ``self.oclouds`` but never shows where it is initialised.
        super().__init__()
        self.oclouds = []

    def _add(self, ocloud: ocloud.Ocloud):
        """Append ``ocloud`` to the in-memory store."""
        self.oclouds.append(ocloud)

    def _get(self, ocloudid) -> ocloud.Ocloud:
        """Return the stored object whose ``id`` equals ``ocloudid``.

        Returns ``None`` when nothing matches.
        """
        # The lookup result must actually be returned to the caller.
        return next((o for o in self.oclouds if o.id == ocloudid), None)

    def _list(self, type: ResourceTypeEnum) -> List[ocloud.Ocloud]:
        """Return a shallow copy of all stored objects.

        ``type`` is accepted for interface compatibility but is not used
        for filtering.
        """
        return list(self.oclouds)

    def _update(self, ocloud: ocloud.Ocloud):
        """Merge ``ocloud`` into the stored entry that has the same ``id``."""
        filtered = [o for o in self.oclouds if o.id == ocloud.id]
        # Test fake: exactly one stored entry is expected per id.
        assert len(filtered) == 1
        ocloud1 = filtered.pop()
        ocloud1.update_by(ocloud)
class FakeUnitOfWork(AbstractUnitOfWork):
    """Unit of work for tests: in-memory repository, no database session.

    NOTE(review): several lines of this class were missing from the
    reviewed text (the ``__enter__`` header, the commit/rollback method
    headers and the body of ``collect_new_events``); they are reconstructed
    below and should be confirmed against ``AbstractUnitOfWork``.
    """

    def __enter__(self):
        # Each ``with`` block gets a fresh, empty fake repository.
        self.stxobjects = FakeStxObjRepo()
        return super().__enter__()

    def __exit__(self, *args):
        # No session to close; defer entirely to the base class.
        super().__exit__(*args)

    def _commit(self):
        """No-op: there is no session to commit."""
        # NOTE(review): method name presumed from the commented-out
        # ``session.commit()`` call -- confirm.

    def rollback(self):
        """No-op: there is no session to roll back."""
        # NOTE(review): method name presumed from the commented-out
        # ``session.rollback()`` call -- confirm.

    def collect_new_events(self):
        """Return an empty iterator: the fake never produces events."""
        # NOTE(review): original body not visible; an empty iterator is
        # the safe choice for callers that extend a queue with the result.
        return iter(())
def create_fake_bus(uow):
    """Build a message bus whose only command handler is a no-op.

    Replaces ``handlers.EVENT_HANDLERS`` with an empty mapping and
    ``handlers.COMMAND_HANDLERS`` with a single ``UpdateOCloud`` entry, then
    bootstraps a bus around the unit of work.

    :param uow: unit of work to bind the bus to; a fresh ``FakeUnitOfWork``
        is used when ``None`` is passed.
    :return: the object returned by ``bootstrap.bootstrap``.
    """
    def update_ocloud(
            cmd: commands.UpdateOCloud,
            uow: AbstractUnitOfWork,
            publish: Callable = None):
        # NOTE(review): the third parameter and the handler body were not
        # visible in the reviewed text; ``publish`` is presumed and given
        # a default so the handler works whether or not it is injected.
        return None

    # Honour the caller's unit of work instead of silently discarding it.
    fakeuow = uow if uow is not None else FakeUnitOfWork()
    handlers.EVENT_HANDLERS = {}
    handlers.COMMAND_HANDLERS = {
        commands.UpdateOCloud: update_ocloud,
    }
    bus = bootstrap.bootstrap(False, fakeuow)
    return bus
def test_probe_new_ocloud():
    """Probing with the fake client yields exactly one command for 'stx1'."""
    unit_of_work = FakeUnitOfWork()
    message_bus = create_fake_bus(unit_of_work)
    watcher = OcloudWatcher(FakeOcloudClient(), message_bus)

    produced = watcher.probe()

    assert produced is not None
    assert len(produced) == 1
    only_cmd = produced[0]
    assert only_cmd.data.name == "stx1"
    # Repository-side checks are currently disabled:
    # assert len(unit_of_work.stxobjects.oclouds) == 1
    # assert unit_of_work.stxobjects.oclouds[0].name == "stx1"
def test_watchers_worker():
    """A PollWorker keeps probing a watcher until the watcher stops it.

    NOTE(review): the lines that start/stop the worker and wait between
    assertions were missing from the reviewed text. ``start()`` / ``stop()``
    are presumed method names on ``PollWorker`` -- confirm before merging.
    """
    # Local import: no module-level ``import time`` is visible in the
    # reviewed text.
    import time

    testedworker = worker.PollWorker()

    class FakeOCloudWatcher(BaseWatcher):
        """Watcher that counts its probes and halts the worker after 3."""

        def __init__(self, client: BaseClient,
                     bus: messagebus) -> None:
            # The base class is deliberately given no bus.
            super().__init__(client, None)
            self.fakeOcloudWatcherCounter = 0
            self._client = client

        def _targetname(self):
            return "fakeocloudwatcher"

        def _probe(self, parent: object = None):
            self.fakeOcloudWatcherCounter += 1
            # Hack: stop the (blocking) scheduler task from inside the
            # probe once it has fired more than twice.
            if self.fakeOcloudWatcherCounter > 2:
                testedworker.stop()  # NOTE(review): presumed API
            # NOTE(review): return value not visible; an empty list means
            # "nothing to dispatch".
            return []

    fakeuow = FakeUnitOfWork()
    bus = create_fake_bus(fakeuow)

    fakeClient = FakeOcloudClient()
    fakewatcher = FakeOCloudWatcher(fakeClient, bus)

    root = WatcherTree(fakewatcher)

    testedworker.set_interval(1)
    testedworker.add_watcher(root)
    # Nothing may be probed before the worker is started.
    assert fakewatcher.fakeOcloudWatcherCounter == 0

    count1 = fakewatcher.fakeOcloudWatcherCounter
    testedworker.start()  # NOTE(review): presumed API
    # Works whether start() blocks or returns immediately: poll (bounded)
    # until the probe has requested the stop.
    deadline = time.monotonic() + 20
    while (fakewatcher.fakeOcloudWatcherCounter <= 2
           and time.monotonic() < deadline):
        time.sleep(0.1)
    assert fakewatcher.fakeOcloudWatcherCounter > count1

    # assumed hacking: probe has stopped the sched task
    count3 = fakewatcher.fakeOcloudWatcherCounter
    # Wait longer than one polling interval; a live worker would tick.
    time.sleep(2)
    assert fakewatcher.fakeOcloudWatcherCounter == count3