1 # Copyright (C) 2021 Wind River Systems, Inc.
\r
3 # Licensed under the Apache License, Version 2.0 (the "License");
\r
4 # you may not use this file except in compliance with the License.
\r
5 # You may obtain a copy of the License at
\r
7 # http://www.apache.org/licenses/LICENSE-2.0
\r
9 # Unless required by applicable law or agreed to in writing, software
\r
10 # distributed under the License is distributed on an "AS IS" BASIS,
\r
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
12 # See the License for the specific language governing permissions and
\r
13 # limitations under the License.
\r
import json
import uuid
from datetime import datetime
from typing import List

from o2ims.service.client.base_client import BaseClient
from o2ims.domain import ocloud
from o2ims import config
from o2ims.service.watcher.base import BaseWatcher, OcloudWather
from o2ims.domain import stx_object as ocloudModel
from o2ims.adapter.ocloud_repository import OcloudRepository
from o2ims.service.watcher import worker
from o2ims.service.watcher.executor import start_watchers
\r
class FakeOcloudClient(BaseClient):
    """Client test double that always serves one in-memory fake cloud record.

    The record is an ``StxGenericModel`` named ``'stx1'`` with a random UUID
    as its id and an empty JSON object as its content.
    """

    def __init__(self):
        # NOTE(review): the initializer header was absent from the reviewed
        # text; a no-argument BaseClient initializer is assumed -- confirm.
        super().__init__()
        fakeCloud = ocloudModel.StxGenericModel()
        fakeCloud.id = uuid.uuid4()
        fakeCloud.name = 'stx1'
        fakeCloud.content = json.dumps({})
        fakeCloud.createtime = datetime.now()
        # Call now() so a timestamp is stored, matching createtime, rather
        # than the bound ``datetime.now`` function object.
        fakeCloud.updatetime = datetime.now()
        self.fakeCloud = fakeCloud

    def _get(self, id) -> ocloudModel.StxGenericModel:
        """Return the fake record, whatever ``id`` is requested."""
        return self.fakeCloud

    def _list(self):
        """Return a one-element list holding the fake record."""
        # NOTE(review): the method header was absent from the reviewed text;
        # the name ``_list`` is assumed by analogy with the ``_get``/``_list``
        # pair on FakeOcloudRepo -- confirm against BaseClient.
        return [self.fakeCloud]
\r
class FakeOcloudRepo(OcloudRepository):
    """Repository test double that keeps oclouds in a plain Python list."""

    def __init__(self):
        # NOTE(review): the initializer was absent from the reviewed text; a
        # no-argument OcloudRepository initializer is assumed -- confirm.
        super().__init__()
        # Backing store read by every method below and inspected by tests.
        self.oclouds = []

    def _add(self, ocloud: ocloud.Ocloud):
        """Append ``ocloud`` to the in-memory store."""
        self.oclouds.append(ocloud)

    def _get(self, ocloudid) -> ocloud.Ocloud:
        """Return the last stored ocloud whose id equals ``ocloudid``.

        Raises:
            IndexError: if no stored ocloud has that id.
        """
        matches = [item for item in self.oclouds if item.id == ocloudid]
        return matches.pop()

    def _list(self) -> List[ocloud.Ocloud]:
        """Return a shallow copy of the stored oclouds."""
        return list(self.oclouds)

    def _update(self, ocloud: ocloud.Ocloud):
        """Merge ``ocloud`` into the stored entry that shares its id.

        Exactly one stored entry must match; this is enforced with an assert.
        """
        matches = [item for item in self.oclouds if item.id == ocloud.id]
        assert len(matches) == 1
        matches[0].update_by(ocloud)
\r
def test_probe_new_ocloud():
    """After one probe the repository holds exactly one ocloud named 'stx1'."""
    repo = FakeOcloudRepo()
    watcher = OcloudWather(FakeOcloudClient(), repo)

    watcher.probe()

    stored = repo.oclouds
    assert len(stored) == 1
    assert stored[0].name == "stx1"
\r
def test_default_worker():
    """Drive the default worker with a counting watcher and check it stops.

    A local watcher increments a counter on each probe and asks the default
    worker to stop once the counter exceeds 2. The test registers it, starts
    the worker, and checks that the counter advanced.
    """

    class FakeOCloudWatcher(BaseWatcher):
        """Watcher double that counts probes and stops the worker on the third."""

        def __init__(self, client: BaseClient,
                     repo: OcloudRepository) -> None:
            super().__init__(client)
            self.fakeOcloudWatcherCounter = 0
            self._client = client
            # Kept alongside the client; nothing in this test reads it.
            self._repo = repo

        def _targetname(self):
            return "fakeocloudwatcher"

        def _probe(self):
            # NOTE(review): this method header was absent from the reviewed
            # text; ``_probe`` is assumed to be the hook BaseWatcher invokes
            # on each probe -- confirm against BaseWatcher.
            self.fakeOcloudWatcherCounter += 1
            # hacking to stop the blocking sched task
            if self.fakeOcloudWatcherCounter > 2:
                worker.defaultworker.stop()

    fakeRepo = FakeOcloudRepo()
    fakeClient = FakeOcloudClient()
    fakewatcher = FakeOCloudWatcher(fakeClient, fakeRepo)

    worker.defaultworker.set_interval(1)
    worker.defaultworker.add_watcher(fakewatcher)
    assert fakewatcher.fakeOcloudWatcherCounter == 0

    count1 = fakewatcher.fakeOcloudWatcherCounter
    worker.defaultworker.start()

    assert fakewatcher.fakeOcloudWatcherCounter > count1

    # assumed hacking: probe has stopped the sched task
    count3 = fakewatcher.fakeOcloudWatcherCounter
    # NOTE(review): nothing runs between the snapshot above and the check
    # below, so this assertion cannot fail as written; a wait longer than the
    # poll interval would be needed to prove the worker really stopped.
    assert fakewatcher.fakeOcloudWatcherCounter == count3
\r