1 # Copyright (C) 2021 Wind River Systems, Inc.
\r
3 # Licensed under the Apache License, Version 2.0 (the "License");
\r
4 # you may not use this file except in compliance with the License.
\r
5 # You may obtain a copy of the License at
\r
7 # http://www.apache.org/licenses/LICENSE-2.0
\r
9 # Unless required by applicable law or agreed to in writing, software
\r
10 # distributed under the License is distributed on an "AS IS" BASIS,
\r
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
12 # See the License for the specific language governing permissions and
\r
13 # limitations under the License.
\r
16 from datetime import datetime
\r
18 from typing import List
\r
19 # from o2common.config import config
\r
21 from o2common.service.watcher.base import BaseWatcher, WatcherTree
\r
22 from o2common.service.watcher import worker
\r
23 from o2common.service.unit_of_work import AbstractUnitOfWork
\r
24 from o2common.service import messagebus
\r
26 from o2ims.domain.resource_type import ResourceTypeEnum
\r
27 from o2ims.domain import stx_object as ocloudModel
\r
28 from o2ims.adapter.ocloud_repository import OcloudRepository
\r
29 from o2ims.domain.stx_repo import StxObjectRepository
\r
30 from o2ims.service.watcher.ocloud_watcher import OcloudWatcher
\r
31 from o2ims.domain import commands
\r
32 from o2common.service.client.base_client import BaseClient
\r
33 from o2ims.domain import ocloud
\r
35 from o2app.service import handlers
\r
36 from o2app import bootstrap
\r
39 class FakeOcloudClient(BaseClient):
\r
42 fakeCloud = ocloudModel.StxGenericModel(ResourceTypeEnum.OCLOUD)
\r
43 fakeCloud.id = uuid.uuid4()
\r
44 fakeCloud.name = 'stx1'
\r
45 fakeCloud.content = json.dumps({})
\r
46 fakeCloud.createtime = datetime.now()
\r
47 fakeCloud.updatetime = datetime.now()
\r
48 fakeCloud.hash = str(hash((fakeCloud.id, fakeCloud.updatetime)))
\r
49 self.fakeCloud = fakeCloud
\r
51 def _get(self, id) -> ocloudModel.StxGenericModel:
\r
52 return self.fakeCloud
\r
55 return [self.fakeCloud]
\r
58 class FakeOcloudRepo(OcloudRepository):
\r
63 def _add(self, ocloud: ocloud.Ocloud):
\r
64 self.oclouds.append(ocloud)
\r
66 def _get(self, ocloudid) -> ocloud.Ocloud:
\r
67 filtered = [o for o in self.oclouds if o.id == ocloudid]
\r
68 return filtered.pop()
\r
70 def _list(self) -> List[ocloud.Ocloud]:
\r
71 return [x for x in self.oclouds]
\r
73 def _update(self, ocloud: ocloud.Ocloud):
\r
74 filtered = [o for o in self.oclouds if o.id == ocloud.id]
\r
75 assert len(filtered) == 1
\r
76 ocloud1 = filtered.pop()
\r
77 ocloud1.update_by(ocloud)
\r
81 class FakeStxObjRepo(StxObjectRepository):
\r
86 def _add(self, ocloud: ocloud.Ocloud):
\r
87 self.oclouds.append(ocloud)
\r
89 def _get(self, ocloudid) -> ocloud.Ocloud:
\r
90 filtered = [o for o in self.oclouds if o.id == ocloudid]
\r
91 return filtered.pop()
\r
93 def _list(self, type: ResourceTypeEnum) -> List[ocloud.Ocloud]:
\r
94 return [x for x in self.oclouds]
\r
96 def _update(self, ocloud: ocloud.Ocloud):
\r
97 filtered = [o for o in self.oclouds if o.id == ocloud.id]
\r
98 assert len(filtered) == 1
\r
99 ocloud1 = filtered.pop()
\r
100 ocloud1.update_by(ocloud)
\r
103 class FakeUnitOfWork(AbstractUnitOfWork):
\r
104 def __init__(self):
\r
def __enter__(self):
    """Enter the unit-of-work context with a fresh in-memory repository.

    A new FakeStxObjRepo is bound to ``self.stxobjects`` on every entry,
    then whatever the parent class's ``__enter__`` returns is handed back.
    """
    # The session-backed setup is left disabled in this fake:
    # self.session = self.session_factory()  # type: Session
    # self.oclouds = OcloudSqlAlchemyRepository(self.session)
    stx_repo = FakeStxObjRepo()
    self.stxobjects = stx_repo
    return super().__enter__()
