Add error handling for ocloud route
[pti/o2.git] / tests / unit / test_watcher.py
1 # Copyright (C) 2021 Wind River Systems, Inc.
2 #
3 #  Licensed under the Apache License, Version 2.0 (the "License");
4 #  you may not use this file except in compliance with the License.
5 #  You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #  Unless required by applicable law or agreed to in writing, software
10 #  distributed under the License is distributed on an "AS IS" BASIS,
11 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 #  See the License for the specific language governing permissions and
13 #  limitations under the License.
14
15 import time
16 from datetime import datetime
17 import json
18 from typing import Callable, List
19 # from o2common.config import config
20 import uuid
21 from o2common.service.watcher.base import BaseWatcher, WatcherTree
22 from o2common.service.watcher import worker
23 from o2common.service.unit_of_work import AbstractUnitOfWork
24 from o2common.service import messagebus
25
26 from o2ims.domain.resource_type import ResourceTypeEnum
27 from o2ims.domain import stx_object as ocloudModel
28 from o2ims.adapter.ocloud_repository import OcloudRepository
29 from o2ims.domain.stx_repo import StxObjectRepository
30 from o2ims.service.watcher.ocloud_watcher import OcloudWatcher
31 from o2ims.domain import commands
32 from o2common.service.client.base_client import BaseClient
33 from o2ims.domain import ocloud
34
35 from o2app.service import handlers
36 from o2app import bootstrap
37
38
39 class FakeOcloudClient(BaseClient):
40     def __init__(self):
41         super().__init__()
42         fakeCloud = ocloudModel.StxGenericModel(ResourceTypeEnum.OCLOUD)
43         fakeCloud.id = uuid.uuid4()
44         fakeCloud.name = 'stx1'
45         fakeCloud.content = json.dumps({})
46         fakeCloud.createtime = datetime.now()
47         fakeCloud.updatetime = datetime.now()
48         fakeCloud.hash = str(hash((fakeCloud.id, fakeCloud.updatetime)))
49         self.fakeCloud = fakeCloud
50
51     def _get(self, id) -> ocloudModel.StxGenericModel:
52         return self.fakeCloud
53
54     def _list(self):
55         return [self.fakeCloud]
56
57     def _set_stx_client(self):
58         pass
59
60
61 class FakeOcloudRepo(OcloudRepository):
62     def __init__(self):
63         super().__init__()
64         self.oclouds = []
65
66     def _add(self, ocloud: ocloud.Ocloud):
67         self.oclouds.append(ocloud)
68
69     def _get(self, ocloudid) -> ocloud.Ocloud:
70         filtered = [o for o in self.oclouds if o.id == ocloudid]
71         return filtered.pop()
72
73     def _list(self) -> List[ocloud.Ocloud]:
74         return [x for x in self.oclouds]
75
76     def _update(self, ocloud: ocloud.Ocloud):
77         filtered = [o for o in self.oclouds if o.id == ocloud.id]
78         assert len(filtered) == 1
79         ocloud1 = filtered.pop()
80         ocloud1.update_by(ocloud)
81
82
83 class FakeStxObjRepo(StxObjectRepository):
84     def __init__(self):
85         super().__init__()
86         self.oclouds = []
87
88     def _add(self, ocloud: ocloud.Ocloud):
89         self.oclouds.append(ocloud)
90
91     def _get(self, ocloudid) -> ocloud.Ocloud:
92         filtered = [o for o in self.oclouds if o.id == ocloudid]
93         return filtered.pop()
94
95     def _list(self, type: ResourceTypeEnum) -> List[ocloud.Ocloud]:
96         return [x for x in self.oclouds]
97
98     def _update(self, ocloud: ocloud.Ocloud):
99         filtered = [o for o in self.oclouds if o.id == ocloud.id]
100         assert len(filtered) == 1
101         ocloud1 = filtered.pop()
102         ocloud1.update_by(ocloud)
103
104
105 class FakeUnitOfWork(AbstractUnitOfWork):
106     def __init__(self):
107         pass
108
109     def __enter__(self):
110         # self.session = self.session_factory()  # type: Session
111         # self.oclouds = OcloudSqlAlchemyRepository(self.session)
112         self.stxobjects = FakeStxObjRepo()
113         return super().__enter__()
114
115     def __exit__(self, *args):
116         super().__exit__(*args)
117         # self.session.close()
118
119     def _commit(self):
120         pass
121         # self.session.commit()
122
123     def rollback(self):
124         pass
125         # self.session.rollback()
126
127     def collect_new_events(self):
128         yield
129         # return super().collect_new_events()
130
131
132 def create_fake_bus(uow):
133     def update_ocloud(
134             cmd: commands.UpdateOCloud,
135             uow: AbstractUnitOfWork,
136             publish: Callable):
137         return
138
139     # fakeuow = FakeUnitOfWork()
140     handlers.EVENT_HANDLERS = {}
141     handlers.COMMAND_HANDLERS = {
142         commands.UpdateOCloud: update_ocloud,
143     }
144     bus = bootstrap.bootstrap(False, uow)
145     return bus
146
147
148 def test_probe_new_ocloud():
149     fakeuow = FakeUnitOfWork()
150     bus = create_fake_bus(fakeuow)
151     fakeClient = FakeOcloudClient()
152     ocloudwatcher = OcloudWatcher(fakeClient, bus)
153     cmds = ocloudwatcher.probe()
154     assert cmds is not None
155     assert len(cmds) == 1
156     assert cmds[0].data.name == "stx1"
157     # assert len(fakeuow.stxobjects.oclouds) == 1
158     # assert fakeuow.stxobjects.oclouds[0].name == "stx1"
159
160
161 def test_watchers_worker():
162     testedworker = worker.PollWorker()
163
164     class FakeOCloudWatcher(BaseWatcher):
165         def __init__(self, client: BaseClient,
166                      bus: messagebus) -> None:
167             super().__init__(client, None)
168             self.fakeOcloudWatcherCounter = 0
169             self._client = client
170             self._bus = bus
171
172         def _targetname(self):
173             return "fakeocloudwatcher"
174
175         def _probe(self, parent: object = None, tags=None):
176             # import pdb; pdb.set_trace()
177             self.fakeOcloudWatcherCounter += 1
178             # hacking to stop the blocking sched task
179             if self.fakeOcloudWatcherCounter > 2:
180                 testedworker.stop()
181             return []
182
183     # fakeRepo = FakeOcloudRepo()
184     fakeuow = FakeUnitOfWork()
185     bus = create_fake_bus(fakeuow)
186
187     fakeClient = FakeOcloudClient()
188     fakewatcher = FakeOCloudWatcher(fakeClient, bus)
189
190     root = WatcherTree(fakewatcher)
191
192     testedworker.set_interval(1)
193     testedworker.add_watcher(root)
194     assert fakewatcher.fakeOcloudWatcherCounter == 0
195
196     count1 = fakewatcher.fakeOcloudWatcherCounter
197     testedworker.start()
198     time.sleep(1)
199     assert fakewatcher.fakeOcloudWatcherCounter > count1
200
201     # assumed hacking: probe has stopped the sched task
202     count3 = fakewatcher.fakeOcloudWatcherCounter
203     time.sleep(1)
204     assert fakewatcher.fakeOcloudWatcherCounter == count3