INF-303 Add Infrastructure Monitoring Fault Service; INF-305 update inventory api...
[pti/o2.git] / tests / unit / test_alarm.py
1 # Copyright (C) 2021 Wind River Systems, Inc.
2 #
3 #  Licensed under the Apache License, Version 2.0 (the "License");
4 #  you may not use this file except in compliance with the License.
5 #  You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #  Unless required by applicable law or agreed to in writing, software
10 #  distributed under the License is distributed on an "AS IS" BASIS,
11 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 #  See the License for the specific language governing permissions and
13 #  limitations under the License.
14
15 import uuid
16 import time
17 import json
18 from datetime import datetime
19 from unittest.mock import MagicMock
20 from typing import Callable
21
22 from o2common.service.watcher import worker
23 from o2common.service.unit_of_work import AbstractUnitOfWork
24 from o2common.service.client.base_client import BaseClient
25 from o2common.service.watcher.base import BaseWatcher, WatcherTree
26 from o2common.service import messagebus
27 from o2common.config import config
28
29 from o2ims.domain.resource_type import ResourceTypeEnum
30 from o2ims.domain import alarm_obj
31 from o2ims.domain import commands
32 from o2ims.views import alarm_view
33 from o2ims.service.watcher.alarm_watcher import AlarmWatcher
34
35 from o2app.service import handlers
36 from o2app import bootstrap
37
38
39 def test_new_alarm_event_record():
40     alarm_event_record_id1 = str(uuid.uuid4())
41     alarm_event_record = alarm_obj.AlarmEventRecord(
42         alarm_event_record_id1, '',
43         '', '', '', alarm_obj.PerceivedSeverityEnum.CRITICAL)
44     assert alarm_event_record_id1 is not None and \
45         alarm_event_record.alarmEventRecordId == alarm_event_record_id1
46
47
48 def test_view_alarm_event_records(mock_uow):
49     session, uow = mock_uow
50
51     alarm_event_record_id1 = str(uuid.uuid4())
52     alarm_event_record1 = MagicMock()
53     alarm_event_record1.serialize.return_value = {
54         "alarmEventRecordId": alarm_event_record_id1}
55     session.return_value.query.return_value = [alarm_event_record1]
56
57     alarm_event_record_list = alarm_view.alarm_event_records(uow)
58     assert str(alarm_event_record_list[0].get(
59         "alarmEventRecordId")) == alarm_event_record_id1
60
61
62 def test_view_alarm_event_record_one(mock_uow):
63     session, uow = mock_uow
64
65     alarm_event_record_id1 = str(uuid.uuid4())
66     session.return_value.query.return_value.filter_by.return_value.first.\
67         return_value.serialize.return_value = None
68
69     # Query return None
70     alarm_event_record1 = alarm_view.alarm_event_record_one(
71         alarm_event_record_id1, uow)
72     assert alarm_event_record1 is None
73
74     session.return_value.query.return_value.filter_by.return_value.first.\
75         return_value.serialize.return_value = {
76             "alarmEventRecordId": alarm_event_record_id1}
77
78     alarm_event_record1 = alarm_view.alarm_event_record_one(
79         alarm_event_record_id1, uow)
80     assert str(alarm_event_record1.get(
81         "alarmEventRecordId")) == alarm_event_record_id1
82
83
84 def test_alarm_dictionary(mock_uow):
85     session, uow = mock_uow
86     alarm_dict1 = alarm_obj.AlarmDictionary('test1')
87     alarm_dict1.entityType = 'test1'
88     with uow:
89         uow.alarm_dictionaries.add(alarm_dict1)
90
91         alarm_dict2 = uow.alarm_dictionaries.get('test1')
92         assert alarm_dict1 == alarm_dict2
93
94         dict_list = uow.alarm_dictionaries.list()
95         assert len(dict_list) > 0
96
97
98 def test_flask_get_list(mock_flask_uow):
99     session, app = mock_flask_uow
100     session.query.return_value = []
101     apibase = config.get_o2ims_monitoring_api_base()
102
103     with app.test_client() as client:
104         # Get list and return empty list
105         ##########################
106         resp = client.get(apibase+"/alarms")
107         assert resp.get_data() == b'[]\n'
108
109         resp = client.get(apibase+"/alarmSubscriptions")
110         assert resp.get_data() == b'[]\n'
111
112
113 def test_flask_get_one(mock_flask_uow):
114     session, app = mock_flask_uow
115
116     session.return_value.query.return_value.filter_by.return_value.\
117         first.return_value = None
118     apibase = config.get_o2ims_monitoring_api_base()
119
120     with app.test_client() as client:
121         # Get one and return 404
122         ###########################
123         alarm_id1 = str(uuid.uuid4())
124         resp = client.get(apibase+"/alarms/"+alarm_id1)
125         assert resp.status_code == 404
126
127         sub_id1 = str(uuid.uuid4())
128         resp = client.get(apibase+"/alarmSubscriptions/"+sub_id1)
129         assert resp.status_code == 404
130
131
132 def test_flask_post(mock_flask_uow):
133     session, app = mock_flask_uow
134     apibase = config.get_o2ims_monitoring_api_base()
135
136     with app.test_client() as client:
137         session.return_value.execute.return_value = []
138
139         sub_callback = 'http://subscription/callback/url'
140         resp = client.post(apibase+'/alarmSubscriptions', json={
141             'callback': sub_callback,
142             'consumerSubscriptionId': 'consumerSubId1',
143             'filter': 'empty'
144         })
145         assert resp.status_code == 201
146         assert 'alarmSubscriptionId' in resp.get_json()
147
148
149 def test_flask_delete(mock_flask_uow):
150     session, app = mock_flask_uow
151     apibase = config.get_o2ims_monitoring_api_base()
152
153     with app.test_client() as client:
154         session.return_value.execute.return_value.first.return_value = {}
155
156         subscription_id1 = str(uuid.uuid4())
157         resp = client.delete(apibase+"/alarmSubscriptions/"+subscription_id1)
158         assert resp.status_code == 204
159
160
161 def test_flask_not_allowed(mock_flask_uow):
162     _, app = mock_flask_uow
163     apibase = config.get_o2ims_monitoring_api_base()
164
165     with app.test_client() as client:
166         # Testing resource type not support method
167         ##########################
168         uri = apibase + "/alarms"
169         resp = client.post(uri)
170         assert resp.status == '405 METHOD NOT ALLOWED'
171         resp = client.put(uri)
172         assert resp.status == '405 METHOD NOT ALLOWED'
173         resp = client.patch(uri)
174         assert resp.status == '405 METHOD NOT ALLOWED'
175         resp = client.delete(uri)
176         assert resp.status == '405 METHOD NOT ALLOWED'
177
178
179 class FakeAlarmClient(BaseClient):
180     def __init__(self):
181         super().__init__()
182         fakeAlarm = alarm_obj.FaultGenericModel(ResourceTypeEnum.OCLOUD)
183         fakeAlarm.id = str(uuid.uuid4())
184         fakeAlarm.name = 'alarm'
185         fakeAlarm.content = json.dumps({})
186         fakeAlarm.createtime = datetime.now()
187         fakeAlarm.updatetime = datetime.now()
188         fakeAlarm.hash = str(hash((fakeAlarm.id, fakeAlarm.updatetime)))
189         self.fakeAlarm = fakeAlarm
190
191     def _get(self, id) -> alarm_obj.FaultGenericModel:
192         return self.fakeAlarm
193
194     def _list(self):
195         return [self.fakeAlarm]
196
197     def _set_stx_client(self):
198         pass
199
200
201 # class FakeStxObjRepo(StxObjectRepository):
202 #     def __init__(self):
203 #         super().__init__()
204 #         self.alarms = []
205
206 #     def _add(self, alarm: alarm_obj.AlarmEventRecord):
207 #         self.alarms.append(alarm)
208
209 #     def _get(self, alarmid) -> alarm_obj.AlarmEventRecord:
210 #         filtered = [a for a in self.alarms if a.id == alarmid]
211 #         return filtered.pop()
212
213 #     def _list(self) -> List[alarm_obj.AlarmEventRecord]:
214 #         return [x for x in self.oclouds]
215
216 #     def _update(self, alarm: alarm_obj.AlarmEventRecord):
217 #         filtered = [a for a in self.alarms if a.id == alarm.id]
218 #         assert len(filtered) == 1
219 #         ocloud1 = filtered.pop()
220 #         ocloud1.update_by(alarm)
221
222
223 class FakeUnitOfWork(AbstractUnitOfWork):
224     def __init__(self, session_factory=None):
225         self.session_factory = session_factory
226
227     def __enter__(self):
228         self.session = self.session_factory
229         # self.stxobjects = FakeStxObjRepo()
230         return super().__enter__()
231
232     def __exit__(self, *args):
233         super().__exit__(*args)
234         # self.session.close()
235
236     def _commit(self):
237         pass
238         # self.session.commit()
239
240     def rollback(self):
241         pass
242         # self.session.rollback()
243
244     def collect_new_events(self):
245         yield
246         # return super().collect_new_events()
247
248
249 def create_alarm_fake_bus(uow):
250     def update_alarm(
251             cmd: commands.UpdateAlarm,
252             uow: AbstractUnitOfWork,
253             publish: Callable):
254         return
255
256     handlers.EVENT_HANDLERS = {}
257     handlers.COMMAND_HANDLERS = {
258         commands.UpdateAlarm: update_alarm,
259     }
260     bus = bootstrap.bootstrap(False, uow)
261     return bus
262
263
264 def test_probe_new_alarm():
265     session = MagicMock()
266     session.return_value.execute.return_value = []
267     fakeuow = FakeUnitOfWork(session)
268     bus = create_alarm_fake_bus(fakeuow)
269     fakeClient = FakeAlarmClient()
270     alarmwatcher = AlarmWatcher(fakeClient, bus)
271     cmds = alarmwatcher.probe()
272     assert cmds is not None
273     assert len(cmds) == 1
274     assert cmds[0].data.name == "alarm"
275     # assert len(fakeuow.stxobjects.oclouds) == 1
276     # assert fakeuow.stxobjects.oclouds[0].name == "stx1"
277
278
279 def test_watchers_worker():
280     testedworker = worker.PollWorker()
281
282     class FakeAlarmWatcher(BaseWatcher):
283         def __init__(self, client: BaseClient,
284                      bus: messagebus) -> None:
285             super().__init__(client, None)
286             self.fakeOcloudWatcherCounter = 0
287             self._client = client
288             self._bus = bus
289
290         def _targetname(self):
291             return "fakealarmwatcher"
292
293         def _probe(self, parent: object = None, tags=None):
294             # import pdb; pdb.set_trace()
295             self.fakeOcloudWatcherCounter += 1
296             # hacking to stop the blocking sched task
297             if self.fakeOcloudWatcherCounter > 2:
298                 testedworker.stop()
299             return []
300
301     # fakeRepo = FakeOcloudRepo()
302     fakeuow = FakeUnitOfWork()
303     bus = create_alarm_fake_bus(fakeuow)
304
305     fakeClient = FakeAlarmClient()
306     fakewatcher = FakeAlarmWatcher(fakeClient, bus)
307
308     root = WatcherTree(fakewatcher)
309
310     testedworker.set_interval(1)
311     testedworker.add_watcher(root)
312     assert fakewatcher.fakeOcloudWatcherCounter == 0
313
314     count1 = fakewatcher.fakeOcloudWatcherCounter
315     testedworker.start()
316     time.sleep(20)
317     assert fakewatcher.fakeOcloudWatcherCounter > count1
318
319     # assumed hacking: probe has stopped the sched task
320     count3 = fakewatcher.fakeOcloudWatcherCounter
321     time.sleep(3)
322     assert fakewatcher.fakeOcloudWatcherCounter == count3