806d20fe10605cac9fea6ffe13a30f31628832a2
[pti/o2.git] / tests / unit / test_watcher.py
1 # Copyright (C) 2021 Wind River Systems, Inc.\r
2 #\r
3 #  Licensed under the Apache License, Version 2.0 (the "License");\r
4 #  you may not use this file except in compliance with the License.\r
5 #  You may obtain a copy of the License at\r
6 #\r
7 #      http://www.apache.org/licenses/LICENSE-2.0\r
8 #\r
9 #  Unless required by applicable law or agreed to in writing, software\r
10 #  distributed under the License is distributed on an "AS IS" BASIS,\r
11 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
12 #  See the License for the specific language governing permissions and\r
13 #  limitations under the License.\r
14 \r
15 import time\r
16 from datetime import datetime\r
17 import json\r
18 from typing import Callable, List\r
19 # from o2common.config import config\r
20 import uuid\r
21 from o2common.service.watcher.base import BaseWatcher, WatcherTree\r
22 from o2common.service.watcher import worker\r
23 from o2common.service.unit_of_work import AbstractUnitOfWork\r
24 from o2common.service import messagebus\r
25 \r
26 from o2ims.domain.resource_type import ResourceTypeEnum\r
27 from o2ims.domain import stx_object as ocloudModel\r
28 from o2ims.adapter.ocloud_repository import OcloudRepository\r
29 from o2ims.domain.stx_repo import StxObjectRepository\r
30 from o2ims.service.watcher.ocloud_watcher import OcloudWatcher\r
31 from o2ims.domain import commands\r
32 from o2common.service.client.base_client import BaseClient\r
33 from o2ims.domain import ocloud\r
34 \r
35 from o2app.service import handlers\r
36 from o2app import bootstrap\r
37 \r
38 \r
39 class FakeOcloudClient(BaseClient):\r
40     def __init__(self):\r
41         super().__init__()\r
42         fakeCloud = ocloudModel.StxGenericModel(ResourceTypeEnum.OCLOUD)\r
43         fakeCloud.id = uuid.uuid4()\r
44         fakeCloud.name = 'stx1'\r
45         fakeCloud.content = json.dumps({})\r
46         fakeCloud.createtime = datetime.now()\r
47         fakeCloud.updatetime = datetime.now()\r
48         fakeCloud.hash = str(hash((fakeCloud.id, fakeCloud.updatetime)))\r
49         self.fakeCloud = fakeCloud\r
50 \r
51     def _get(self, id) -> ocloudModel.StxGenericModel:\r
52         return self.fakeCloud\r
53 \r
54     def _list(self):\r
55         return [self.fakeCloud]\r
56 \r
57 \r
58 class FakeOcloudRepo(OcloudRepository):\r
59     def __init__(self):\r
60         super().__init__()\r
61         self.oclouds = []\r
62 \r
63     def _add(self, ocloud: ocloud.Ocloud):\r
64         self.oclouds.append(ocloud)\r
65 \r
66     def _get(self, ocloudid) -> ocloud.Ocloud:\r
67         filtered = [o for o in self.oclouds if o.id == ocloudid]\r
68         return filtered.pop()\r
69 \r
70     def _list(self) -> List[ocloud.Ocloud]:\r
71         return [x for x in self.oclouds]\r
72 \r
73     def _update(self, ocloud: ocloud.Ocloud):\r
74         filtered = [o for o in self.oclouds if o.id == ocloud.id]\r
75         assert len(filtered) == 1\r
76         ocloud1 = filtered.pop()\r
77         ocloud1.update_by(ocloud)\r
78 \r
79 \r
80 class FakeStxObjRepo(StxObjectRepository):\r
81     def __init__(self):\r
82         super().__init__()\r
83         self.oclouds = []\r
84 \r
85     def _add(self, ocloud: ocloud.Ocloud):\r
86         self.oclouds.append(ocloud)\r
87 \r
88     def _get(self, ocloudid) -> ocloud.Ocloud:\r
89         filtered = [o for o in self.oclouds if o.id == ocloudid]\r
90         return filtered.pop()\r
91 \r
92     def _list(self, type: ResourceTypeEnum) -> List[ocloud.Ocloud]:\r
93         return [x for x in self.oclouds]\r
94 \r
95     def _update(self, ocloud: ocloud.Ocloud):\r
96         filtered = [o for o in self.oclouds if o.id == ocloud.id]\r
97         assert len(filtered) == 1\r
98         ocloud1 = filtered.pop()\r
99         ocloud1.update_by(ocloud)\r
100 \r
101 \r
102 class FakeUnitOfWork(AbstractUnitOfWork):\r
103     def __init__(self):\r
104         pass\r
105 \r
106     def __enter__(self):\r
107         # self.session = self.session_factory()  # type: Session\r
108         # self.oclouds = OcloudSqlAlchemyRepository(self.session)\r
109         self.stxobjects = FakeStxObjRepo()\r
110         return super().__enter__()\r
111 \r
112     def __exit__(self, *args):\r
113         super().__exit__(*args)\r
114         # self.session.close()\r
115 \r
116     def _commit(self):\r
117         pass\r
118         # self.session.commit()\r
119 \r
120     def rollback(self):\r
121         pass\r
122         # self.session.rollback()\r
123 \r
124     def collect_new_events(self):\r
125         yield\r
126         # return super().collect_new_events()\r
127 \r
128 \r
129 def create_fake_bus(uow):\r
130     def update_ocloud(\r
131             cmd: commands.UpdateOCloud,\r
132             uow: AbstractUnitOfWork,\r
133             publish: Callable):\r
134         return\r
135 \r
136     fakeuow = FakeUnitOfWork()\r
137     handlers.EVENT_HANDLERS = {}\r
138     handlers.COMMAND_HANDLERS = {\r
139         commands.UpdateOCloud: update_ocloud,\r
140     }\r
141     bus = bootstrap.bootstrap(False, fakeuow)\r
142     return bus\r
143 \r
144 \r
145 def test_probe_new_ocloud():\r
146     fakeuow = FakeUnitOfWork()\r
147     bus = create_fake_bus(fakeuow)\r
148     fakeClient = FakeOcloudClient()\r
149     ocloudwatcher = OcloudWatcher(fakeClient, bus)\r
150     cmds = ocloudwatcher.probe()\r
151     assert cmds is not None\r
152     assert len(cmds) == 1\r
153     assert cmds[0].data.name == "stx1"\r
154     # assert len(fakeuow.stxobjects.oclouds) == 1\r
155     # assert fakeuow.stxobjects.oclouds[0].name == "stx1"\r
156 \r
157 \r
158 def test_watchers_worker():\r
159     testedworker = worker.PollWorker()\r
160 \r
161     class FakeOCloudWatcher(BaseWatcher):\r
162         def __init__(self, client: BaseClient,\r
163                      bus: messagebus) -> None:\r
164             super().__init__(client, None)\r
165             self.fakeOcloudWatcherCounter = 0\r
166             self._client = client\r
167             self._bus = bus\r
168 \r
169         def _targetname(self):\r
170             return "fakeocloudwatcher"\r
171 \r
172         def _probe(self, parent: object = None):\r
173             # import pdb; pdb.set_trace()\r
174             self.fakeOcloudWatcherCounter += 1\r
175             # hacking to stop the blocking sched task\r
176             if self.fakeOcloudWatcherCounter > 2:\r
177                 testedworker.stop()\r
178             return []\r
179 \r
180     # fakeRepo = FakeOcloudRepo()\r
181     fakeuow = FakeUnitOfWork()\r
182     bus = create_fake_bus(fakeuow)\r
183 \r
184     fakeClient = FakeOcloudClient()\r
185     fakewatcher = FakeOCloudWatcher(fakeClient, bus)\r
186 \r
187     root = WatcherTree(fakewatcher)\r
188 \r
189     testedworker.set_interval(1)\r
190     testedworker.add_watcher(root)\r
191     assert fakewatcher.fakeOcloudWatcherCounter == 0\r
192 \r
193     count1 = fakewatcher.fakeOcloudWatcherCounter\r
194     testedworker.start()\r
195     time.sleep(20)\r
196     assert fakewatcher.fakeOcloudWatcherCounter > count1\r
197 \r
198     # assumed hacking: probe has stopped the sched task\r
199     count3 = fakewatcher.fakeOcloudWatcherCounter\r
200     time.sleep(3)\r
201     assert fakewatcher.fakeOcloudWatcherCounter == count3\r