# Copyright (C) 2021 Wind River Systems, Inc.
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
import json
import time
import uuid
from datetime import datetime
from unittest.mock import MagicMock
from typing import Callable

from o2common.service.watcher import worker
from o2common.service.unit_of_work import AbstractUnitOfWork
from o2common.service.client.base_client import BaseClient
from o2common.service.watcher.base import BaseWatcher, WatcherTree
from o2common.service import messagebus
from o2common.config import config

from o2ims.domain.resource_type import ResourceTypeEnum
from o2ims.domain import alarm_obj
from o2ims.domain import commands
from o2ims.views import alarm_view
from o2ims.service.watcher.alarm_watcher import AlarmWatcher

from o2app.service import handlers
from o2app import bootstrap
def test_new_alarm_event_record():
    """A new AlarmEventRecord exposes the ID it was constructed with."""
    record_id = str(uuid.uuid4())
    severity = alarm_obj.PerceivedSeverityEnum.CRITICAL
    record = alarm_obj.AlarmEventRecord(record_id, '', '', '', '', severity)

    assert record_id is not None
    assert record.alarmEventRecordId == record_id
def test_view_alarm_event_records(mock_uow):
    """alarm_view.alarm_event_records reports the count and serialized rows."""
    session, uow = mock_uow
    record_id = str(uuid.uuid4())

    # Stand-in row whose serialize() yields only the record ID.
    fake_record = MagicMock()
    fake_record.serialize.return_value = {"alarmEventRecordId": record_id}

    # Stub session().query().filter_by().order_by() so that it counts one
    # row and its limit().offset() page holds the fake record.
    ordered_query = MagicMock()
    ordered_query.count.return_value = 1
    ordered_query.limit.return_value.offset.return_value = [fake_record]
    query = session.return_value.query.return_value
    query.filter_by.return_value.order_by.return_value = ordered_query

    page = alarm_view.alarm_event_records(uow)

    assert page['count'] == 1
    first_item = page['results'][0]
    assert str(first_item.get("alarmEventRecordId")) == record_id
def test_view_alarm_event_record_one(mock_uow):
    """alarm_view.alarm_event_record_one returns what the row serializes to."""
    session, uow = mock_uow
    record_id = str(uuid.uuid4())

    # The object handed back by session().query().filter_by().first().
    first_row = session.return_value.query.return_value.\
        filter_by.return_value.first.return_value

    # Case 1: the row serializes to None.
    first_row.serialize.return_value = None
    found = alarm_view.alarm_event_record_one(record_id, uow)
    assert found is None

    # Case 2: the row serializes to a dict carrying the record ID.
    first_row.serialize.return_value = {"alarmEventRecordId": record_id}
    found = alarm_view.alarm_event_record_one(record_id, uow)
    assert str(found.get("alarmEventRecordId")) == record_id
def test_view_alarm_subscriptions(mock_uow):
    """alarm_view.subscriptions reports the count and serialized rows."""
    session, uow = mock_uow

    subscription_id1 = str(uuid.uuid4())
    # Stand-in subscription row; only its serialize() result matters here.
    sub1 = MagicMock()
    sub1.serialize.return_value = {
        "alarmSubscriptionId": subscription_id1,
    }

    # Stub session().query().filter_by().order_by() so that it counts one
    # row and its limit().offset() page holds the fake subscription.
    order_by = MagicMock()
    order_by.count.return_value = 1
    order_by.limit.return_value.offset.return_value = [sub1]
    session.return_value.query.return_value.filter_by.return_value.\
        order_by.return_value = order_by

    result = alarm_view.subscriptions(uow)
    assert result['count'] == 1
    ret_list = result['results']
    assert str(ret_list[0].get("alarmSubscriptionId")) == subscription_id1
def test_view_alarm_subscription_one(mock_uow):
    """alarm_view.subscription_one returns what the row serializes to."""
    session, uow = mock_uow

    subscription_id1 = str(uuid.uuid4())

    # Case 1: the row returned by first() serializes to None.
    session.return_value.query.return_value.filter_by.return_value.first.\
        return_value.serialize.return_value = None

    subscription_res = alarm_view.subscription_one(
        subscription_id1, uow)
    assert subscription_res is None

    # Case 2: the row serializes to a dict carrying the subscription ID.
    session.return_value.query.return_value.filter_by.return_value.first.\
        return_value.serialize.return_value = {
            "alarmSubscriptionId": subscription_id1,
        }

    subscription_res = alarm_view.subscription_one(
        subscription_id1, uow)
    assert str(subscription_res.get(
        "alarmSubscriptionId")) == subscription_id1
def test_alarm_dictionary(mock_uow):
    """An AlarmDictionary added to the repository can be fetched and listed."""
    _, uow = mock_uow
    entity_type = 'test1'

    expected = alarm_obj.AlarmDictionary(entity_type)
    expected.entityType = entity_type
    uow.alarm_dictionaries.add(expected)

    # Look the entry up again under the same key.
    fetched = uow.alarm_dictionaries.get(entity_type)
    assert expected == fetched

    # The listing must contain at least one entry.
    everything = uow.alarm_dictionaries.list()
    assert len(everything) > 0
