2bdb31c73bc4120806cc1cb27ddbeb3681272da2
[pti/o2.git] / tests / unit / test_alarm.py
1 # Copyright (C) 2021 Wind River Systems, Inc.
2 #
3 #  Licensed under the Apache License, Version 2.0 (the "License");
4 #  you may not use this file except in compliance with the License.
5 #  You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #  Unless required by applicable law or agreed to in writing, software
10 #  distributed under the License is distributed on an "AS IS" BASIS,
11 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 #  See the License for the specific language governing permissions and
13 #  limitations under the License.
14
15 import uuid
16 import time
17 import json
18 from datetime import datetime
19 from unittest.mock import MagicMock
20 from typing import Callable
21
22 from o2common.service.watcher import worker
23 from o2common.service.unit_of_work import AbstractUnitOfWork
24 from o2common.service.client.base_client import BaseClient
25 from o2common.service.watcher.base import BaseWatcher, WatcherTree
26 from o2common.service import messagebus
27 from o2common.config import config
28
29 from o2ims.domain.resource_type import ResourceTypeEnum
30 from o2ims.domain import alarm_obj
31 from o2ims.domain import commands
32 from o2ims.views import alarm_view
33 from o2ims.service.watcher.alarm_watcher import AlarmWatcher
34
35 from o2app.service import handlers
36 from o2app import bootstrap
37
38
39 def test_new_alarm_event_record():
40     alarm_event_record_id1 = str(uuid.uuid4())
41     alarm_event_record = alarm_obj.AlarmEventRecord(
42         alarm_event_record_id1, '',
43         '', '', '', alarm_obj.PerceivedSeverityEnum.CRITICAL)
44     assert alarm_event_record_id1 is not None and \
45         alarm_event_record.alarmEventRecordId == alarm_event_record_id1
46
47
48 def test_view_alarm_event_records(mock_uow):
49     session, uow = mock_uow
50
51     alarm_event_record_id1 = str(uuid.uuid4())
52     alarm_event_record1 = MagicMock()
53     alarm_event_record1.serialize.return_value = {
54         "alarmEventRecordId": alarm_event_record_id1}
55
56     order_by = MagicMock()
57     order_by.count.return_value = 1
58     order_by.limit.return_value.offset.return_value = [alarm_event_record1]
59     session.return_value.query.return_value.filter.return_value.\
60         order_by.return_value = order_by
61
62     result = alarm_view.alarm_event_records(uow)
63     assert result['count'] == 1
64     ret_list = result['results']
65     assert str(ret_list[0].get("alarmEventRecordId")) == alarm_event_record_id1
66
67
68 def test_view_alarm_event_record_one(mock_uow):
69     session, uow = mock_uow
70
71     alarm_event_record_id1 = str(uuid.uuid4())
72     session.return_value.query.return_value.filter_by.return_value.first.\
73         return_value.serialize.return_value = None
74
75     # Query return None
76     alarm_event_record1 = alarm_view.alarm_event_record_one(
77         alarm_event_record_id1, uow)
78     assert alarm_event_record1 is None
79
80     session.return_value.query.return_value.filter_by.return_value.first.\
81         return_value.serialize.return_value = {
82             "alarmEventRecordId": alarm_event_record_id1}
83
84     alarm_event_record1 = alarm_view.alarm_event_record_one(
85         alarm_event_record_id1, uow)
86     assert str(alarm_event_record1.get(
87         "alarmEventRecordId")) == alarm_event_record_id1
88
89
90 def test_view_alarm_subscriptions(mock_uow):
91     session, uow = mock_uow
92
93     subscription_id1 = str(uuid.uuid4())
94     sub1 = MagicMock()
95     sub1.serialize.return_value = {
96         "alarmSubscriptionId": subscription_id1,
97     }
98
99     order_by = MagicMock()
100     order_by.count.return_value = 1
101     order_by.limit.return_value.offset.return_value = [sub1]
102     session.return_value.query.return_value.filter.return_value.\
103         order_by.return_value = order_by
104
105     result = alarm_view.subscriptions(uow)
106     assert result['count'] == 1
107     ret_list = result['results']
108     assert str(ret_list[0].get("alarmSubscriptionId")) == subscription_id1
109
110
111 def test_view_alarm_subscription_one(mock_uow):
112     session, uow = mock_uow
113
114     subscription_id1 = str(uuid.uuid4())
115     session.return_value.query.return_value.filter_by.return_value.first.\
116         return_value.serialize.return_value = None
117
118     # Query return None
119     subscription_res = alarm_view.subscription_one(
120         subscription_id1, uow)
121     assert subscription_res is None
122
123     session.return_value.query.return_value.filter_by.return_value.first.\
124         return_value.serialize.return_value = {
125             "alarmSubscriptionId": subscription_id1,
126         }
127
128     subscription_res = alarm_view.subscription_one(
129         subscription_id1, uow)
130     assert str(subscription_res.get(
131         "alarmSubscriptionId")) == subscription_id1
132
133
134 def test_alarm_dictionary(mock_uow):
135     session, uow = mock_uow
136     alarm_dict1 = alarm_obj.AlarmDictionary('test1')
137     alarm_dict1.entityType = 'test1'
138     with uow:
139         uow.alarm_dictionaries.add(alarm_dict1)
140
141         alarm_dict2 = uow.alarm_dictionaries.get('test1')
142         assert alarm_dict1 == alarm_dict2
143
144         dict_list = uow.alarm_dictionaries.list()
145         assert len(dict_list) > 0
146
147
148 def test_flask_get_list(mock_flask_uow):
149     session, app = mock_flask_uow
150     order_by = MagicMock()
151     order_by.count.return_value = 0
152     order_by.limit.return_value.offset.return_value = []
153     session.return_value.query.return_value.filter.return_value.\
154         order_by.return_value = order_by
155     apibase = config.get_o2ims_monitoring_api_base()
156
157     with app.test_client() as client:
158         # Get list and return empty list
159         ##########################
160         resp = client.get(apibase+"/alarms")
161         assert resp.get_data() == b'[]\n'
162
163         resp = client.get(apibase+"/alarmSubscriptions")
164         assert resp.get_data() == b'[]\n'
165
166
167 def test_flask_get_one(mock_flask_uow):
168     session, app = mock_flask_uow
169
170     session.return_value.query.return_value.filter_by.return_value.\
171         first.return_value = None
172     apibase = config.get_o2ims_monitoring_api_base()
173
174     with app.test_client() as client:
175         # Get one and return 404
176         ###########################
177         alarm_id1 = str(uuid.uuid4())
178         resp = client.get(apibase+"/alarms/"+alarm_id1)
179         assert resp.status_code == 404
180
181         sub_id1 = str(uuid.uuid4())
182         resp = client.get(apibase+"/alarmSubscriptions/"+sub_id1)
183         assert resp.status_code == 404
184
185
186 def test_flask_post(mock_flask_uow):
187     session, app = mock_flask_uow
188     apibase = config.get_o2ims_monitoring_api_base()
189
190     with app.test_client() as client:
191         session.return_value.execute.return_value = []
192
193         sub_callback = 'http://subscription/callback/url'
194         resp = client.post(apibase+'/alarmSubscriptions', json={
195             'callback': sub_callback,
196             'consumerSubscriptionId': 'consumerSubId1',
197             'filter': 'empty'
198         })
199         assert resp.status_code == 201
200         assert 'alarmSubscriptionId' in resp.get_json()
201
202
203 def test_flask_delete(mock_flask_uow):
204     session, app = mock_flask_uow
205     apibase = config.get_o2ims_monitoring_api_base()
206
207     with app.test_client() as client:
208         session.return_value.execute.return_value.first.return_value = {}
209
210         subscription_id1 = str(uuid.uuid4())
211         resp = client.delete(apibase+"/alarmSubscriptions/"+subscription_id1)
212         assert resp.status_code == 204
213
214
215 def test_flask_not_allowed(mock_flask_uow):
216     _, app = mock_flask_uow
217     apibase = config.get_o2ims_monitoring_api_base()
218
219     with app.test_client() as client:
220         # Testing resource type not support method
221         ##########################
222         uri = apibase + "/alarms"
223         resp = client.post(uri)
224         assert resp.status == '405 METHOD NOT ALLOWED'
225         resp = client.put(uri)
226         assert resp.status == '405 METHOD NOT ALLOWED'
227         resp = client.patch(uri)
228         assert resp.status == '405 METHOD NOT ALLOWED'
229         resp = client.delete(uri)
230         assert resp.status == '405 METHOD NOT ALLOWED'
231
232
233 class FakeAlarmClient(BaseClient):
234     def __init__(self):
235         super().__init__()
236         fakeAlarm = alarm_obj.FaultGenericModel(ResourceTypeEnum.OCLOUD)
237         fakeAlarm.id = str(uuid.uuid4())
238         fakeAlarm.name = 'alarm'
239         fakeAlarm.content = json.dumps({})
240         fakeAlarm.createtime = datetime.now()
241         fakeAlarm.updatetime = datetime.now()
242         fakeAlarm.hash = str(hash((fakeAlarm.id, fakeAlarm.updatetime)))
243         self.fakeAlarm = fakeAlarm
244
245     def _get(self, id) -> alarm_obj.FaultGenericModel:
246         return self.fakeAlarm
247
248     def _list(self):
249         return [self.fakeAlarm]
250
251     def _set_stx_client(self):
252         pass
253
254
255 # class FakeStxObjRepo(StxObjectRepository):
256 #     def __init__(self):
257 #         super().__init__()
258 #         self.alarms = []
259
260 #     def _add(self, alarm: alarm_obj.AlarmEventRecord):
261 #         self.alarms.append(alarm)
262
263 #     def _get(self, alarmid) -> alarm_obj.AlarmEventRecord:
264 #         filtered = [a for a in self.alarms if a.id == alarmid]
265 #         return filtered.pop()
266
267 #     def _list(self) -> List[alarm_obj.AlarmEventRecord]:
268 #         return [x for x in self.oclouds]
269
270 #     def _update(self, alarm: alarm_obj.AlarmEventRecord):
271 #         filtered = [a for a in self.alarms if a.id == alarm.id]
272 #         assert len(filtered) == 1
273 #         ocloud1 = filtered.pop()
274 #         ocloud1.update_by(alarm)
275
276
277 class FakeUnitOfWork(AbstractUnitOfWork):
278     def __init__(self, session_factory=None):
279         self.session_factory = session_factory
280
281     def __enter__(self):
282         self.session = self.session_factory
283         # self.stxobjects = FakeStxObjRepo()
284         return super().__enter__()
285
286     def __exit__(self, *args):
287         super().__exit__(*args)
288         # self.session.close()
289
290     def _commit(self):
291         pass
292         # self.session.commit()
293
294     def rollback(self):
295         pass
296         # self.session.rollback()
297
298     def collect_new_events(self):
299         yield
300         # return super().collect_new_events()
301
302
303 def create_alarm_fake_bus(uow):
304     def update_alarm(
305             cmd: commands.UpdateAlarm,
306             uow: AbstractUnitOfWork,
307             publish: Callable):
308         return
309
310     handlers.EVENT_HANDLERS = {}
311     handlers.COMMAND_HANDLERS = {
312         commands.UpdateAlarm: update_alarm,
313     }
314     bus = bootstrap.bootstrap(False, uow)
315     return bus
316
317
318 def test_probe_new_alarm():
319     session = MagicMock()
320     session.return_value.execute.return_value = []
321     fakeuow = FakeUnitOfWork(session)
322     bus = create_alarm_fake_bus(fakeuow)
323     fakeClient = FakeAlarmClient()
324     alarmwatcher = AlarmWatcher(fakeClient, bus)
325     cmds = alarmwatcher.probe()
326     assert cmds is not None
327     assert len(cmds) == 1
328     assert cmds[0].data.name == "alarm"
329     # assert len(fakeuow.stxobjects.oclouds) == 1
330     # assert fakeuow.stxobjects.oclouds[0].name == "stx1"
331
332
333 def test_watchers_worker():
334     testedworker = worker.PollWorker()
335
336     class FakeAlarmWatcher(BaseWatcher):
337         def __init__(self, client: BaseClient,
338                      bus: messagebus) -> None:
339             super().__init__(client, None)
340             self.fakeOcloudWatcherCounter = 0
341             self._client = client
342             self._bus = bus
343
344         def _targetname(self):
345             return "fakealarmwatcher"
346
347         def _probe(self, parent: object = None, tags=None):
348             # import pdb; pdb.set_trace()
349             self.fakeOcloudWatcherCounter += 1
350             # hacking to stop the blocking sched task
351             if self.fakeOcloudWatcherCounter > 2:
352                 testedworker.stop()
353             return []
354
355     # fakeRepo = FakeOcloudRepo()
356     fakeuow = FakeUnitOfWork()
357     bus = create_alarm_fake_bus(fakeuow)
358
359     fakeClient = FakeAlarmClient()
360     fakewatcher = FakeAlarmWatcher(fakeClient, bus)
361
362     root = WatcherTree(fakewatcher)
363
364     testedworker.set_interval(1)
365     testedworker.add_watcher(root)
366     assert fakewatcher.fakeOcloudWatcherCounter == 0
367
368     count1 = fakewatcher.fakeOcloudWatcherCounter
369     testedworker.start()
370     time.sleep(20)
371     assert fakewatcher.fakeOcloudWatcherCounter > count1
372
373     # assumed hacking: probe has stopped the sched task
374     count3 = fakewatcher.fakeOcloudWatcherCounter
375     time.sleep(3)
376     assert fakewatcher.fakeOcloudWatcherCounter == count3