69c3e91245672ca2c6838ffed1c77b6f49ee5be4
[pti/o2.git] / tests / unit / test_watcher.py
1 # Copyright (C) 2021 Wind River Systems, Inc.\r
2 #\r
3 #  Licensed under the Apache License, Version 2.0 (the "License");\r
4 #  you may not use this file except in compliance with the License.\r
5 #  You may obtain a copy of the License at\r
6 #\r
7 #      http://www.apache.org/licenses/LICENSE-2.0\r
8 #\r
9 #  Unless required by applicable law or agreed to in writing, software\r
10 #  distributed under the License is distributed on an "AS IS" BASIS,\r
11 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
12 #  See the License for the specific language governing permissions and\r
13 #  limitations under the License.\r
14 \r
15 import time\r
16 from datetime import datetime\r
17 import json\r
18 from typing import List\r
19 from o2ims.service import handlers\r
20 from o2ims.domain.resource_type import ResourceTypeEnum\r
21 from o2ims.service.client.base_client import BaseClient\r
22 from o2ims.domain import ocloud\r
23 from o2ims import config\r
24 import uuid\r
25 from o2ims.service.watcher.base import BaseWatcher, WatcherTree\r
26 from o2ims.domain import stx_object as ocloudModel\r
27 from o2ims.adapter.ocloud_repository import OcloudRepository\r
28 from o2ims.domain.stx_repo import StxObjectRepository\r
29 from o2ims.service.watcher import worker\r
30 from o2ims.service.unit_of_work import AbstractUnitOfWork\r
31 from o2ims.service.watcher.ocloud_watcher import OcloudWatcher\r
32 from o2ims.service import messagebus\r
33 from o2ims import bootstrap\r
34 from o2ims.domain import commands\r
35 \r
36 class FakeOcloudClient(BaseClient):\r
37     def __init__(self):\r
38         super().__init__()\r
39         fakeCloud = ocloudModel.StxGenericModel(ResourceTypeEnum.OCLOUD)\r
40         fakeCloud.id = uuid.uuid4()\r
41         fakeCloud.name = 'stx1'\r
42         fakeCloud.content = json.dumps({})\r
43         fakeCloud.createtime = datetime.now()\r
44         fakeCloud.updatetime = datetime.now()\r
45         fakeCloud.hash = str(hash((fakeCloud.id, fakeCloud.updatetime)))\r
46         self.fakeCloud = fakeCloud\r
47 \r
48     def _get(self, id) -> ocloudModel.StxGenericModel:\r
49         return self.fakeCloud\r
50 \r
51     def _list(self):\r
52         return [self.fakeCloud]\r
53 \r
54 \r
55 class FakeOcloudRepo(OcloudRepository):\r
56     def __init__(self):\r
57         super().__init__()\r
58         self.oclouds = []\r
59 \r
60     def _add(self, ocloud: ocloud.Ocloud):\r
61         self.oclouds.append(ocloud)\r
62 \r
63     def _get(self, ocloudid) -> ocloud.Ocloud:\r
64         filtered = [o for o in self.oclouds if o.id == ocloudid]\r
65         return filtered.pop()\r
66 \r
67     def _list(self) -> List[ocloud.Ocloud]:\r
68         return [x for x in self.oclouds]\r
69 \r
70     def _update(self, ocloud: ocloud.Ocloud):\r
71         filtered = [o for o in self.oclouds if o.id == ocloud.id]\r
72         assert len(filtered) == 1\r
73         ocloud1 = filtered.pop()\r
74         ocloud1.update_by(ocloud)\r
75 \r
76 \r
77 \r
78 class FakeStxObjRepo(StxObjectRepository):\r
79     def __init__(self):\r
80         super().__init__()\r
81         self.oclouds = []\r
82 \r
83     def _add(self, ocloud: ocloud.Ocloud):\r
84         self.oclouds.append(ocloud)\r
85 \r
86     def _get(self, ocloudid) -> ocloud.Ocloud:\r
87         filtered = [o for o in self.oclouds if o.id == ocloudid]\r
88         return filtered.pop()\r
89 \r
90     def _list(self, type: ResourceTypeEnum) -> List[ocloud.Ocloud]:\r
91         return [x for x in self.oclouds]\r
92 \r
93     def _update(self, ocloud: ocloud.Ocloud):\r
94         filtered = [o for o in self.oclouds if o.id == ocloud.id]\r
95         assert len(filtered) == 1\r
96         ocloud1 = filtered.pop()\r
97         ocloud1.update_by(ocloud)\r
98 \r
99 \r
100 class FakeUnitOfWork(AbstractUnitOfWork):\r
101     def __init__(self):\r
102         pass\r
103 \r
104     def __enter__(self):\r
105         # self.session = self.session_factory()  # type: Session\r
106         # self.oclouds = OcloudSqlAlchemyRepository(self.session)\r
107         self.stxobjects = FakeStxObjRepo()\r
108         return super().__enter__()\r
109 \r
110     def __exit__(self, *args):\r
111         super().__exit__(*args)\r
112         # self.session.close()\r
113 \r
114     def _commit(self):\r
115         pass\r
116         # self.session.commit()\r
117 \r
118     def rollback(self):\r
119         pass\r
120         # self.session.rollback()\r
121 \r
122     def collect_new_events(self):\r
123         yield\r
124         # return super().collect_new_events()\r
125 \r
126 \r
127 def create_fake_bus(uow):\r
128     def update_ocloud(\r
129         cmd: commands.UpdateOCloud,\r
130         uow: AbstractUnitOfWork):\r
131         return\r
132 \r
133     fakeuow = FakeUnitOfWork()\r
134     handlers.EVENT_HANDLERS = {}\r
135     handlers.COMMAND_HANDLERS = {\r
136         commands.UpdateOCloud: update_ocloud,\r
137     }\r
138     bus = bootstrap.bootstrap(False, fakeuow)\r
139     return bus\r
140 \r
141 \r
142 def test_probe_new_ocloud():\r
143     fakeuow = FakeUnitOfWork()\r
144     bus = create_fake_bus(fakeuow)\r
145     fakeClient = FakeOcloudClient()\r
146     ocloudwatcher = OcloudWatcher(fakeClient, bus)\r
147     cmds = ocloudwatcher.probe()\r
148     assert cmds is not None\r
149     assert len(cmds) == 1\r
150     assert cmds[0].data.name == "stx1"\r
151     # assert len(fakeuow.stxobjects.oclouds) == 1\r
152     # assert fakeuow.stxobjects.oclouds[0].name == "stx1"\r
153 \r
154 \r
155 def test_watchers_worker():\r
156     testedworker = worker.PollWorker()\r
157 \r
158     class FakeOCloudWatcher(BaseWatcher):\r
159         def __init__(self, client: BaseClient,\r
160                      bus: messagebus) -> None:\r
161             super().__init__(client, None)\r
162             self.fakeOcloudWatcherCounter = 0\r
163             self._client = client\r
164             self._bus = bus\r
165 \r
166         def _targetname(self):\r
167             return "fakeocloudwatcher"\r
168 \r
169         def _probe(self, parent: object=None):\r
170             # import pdb; pdb.set_trace()\r
171             self.fakeOcloudWatcherCounter += 1\r
172             # hacking to stop the blocking sched task\r
173             if self.fakeOcloudWatcherCounter > 2:\r
174                 testedworker.stop()\r
175             return []\r
176 \r
177 \r
178     # fakeRepo = FakeOcloudRepo()\r
179     fakeuow = FakeUnitOfWork()\r
180     bus = create_fake_bus(fakeuow)\r
181 \r
182     fakeClient = FakeOcloudClient()\r
183     fakewatcher = FakeOCloudWatcher(fakeClient, bus)\r
184 \r
185     root = WatcherTree(fakewatcher)\r
186 \r
187     testedworker.set_interval(1)\r
188     testedworker.add_watcher(root)\r
189     assert fakewatcher.fakeOcloudWatcherCounter == 0\r
190 \r
191     count1 = fakewatcher.fakeOcloudWatcherCounter\r
192     testedworker.start()\r
193     time.sleep(20)\r
194     assert fakewatcher.fakeOcloudWatcherCounter > count1\r
195 \r
196     # assumed hacking: probe has stopped the sched task\r
197     count3 = fakewatcher.fakeOcloudWatcherCounter\r
198     time.sleep(3)\r
199     assert fakewatcher.fakeOcloudWatcherCounter == count3\r