def test_flask_get_list(mock_flask_uow):
    """GET on both collection endpoints yields an empty JSON list."""
    session, app = mock_flask_uow

    # Stub session().query().filter_by().order_by() to report zero rows
    # and an empty limit().offset() page.
    empty_query = MagicMock()
    empty_query.count.return_value = 0
    empty_query.limit.return_value.offset.return_value = []
    query = session.return_value.query.return_value
    query.filter_by.return_value.order_by.return_value = empty_query
    apibase = config.get_o2ims_monitoring_api_base()

    with app.test_client() as client:
        # Get list and return empty list
        for endpoint in ("/alarms", "/alarmSubscriptions"):
            resp = client.get(apibase + endpoint)
            assert resp.get_data() == b'[]\n'
def test_flask_get_one(mock_flask_uow):
    """GET on a single alarm or subscription answers 404 when first() is None."""
    session, app = mock_flask_uow

    # session().query().filter_by().first() finds nothing.
    query = session.return_value.query.return_value
    query.filter_by.return_value.first.return_value = None
    apibase = config.get_o2ims_monitoring_api_base()

    with app.test_client() as client:
        # Get one and return 404
        for collection in ("/alarms/", "/alarmSubscriptions/"):
            unknown_id = str(uuid.uuid4())
            resp = client.get(apibase + collection + unknown_id)
            assert resp.status_code == 404
def test_flask_post(mock_flask_uow):
    """POSTing a new alarm subscription answers 201 with its generated ID."""
    session, app = mock_flask_uow
    apibase = config.get_o2ims_monitoring_api_base()

    with app.test_client() as client:
        # session().execute() yields no rows.
        session.return_value.execute.return_value = []

        sub_callback = 'http://subscription/callback/url'
        # NOTE(review): the request body is limited to the two fields
        # visible in this test -- confirm the API requires no others.
        resp = client.post(apibase+'/alarmSubscriptions', json={
            'callback': sub_callback,
            'consumerSubscriptionId': 'consumerSubId1',
        })
        assert resp.status_code == 201
        assert 'alarmSubscriptionId' in resp.get_json()
def test_flask_delete(mock_flask_uow):
    """DELETE on an alarm subscription answers 204."""
    session, app = mock_flask_uow
    apibase = config.get_o2ims_monitoring_api_base()

    with app.test_client() as client:
        # session().execute().first() returns an (empty) row mapping.
        executed = session.return_value.execute.return_value
        executed.first.return_value = {}

        target = apibase + "/alarmSubscriptions/" + str(uuid.uuid4())
        resp = client.delete(target)
        assert resp.status_code == 204
def test_flask_not_allowed(mock_flask_uow):
    """POST, PUT, PATCH and DELETE on the alarms collection all answer 405."""
    _, app = mock_flask_uow
    apibase = config.get_o2ims_monitoring_api_base()

    with app.test_client() as client:
        # Testing resource type not support method
        uri = apibase + "/alarms"
        for send in (client.post, client.put, client.patch, client.delete):
            resp = send(uri)
            assert resp.status == '405 METHOD NOT ALLOWED'
class FakeAlarmClient(BaseClient):
    """BaseClient test double that always serves one in-memory fake alarm."""

    def __init__(self):
        super().__init__()
        # Build the single alarm model handed out by _get() and _list().
        fakeAlarm = alarm_obj.FaultGenericModel(ResourceTypeEnum.OCLOUD)
        fakeAlarm.id = str(uuid.uuid4())
        fakeAlarm.name = 'alarm'
        fakeAlarm.content = json.dumps({})
        fakeAlarm.createtime = datetime.now()
        fakeAlarm.updatetime = datetime.now()
        # The hash is derived from the ID and the update time.
        fakeAlarm.hash = str(hash((fakeAlarm.id, fakeAlarm.updatetime)))
        self.fakeAlarm = fakeAlarm

    def _get(self, id) -> alarm_obj.FaultGenericModel:
        """Return the fake alarm regardless of the requested ``id``."""
        return self.fakeAlarm

    # NOTE(review): the name and signature of this hook are assumed to match
    # BaseClient's listing hook; ``**filters`` is accepted so the method
    # works whether or not the base class forwards filters -- confirm.
    def _list(self, **filters):
        """Return a one-element list holding the fake alarm."""
        return [self.fakeAlarm]

    def _set_stx_client(self):
        """Do nothing: the fake needs no backing client."""
        pass
