b0aded4f67e3f01d5165e33dedd4144f8eb49a4f
[pti/o2.git] / tests / unit / test_watcher.py
1 # Copyright (C) 2021 Wind River Systems, Inc.\r
2 #\r
3 #  Licensed under the Apache License, Version 2.0 (the "License");\r
4 #  you may not use this file except in compliance with the License.\r
5 #  You may obtain a copy of the License at\r
6 #\r
7 #      http://www.apache.org/licenses/LICENSE-2.0\r
8 #\r
9 #  Unless required by applicable law or agreed to in writing, software\r
10 #  distributed under the License is distributed on an "AS IS" BASIS,\r
11 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
12 #  See the License for the specific language governing permissions and\r
13 #  limitations under the License.\r
14 \r
15 import time\r
16 from datetime import datetime\r
17 import json\r
18 from typing import List\r
19 from o2ims.domain.resource_type import ResourceTypeEnum\r
20 from o2ims.service.client.base_client import BaseClient\r
21 from o2ims.domain import ocloud\r
22 from o2ims import config\r
23 import uuid\r
24 from o2ims.service.watcher.base import BaseWatcher, WatcherTree\r
25 from o2ims.domain import stx_object as ocloudModel\r
26 from o2ims.adapter.ocloud_repository import OcloudRepository\r
27 from o2ims.domain.stx_repo import StxObjectRepository\r
28 from o2ims.service.watcher import worker\r
29 from o2ims.service.unit_of_work import AbstractUnitOfWork\r
30 from o2ims.service.watcher.ocloud_watcher import OcloudWatcher\r
31 \r
32 \r
33 class FakeOcloudClient(BaseClient):\r
34     def __init__(self):\r
35         super().__init__()\r
36         fakeCloud = ocloudModel.StxGenericModel(ResourceTypeEnum.OCLOUD)\r
37         fakeCloud.id = uuid.uuid4()\r
38         fakeCloud.name = 'stx1'\r
39         fakeCloud.content = json.dumps({})\r
40         fakeCloud.createtime = datetime.now()\r
41         fakeCloud.updatetime = datetime.now()\r
42         fakeCloud.hash = str(hash((fakeCloud.id, fakeCloud.updatetime)))\r
43         self.fakeCloud = fakeCloud\r
44 \r
45     def _get(self, id) -> ocloudModel.StxGenericModel:\r
46         return self.fakeCloud\r
47 \r
48     def _list(self):\r
49         return [self.fakeCloud]\r
50 \r
51 \r
52 class FakeOcloudRepo(OcloudRepository):\r
53     def __init__(self):\r
54         super().__init__()\r
55         self.oclouds = []\r
56 \r
57     def _add(self, ocloud: ocloud.Ocloud):\r
58         self.oclouds.append(ocloud)\r
59 \r
60     def _get(self, ocloudid) -> ocloud.Ocloud:\r
61         filtered = [o for o in self.oclouds if o.id == ocloudid]\r
62         return filtered.pop()\r
63 \r
64     def _list(self) -> List[ocloud.Ocloud]:\r
65         return [x for x in self.oclouds]\r
66 \r
67     def _update(self, ocloud: ocloud.Ocloud):\r
68         filtered = [o for o in self.oclouds if o.id == ocloud.id]\r
69         assert len(filtered) == 1\r
70         ocloud1 = filtered.pop()\r
71         ocloud1.update_by(ocloud)\r
72 \r
73 \r
74 \r
75 class FakeStxObjRepo(StxObjectRepository):\r
76     def __init__(self):\r
77         super().__init__()\r
78         self.oclouds = []\r
79 \r
80     def _add(self, ocloud: ocloud.Ocloud):\r
81         self.oclouds.append(ocloud)\r
82 \r
83     def _get(self, ocloudid) -> ocloud.Ocloud:\r
84         filtered = [o for o in self.oclouds if o.id == ocloudid]\r
85         return filtered.pop()\r
86 \r
87     def _list(self, type: ResourceTypeEnum) -> List[ocloud.Ocloud]:\r
88         return [x for x in self.oclouds]\r
89 \r
90     def _update(self, ocloud: ocloud.Ocloud):\r
91         filtered = [o for o in self.oclouds if o.id == ocloud.id]\r
92         assert len(filtered) == 1\r
93         ocloud1 = filtered.pop()\r
94         ocloud1.update_by(ocloud)\r
95 \r
96 \r
97 class FakeUnitOfWork(AbstractUnitOfWork):\r
98     def __init__(self):\r
99         pass\r
100 \r
101     def __enter__(self):\r
102         # self.session = self.session_factory()  # type: Session\r
103         # self.oclouds = OcloudSqlAlchemyRepository(self.session)\r
104         self.stxobjects = FakeStxObjRepo()\r
105         return super().__enter__()\r
106 \r
107     def __exit__(self, *args):\r
108         super().__exit__(*args)\r
109         # self.session.close()\r
110 \r
111     def _commit(self):\r
112         pass\r
113         # self.session.commit()\r
114 \r
115     def rollback(self):\r
116         pass\r
117         # self.session.rollback()\r
118 \r
119 \r
120 def test_probe_new_ocloud():\r
121     # fakeRepo = FakeOcloudRepo()\r
122     fakeuow = FakeUnitOfWork()\r
123     fakeClient = FakeOcloudClient()\r
124     ocloudwatcher = OcloudWatcher(fakeClient, fakeuow)\r
125     ocloudwatcher.probe()\r
126     assert len(fakeuow.stxobjects.oclouds) == 1\r
127     assert fakeuow.stxobjects.oclouds[0].name == "stx1"\r
128 \r
129 \r
130 def test_watchers_worker():\r
131     testedworker = worker.PollWorker()\r
132 \r
133     class FakeOCloudWatcher(BaseWatcher):\r
134         def __init__(self, client: BaseClient,\r
135                      repo: OcloudRepository) -> None:\r
136             super().__init__(client, None)\r
137             self.fakeOcloudWatcherCounter = 0\r
138             self._client = client\r
139             self._repo = repo\r
140 \r
141         def _targetname(self):\r
142             return "fakeocloudwatcher"\r
143 \r
144         def _probe(self, parent: object=None):\r
145             # import pdb; pdb.set_trace()\r
146             self.fakeOcloudWatcherCounter += 1\r
147             # hacking to stop the blocking sched task\r
148             if self.fakeOcloudWatcherCounter > 2:\r
149                 testedworker.stop()\r
150             return []\r
151 \r
152 \r
153     # fakeRepo = FakeOcloudRepo()\r
154     fakeuow = FakeUnitOfWork()\r
155 \r
156     fakeClient = FakeOcloudClient()\r
157     fakewatcher = FakeOCloudWatcher(fakeClient, fakeuow)\r
158 \r
159     root = WatcherTree(fakewatcher)\r
160 \r
161     testedworker.set_interval(1)\r
162     testedworker.add_watcher(root)\r
163     assert fakewatcher.fakeOcloudWatcherCounter == 0\r
164 \r
165     count1 = fakewatcher.fakeOcloudWatcherCounter\r
166     testedworker.start()\r
167     time.sleep(20)\r
168     assert fakewatcher.fakeOcloudWatcherCounter > count1\r
169 \r
170     # assumed hacking: probe has stopped the sched task\r
171     count3 = fakewatcher.fakeOcloudWatcherCounter\r
172     time.sleep(3)\r
173     assert fakewatcher.fakeOcloudWatcherCounter == count3\r