Convert file endlines to Unix (LF)
[pti/o2.git] / tests / unit / test_watcher.py
1 # Copyright (C) 2021 Wind River Systems, Inc.
2 #
3 #  Licensed under the Apache License, Version 2.0 (the "License");
4 #  you may not use this file except in compliance with the License.
5 #  You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #  Unless required by applicable law or agreed to in writing, software
10 #  distributed under the License is distributed on an "AS IS" BASIS,
11 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 #  See the License for the specific language governing permissions and
13 #  limitations under the License.
14
15 import time
16 from datetime import datetime
17 import json
18 from typing import Callable, List
19 # from o2common.config import config
20 import uuid
21 from o2common.service.watcher.base import BaseWatcher, WatcherTree
22 from o2common.service.watcher import worker
23 from o2common.service.unit_of_work import AbstractUnitOfWork
24 from o2common.service import messagebus
25
26 from o2ims.domain.resource_type import ResourceTypeEnum
27 from o2ims.domain import stx_object as ocloudModel
28 from o2ims.adapter.ocloud_repository import OcloudRepository
29 from o2ims.domain.stx_repo import StxObjectRepository
30 from o2ims.service.watcher.ocloud_watcher import OcloudWatcher
31 from o2ims.domain import commands
32 from o2common.service.client.base_client import BaseClient
33 from o2ims.domain import ocloud
34
35 from o2app.service import handlers
36 from o2app import bootstrap
37
38
39 class FakeOcloudClient(BaseClient):
40     def __init__(self):
41         super().__init__()
42         fakeCloud = ocloudModel.StxGenericModel(ResourceTypeEnum.OCLOUD)
43         fakeCloud.id = uuid.uuid4()
44         fakeCloud.name = 'stx1'
45         fakeCloud.content = json.dumps({})
46         fakeCloud.createtime = datetime.now()
47         fakeCloud.updatetime = datetime.now()
48         fakeCloud.hash = str(hash((fakeCloud.id, fakeCloud.updatetime)))
49         self.fakeCloud = fakeCloud
50
51     def _get(self, id) -> ocloudModel.StxGenericModel:
52         return self.fakeCloud
53
54     def _list(self):
55         return [self.fakeCloud]
56
57
58 class FakeOcloudRepo(OcloudRepository):
59     def __init__(self):
60         super().__init__()
61         self.oclouds = []
62
63     def _add(self, ocloud: ocloud.Ocloud):
64         self.oclouds.append(ocloud)
65
66     def _get(self, ocloudid) -> ocloud.Ocloud:
67         filtered = [o for o in self.oclouds if o.id == ocloudid]
68         return filtered.pop()
69
70     def _list(self) -> List[ocloud.Ocloud]:
71         return [x for x in self.oclouds]
72
73     def _update(self, ocloud: ocloud.Ocloud):
74         filtered = [o for o in self.oclouds if o.id == ocloud.id]
75         assert len(filtered) == 1
76         ocloud1 = filtered.pop()
77         ocloud1.update_by(ocloud)
78
79
80 class FakeStxObjRepo(StxObjectRepository):
81     def __init__(self):
82         super().__init__()
83         self.oclouds = []
84
85     def _add(self, ocloud: ocloud.Ocloud):
86         self.oclouds.append(ocloud)
87
88     def _get(self, ocloudid) -> ocloud.Ocloud:
89         filtered = [o for o in self.oclouds if o.id == ocloudid]
90         return filtered.pop()
91
92     def _list(self, type: ResourceTypeEnum) -> List[ocloud.Ocloud]:
93         return [x for x in self.oclouds]
94
95     def _update(self, ocloud: ocloud.Ocloud):
96         filtered = [o for o in self.oclouds if o.id == ocloud.id]
97         assert len(filtered) == 1
98         ocloud1 = filtered.pop()
99         ocloud1.update_by(ocloud)
100
101
102 class FakeUnitOfWork(AbstractUnitOfWork):
103     def __init__(self):
104         pass
105
106     def __enter__(self):
107         # self.session = self.session_factory()  # type: Session
108         # self.oclouds = OcloudSqlAlchemyRepository(self.session)
109         self.stxobjects = FakeStxObjRepo()
110         return super().__enter__()
111
112     def __exit__(self, *args):
113         super().__exit__(*args)
114         # self.session.close()
115
116     def _commit(self):
117         pass
118         # self.session.commit()
119
120     def rollback(self):
121         pass
122         # self.session.rollback()
123
124     def collect_new_events(self):
125         yield
126         # return super().collect_new_events()
127
128
129 def create_fake_bus(uow):
130     def update_ocloud(
131             cmd: commands.UpdateOCloud,
132             uow: AbstractUnitOfWork,
133             publish: Callable):
134         return
135
136     fakeuow = FakeUnitOfWork()
137     handlers.EVENT_HANDLERS = {}
138     handlers.COMMAND_HANDLERS = {
139         commands.UpdateOCloud: update_ocloud,
140     }
141     bus = bootstrap.bootstrap(False, fakeuow)
142     return bus
143
144
145 def test_probe_new_ocloud():
146     fakeuow = FakeUnitOfWork()
147     bus = create_fake_bus(fakeuow)
148     fakeClient = FakeOcloudClient()
149     ocloudwatcher = OcloudWatcher(fakeClient, bus)
150     cmds = ocloudwatcher.probe()
151     assert cmds is not None
152     assert len(cmds) == 1
153     assert cmds[0].data.name == "stx1"
154     # assert len(fakeuow.stxobjects.oclouds) == 1
155     # assert fakeuow.stxobjects.oclouds[0].name == "stx1"
156
157
158 def test_watchers_worker():
159     testedworker = worker.PollWorker()
160
161     class FakeOCloudWatcher(BaseWatcher):
162         def __init__(self, client: BaseClient,
163                      bus: messagebus) -> None:
164             super().__init__(client, None)
165             self.fakeOcloudWatcherCounter = 0
166             self._client = client
167             self._bus = bus
168
169         def _targetname(self):
170             return "fakeocloudwatcher"
171
172         def _probe(self, parent: object = None):
173             # import pdb; pdb.set_trace()
174             self.fakeOcloudWatcherCounter += 1
175             # hacking to stop the blocking sched task
176             if self.fakeOcloudWatcherCounter > 2:
177                 testedworker.stop()
178             return []
179
180     # fakeRepo = FakeOcloudRepo()
181     fakeuow = FakeUnitOfWork()
182     bus = create_fake_bus(fakeuow)
183
184     fakeClient = FakeOcloudClient()
185     fakewatcher = FakeOCloudWatcher(fakeClient, bus)
186
187     root = WatcherTree(fakewatcher)
188
189     testedworker.set_interval(1)
190     testedworker.add_watcher(root)
191     assert fakewatcher.fakeOcloudWatcherCounter == 0
192
193     count1 = fakewatcher.fakeOcloudWatcherCounter
194     testedworker.start()
195     time.sleep(20)
196     assert fakewatcher.fakeOcloudWatcherCounter > count1
197
198     # assumed hacking: probe has stopped the sched task
199     count3 = fakewatcher.fakeOcloudWatcherCounter
200     time.sleep(3)
201     assert fakewatcher.fakeOcloudWatcherCounter == count3