# class FakeStxObjRepo(StxObjectRepository):
#     def __init__(self):
#
#     def _add(self, alarm: alarm_obj.AlarmEventRecord):
#         self.alarms.append(alarm)
#
#     def _get(self, alarmid) -> alarm_obj.AlarmEventRecord:
#         filtered = [a for a in self.alarms if a.id == alarmid]
#         return filtered.pop()
#
#     def _list(self) -> List[alarm_obj.AlarmEventRecord]:
#         return [x for x in self.oclouds]
#
#     def _update(self, alarm: alarm_obj.AlarmEventRecord):
#         filtered = [a for a in self.alarms if a.id == alarm.id]
#         assert len(filtered) == 1
#         ocloud1 = filtered.pop()
#         ocloud1.update_by(alarm)
class FakeUnitOfWork(AbstractUnitOfWork):
    """Unit-of-work test double that performs no persistence.

    The ``session_factory`` given to the constructor is exposed unchanged as
    ``self.session`` once the unit of work is entered.
    """

    def __init__(self, session_factory=None):
        self.session_factory = session_factory

    def __enter__(self):
        # The factory object itself stands in for the session.
        self.session = self.session_factory
        return super().__enter__()

    def __exit__(self, *args):
        # Delegate to the base class; the fake session is never closed.
        super().__exit__(*args)

    # NOTE(review): ``_commit`` and ``rollback`` are assumed to be the hook
    # names AbstractUnitOfWork expects subclasses to provide -- confirm.
    def _commit(self):
        """Do nothing: there is no real session to commit."""
        pass

    def rollback(self):
        """Do nothing: there is no real session to roll back."""
        pass

    def collect_new_events(self):
        """Return an empty iterator; the fake never produces events."""
        # NOTE(review): assumes callers only iterate the result -- confirm.
        return iter(())
def create_alarm_fake_bus(uow):
    """Bootstrap a message bus whose only handler is a no-op UpdateAlarm one.

    Overwrites the handler tables on ``o2app.service.handlers``:
    ``EVENT_HANDLERS`` becomes empty and ``COMMAND_HANDLERS`` maps only
    ``commands.UpdateAlarm``.

    :param uow: unit of work passed on to ``bootstrap.bootstrap``.
    :return: the bus produced by ``bootstrap.bootstrap(False, uow)``.
    """
    # NOTE(review): the third parameter's name (``publish``) is assumed to
    # match what bootstrap injects into handlers -- confirm.
    def update_alarm(
            cmd: commands.UpdateAlarm,
            uow: AbstractUnitOfWork,
            publish: Callable):
        # Swallow the command; the tests only inspect what probe() returns.
        return

    handlers.EVENT_HANDLERS = {}
    handlers.COMMAND_HANDLERS = {
        commands.UpdateAlarm: update_alarm,
    }
    bus = bootstrap.bootstrap(False, uow)
    return bus
def test_probe_new_alarm():
    """Probing with the fake client yields one command named "alarm"."""
    # session().execute() yields no rows.
    fake_session = MagicMock()
    fake_session.return_value.execute.return_value = []

    message_bus = create_alarm_fake_bus(FakeUnitOfWork(fake_session))
    watcher = AlarmWatcher(FakeAlarmClient(), message_bus)

    produced = watcher.probe()

    assert produced is not None
    assert len(produced) == 1
    assert produced[0].data.name == "alarm"
def test_watchers_worker():
    """PollWorker probes its watcher until the watcher stops the worker."""
    testedworker = worker.PollWorker()

    class FakeAlarmWatcher(BaseWatcher):
        """Watcher that counts probes and halts the worker after the third."""

        def __init__(self, client: BaseClient,
                     bus: messagebus) -> None:
            # The base class is deliberately given no bus.
            super().__init__(client, None)
            self.fakeOcloudWatcherCounter = 0
            self._client = client

        def _targetname(self):
            return "fakealarmwatcher"

        def _probe(self, parent: object = None, tags=None):
            self.fakeOcloudWatcherCounter += 1
            # hacking to stop the blocking sched task
            if self.fakeOcloudWatcherCounter > 2:
                # NOTE(review): assumes PollWorker exposes stop() -- confirm.
                testedworker.stop()
            # Nothing to report for this probe.
            return []

    fakeuow = FakeUnitOfWork()
    bus = create_alarm_fake_bus(fakeuow)

    fakeClient = FakeAlarmClient()
    fakewatcher = FakeAlarmWatcher(fakeClient, bus)

    root = WatcherTree(fakewatcher)

    interval = 1
    testedworker.set_interval(interval)
    testedworker.add_watcher(root)
    # Nothing may be probed before the worker is started.
    assert fakewatcher.fakeOcloudWatcherCounter == 0

    count1 = fakewatcher.fakeOcloudWatcherCounter
    # NOTE(review): assumes PollWorker exposes start() -- confirm. The
    # bounded wait below is correct whether start() blocks until stop() is
    # called or returns immediately.
    testedworker.start()
    deadline = time.monotonic() + 10 * interval
    while (fakewatcher.fakeOcloudWatcherCounter <= 2
           and time.monotonic() < deadline):
        time.sleep(0.1)
    assert fakewatcher.fakeOcloudWatcherCounter > count1

    # assumed hacking: probe has stopped the sched task, so waiting longer
    # than one polling interval must not produce further probes.
    count3 = fakewatcher.fakeOcloudWatcherCounter
    time.sleep(2 * interval)
    assert fakewatcher.fakeOcloudWatcherCounter == count3