\r
def __exit__(self, *exc_details):
    """Leave the unit-of-work context by delegating to the parent class.

    The parent's return value is discarded, so this method returns None.
    """
    super().__exit__(*exc_details)
    # Closing a session is left disabled in this fake:
    # self.session.close()
\r
119 # self.session.commit()
\r
121 def rollback(self):
\r
123 # self.session.rollback()
\r
125 def collect_new_events(self):
\r
127 # return super().collect_new_events()
\r
130 def create_fake_bus(uow):
\r
132 cmd: commands.UpdateOCloud,
\r
133 uow: AbstractUnitOfWork):
\r
136 fakeuow = FakeUnitOfWork()
\r
137 handlers.EVENT_HANDLERS = {}
\r
138 handlers.COMMAND_HANDLERS = {
\r
139 commands.UpdateOCloud: update_ocloud,
\r
141 bus = bootstrap.bootstrap(False, fakeuow)
\r
def test_probe_new_ocloud():
    """Probing with the fake client yields one command for cloud 'stx1'."""
    unit_of_work = FakeUnitOfWork()
    message_bus = create_fake_bus(unit_of_work)
    client = FakeOcloudClient()
    watcher = OcloudWatcher(client, message_bus)

    produced = watcher.probe()

    assert produced is not None
    assert len(produced) == 1
    first_cmd = produced[0]
    assert first_cmd.data.name == "stx1"
    # Checks against the fake unit of work's repository are disabled:
    # assert len(fakeuow.stxobjects.oclouds) == 1
    # assert fakeuow.stxobjects.oclouds[0].name == "stx1"
\r
158 def test_watchers_worker():
\r
159 testedworker = worker.PollWorker()
\r
161 class FakeOCloudWatcher(BaseWatcher):
\r
162 def __init__(self, client: BaseClient,
\r
163 bus: messagebus) -> None:
\r
164 super().__init__(client, None)
\r
165 self.fakeOcloudWatcherCounter = 0
\r
166 self._client = client
\r
169 def _targetname(self):
\r
170 return "fakeocloudwatcher"
\r
172 def _probe(self, parent: object=None):
\r
173 # import pdb; pdb.set_trace()
\r
174 self.fakeOcloudWatcherCounter += 1
\r
175 # hacking to stop the blocking sched task
\r
176 if self.fakeOcloudWatcherCounter > 2:
\r
177 testedworker.stop()
\r
181 # fakeRepo = FakeOcloudRepo()
\r
182 fakeuow = FakeUnitOfWork()
\r
183 bus = create_fake_bus(fakeuow)
\r
185 fakeClient = FakeOcloudClient()
\r
186 fakewatcher = FakeOCloudWatcher(fakeClient, bus)
\r
188 root = WatcherTree(fakewatcher)
\r
190 testedworker.set_interval(1)
\r
191 testedworker.add_watcher(root)
\r
192 assert fakewatcher.fakeOcloudWatcherCounter == 0
\r
194 count1 = fakewatcher.fakeOcloudWatcherCounter
\r
195 testedworker.start()
\r
197 assert fakewatcher.fakeOcloudWatcherCounter > count1
\r
199 # assumed hacking: probe has stopped the sched task
\r
200 count3 = fakewatcher.fakeOcloudWatcherCounter
\r
202 assert fakewatcher.fakeOcloudWatcherCounter == count3
\r