1 # Copyright (C) 2021 Wind River Systems, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
import json
import time
import uuid
from datetime import datetime
from typing import Callable
from unittest.mock import MagicMock

from o2common.config import config
from o2common.service import messagebus
from o2common.service.client.base_client import BaseClient
from o2common.service.unit_of_work import AbstractUnitOfWork
from o2common.service.watcher import worker
from o2common.service.watcher.base import BaseWatcher, WatcherTree

from o2ims.domain import alarm_obj
from o2ims.domain import commands
from o2ims.domain.resource_type import ResourceTypeEnum
from o2ims.service.watcher.alarm_watcher import AlarmWatcher
from o2ims.views import alarm_view

from o2app import bootstrap
from o2app.service import handlers
def test_new_alarm_event_record():
    """Build an AlarmEventRecord and check it keeps the id it was given."""
    record_id = str(uuid.uuid4())
    severity = alarm_obj.PerceivedSeverityEnum.CRITICAL
    record = alarm_obj.AlarmEventRecord(record_id, '', '', '', '', severity)

    assert record_id is not None
    assert record.alarmEventRecordId == record_id
def test_view_alarm_event_records(mock_uow):
    """List alarm event records through the view using a mocked session.

    The ``mock_uow`` fixture is unpacked into ``(session, uow)``; the view
    result is expected to expose ``count`` and ``results`` keys.
    """
    session, uow = mock_uow

    record_id = str(uuid.uuid4())
    record = MagicMock()
    record.serialize.return_value = {"alarmEventRecordId": record_id}

    # Configure the query(...).filter(...).order_by(...) chain on the
    # mocked session so that it reports one row and pages to our record.
    ordered = MagicMock()
    ordered.count.return_value = 1
    ordered.limit.return_value.offset.return_value = [record]
    query = session.return_value.query.return_value
    query.filter.return_value.order_by.return_value = ordered

    result = alarm_view.alarm_event_records(uow)
    assert result['count'] == 1
    first_item = result['results'][0]
    assert str(first_item.get("alarmEventRecordId")) == record_id
def test_view_alarm_event_record_one(mock_uow):
    """Fetch one alarm event record: first a None result, then a dict."""
    session, uow = mock_uow
    record_id = str(uuid.uuid4())

    # Mock object handed back by query(...).filter_by(...).first() on the
    # mocked session; MagicMock returns the same child on every access.
    first_row = session.return_value.query.return_value.\
        filter_by.return_value.first.return_value

    # Serialisation yields None -> the view is expected to return None.
    first_row.serialize.return_value = None
    found = alarm_view.alarm_event_record_one(record_id, uow)
    assert found is None

    # Serialisation yields a dict -> the view is expected to pass it on.
    first_row.serialize.return_value = {"alarmEventRecordId": record_id}
    found = alarm_view.alarm_event_record_one(record_id, uow)
    assert str(found.get("alarmEventRecordId")) == record_id
def test_view_alarm_subscriptions(mock_uow):
    """List alarm subscriptions through the view using a mocked session.

    The ``mock_uow`` fixture is unpacked into ``(session, uow)``; the view
    result is expected to expose ``count`` and ``results`` keys.
    """
    session, uow = mock_uow

    subscription_id1 = str(uuid.uuid4())
    # The subscription row is a mock whose serialize() returns the payload,
    # mirroring how test_view_alarm_event_records builds its record.
    sub1 = MagicMock()
    sub1.serialize.return_value = {
        "alarmSubscriptionId": subscription_id1,
    }

    # Configure the query(...).filter(...).order_by(...) chain on the
    # mocked session so that it reports one row and pages to sub1.
    order_by = MagicMock()
    order_by.count.return_value = 1
    order_by.limit.return_value.offset.return_value = [sub1]
    session.return_value.query.return_value.filter.return_value.\
        order_by.return_value = order_by

    result = alarm_view.subscriptions(uow)
    assert result['count'] == 1
    ret_list = result['results']
    assert str(ret_list[0].get("alarmSubscriptionId")) == subscription_id1
def test_view_alarm_subscription_one(mock_uow):
    """Fetch one alarm subscription: first a None result, then a dict."""
    session, uow = mock_uow

    subscription_id1 = str(uuid.uuid4())

    # Serialisation yields None -> the view is expected to return None.
    session.return_value.query.return_value.filter_by.return_value.first.\
        return_value.serialize.return_value = None

    subscription_res = alarm_view.subscription_one(
        subscription_id1, uow)
    assert subscription_res is None

    # Serialisation yields a dict -> the view is expected to pass it on.
    session.return_value.query.return_value.filter_by.return_value.first.\
        return_value.serialize.return_value = {
            "alarmSubscriptionId": subscription_id1,
        }

    subscription_res = alarm_view.subscription_one(
        subscription_id1, uow)
    assert str(subscription_res.get(
        "alarmSubscriptionId")) == subscription_id1
def test_flask_get_list(mock_flask_uow):
    """GET on both monitoring collections returns an empty JSON list."""
    session, app = mock_flask_uow

    # query(...).filter(...).order_by(...) reports zero rows and an
    # empty page on the mocked session.
    empty_page = MagicMock()
    empty_page.count.return_value = 0
    empty_page.limit.return_value.offset.return_value = []
    query = session.return_value.query.return_value
    query.filter.return_value.order_by.return_value = empty_page

    apibase = config.get_o2ims_monitoring_api_base() + '/v1'

    with app.test_client() as client:
        # Get list and return empty list
        for collection in ("/alarms", "/alarmSubscriptions"):
            resp = client.get(apibase + collection)
            assert resp.get_data() == b'[]\n'
def test_flask_get_one(mock_flask_uow):
    """GET on a single alarm / subscription that is not found gives 404."""
    session, app = mock_flask_uow

    # query(...).filter_by(...).first() finds nothing on the mocked session.
    query = session.return_value.query.return_value
    query.filter_by.return_value.first.return_value = None

    apibase = config.get_o2ims_monitoring_api_base() + '/v1'

    with app.test_client() as client:
        # Get one and return 404
        for prefix in ("/alarms/", "/alarmSubscriptions/"):
            unknown_id = str(uuid.uuid4())
            resp = client.get(apibase + prefix + unknown_id)
            assert resp.status_code == 404
def test_flask_post(mock_flask_uow):
    """POST a new alarm subscription and expect 201 with its generated id."""
    session, app = mock_flask_uow
    apibase = config.get_o2ims_monitoring_api_base() + '/v1'

    with app.test_client() as client:
        # execute(...) on the mocked session returns no rows.
        session.return_value.execute.return_value = []

        sub_callback = 'http://subscription/callback/url'
        resp = client.post(apibase+'/alarmSubscriptions', json={
            'callback': sub_callback,
            'consumerSubscriptionId': 'consumerSubId1',
        })
        assert resp.status_code == 201
        assert 'alarmSubscriptionId' in resp.get_json()
def test_flask_delete(mock_flask_uow):
    """DELETE on an alarm subscription URI is expected to answer 204."""
    session, app = mock_flask_uow
    apibase = config.get_o2ims_monitoring_api_base() + '/v1'

    with app.test_client() as client:
        # execute(...).first() on the mocked session yields an empty dict.
        execute_result = session.return_value.execute.return_value
        execute_result.first.return_value = {}

        target_id = str(uuid.uuid4())
        target_uri = apibase + "/alarmSubscriptions/" + target_id
        resp = client.delete(target_uri)
        assert resp.status_code == 204
def test_flask_not_allowed(mock_flask_uow):
    """POST, PUT, PATCH and DELETE on the alarms collection all give 405."""
    _, app = mock_flask_uow
    apibase = config.get_o2ims_monitoring_api_base() + '/v1'

    with app.test_client() as client:
        # Testing resource type not support method
        uri = apibase + "/alarms"
        # Same order as before: post, put, patch, delete.
        for send in (client.post, client.put, client.patch, client.delete):
            resp = send(uri)
            assert resp.status == '405 METHOD NOT ALLOWED'
class FakeAlarmClient(BaseClient):
    """BaseClient stand-in that always serves one in-memory fake alarm."""

    def __init__(self):
        # NOTE(review): assumes BaseClient.__init__ takes no arguments;
        # the tests construct this class as FakeAlarmClient() - confirm.
        super().__init__()
        fakeAlarm = alarm_obj.FaultGenericModel(ResourceTypeEnum.OCLOUD)
        fakeAlarm.id = str(uuid.uuid4())
        fakeAlarm.name = 'alarm'
        fakeAlarm.content = json.dumps({})
        fakeAlarm.createtime = datetime.now()
        fakeAlarm.updatetime = datetime.now()
        # Hash is derived from the id and the update timestamp.
        fakeAlarm.hash = str(hash((fakeAlarm.id, fakeAlarm.updatetime)))
        self.fakeAlarm = fakeAlarm

    def _get(self, id) -> alarm_obj.FaultGenericModel:
        """Return the single fake alarm regardless of the requested id."""
        return self.fakeAlarm

    def _list(self, **filters):
        """Return a one-element list holding the fake alarm.

        NOTE(review): the method name and the ignored ``**filters`` are
        assumed from the ``_get`` / ``_list`` naming used elsewhere in this
        file - confirm against BaseClient.
        """
        return [self.fakeAlarm]

    def _set_stx_client(self):
        """Do nothing; the fake has no backing client to configure."""
        pass
241 # class FakeStxObjRepo(StxObjectRepository):
242 # def __init__(self):
246 # def _add(self, alarm: alarm_obj.AlarmEventRecord):
247 # self.alarms.append(alarm)
249 # def _get(self, alarmid) -> alarm_obj.AlarmEventRecord:
250 # filtered = [a for a in self.alarms if a.id == alarmid]
251 # return filtered.pop()
253 # def _list(self) -> List[alarm_obj.AlarmEventRecord]:
254 # return [x for x in self.oclouds]
256 # def _update(self, alarm: alarm_obj.AlarmEventRecord):
257 # filtered = [a for a in self.alarms if a.id == alarm.id]
258 # assert len(filtered) == 1
259 # ocloud1 = filtered.pop()
260 # ocloud1.update_by(alarm)
class FakeUnitOfWork(AbstractUnitOfWork):
    """Unit of work that exposes the supplied factory as its session.

    Nothing is persisted: committing and rolling back are no-ops and no
    events are ever collected.
    """

    def __init__(self, session_factory=None):
        self.session_factory = session_factory

    def __enter__(self):
        # The factory itself is used as the session (it is not called).
        self.session = self.session_factory
        return super().__enter__()

    def __exit__(self, *args):
        super().__exit__(*args)

    # NOTE(review): the hook names `_commit` and `rollback` are assumed to
    # be the ones AbstractUnitOfWork requires - confirm against the base.
    def _commit(self):
        """Do nothing; there is no real session to commit."""
        pass

    def rollback(self):
        """Do nothing; there is no real session to roll back."""
        pass

    def collect_new_events(self):
        """Return an empty list: the fake never produces events."""
        return []
def create_alarm_fake_bus(uow):
    """Return a message bus whose only command handler is a no-op stub.

    ``handlers.EVENT_HANDLERS`` and ``handlers.COMMAND_HANDLERS`` are
    replaced at module level before the bus is built with
    ``bootstrap.bootstrap(False, uow)``.

    :param uow: unit of work handed to ``bootstrap.bootstrap``.
    :return: the bus object produced by ``bootstrap.bootstrap``.
    """
    def update_alarm(
            cmd: commands.UpdateAlarm,
            uow: AbstractUnitOfWork,
            publish: Callable = None):
        # Stub handler: accepts the command and does nothing.
        # NOTE(review): `publish` is optional so the stub works whether or
        # not bootstrap injects such a dependency - confirm.
        return

    handlers.EVENT_HANDLERS = {}
    handlers.COMMAND_HANDLERS = {
        commands.UpdateAlarm: update_alarm,
    }
    bus = bootstrap.bootstrap(False, uow)
    return bus
def test_probe_new_alarm():
    """Probe with the fake client and expect exactly one 'alarm' command."""
    mocked_session = MagicMock()
    mocked_session.return_value.execute.return_value = []

    # Same construction order as before: uow, bus, client, watcher.
    unit_of_work = FakeUnitOfWork(mocked_session)
    message_bus = create_alarm_fake_bus(unit_of_work)
    watcher = AlarmWatcher(FakeAlarmClient(), message_bus)

    produced = watcher.probe()
    assert produced is not None
    assert len(produced) == 1
    assert produced[0].data.name == "alarm"
def test_watchers_worker():
    """Run a PollWorker over a counting watcher that stops the worker.

    The fake watcher counts its probes and asks the worker to stop once the
    count exceeds two; the test checks that probing happened and that the
    counter no longer moves afterwards.
    """
    testedworker = worker.PollWorker()

    class FakeAlarmWatcher(BaseWatcher):
        def __init__(self, client: BaseClient,
                     bus: messagebus) -> None:
            super().__init__(client, None)
            self.fakeOcloudWatcherCounter = 0
            self._client = client

        def _targetname(self):
            return "fakealarmwatcher"

        def _probe(self, parent: object = None, tags=None):
            self.fakeOcloudWatcherCounter += 1
            # hacking to stop the blocking sched task
            if self.fakeOcloudWatcherCounter > 2:
                # NOTE(review): assumes PollWorker exposes stop() - confirm.
                testedworker.stop()
            return []

    fakeuow = FakeUnitOfWork()
    bus = create_alarm_fake_bus(fakeuow)

    fakeClient = FakeAlarmClient()
    fakewatcher = FakeAlarmWatcher(fakeClient, bus)

    root = WatcherTree(fakewatcher)

    testedworker.set_interval(1)
    testedworker.add_watcher(root)
    assert fakewatcher.fakeOcloudWatcherCounter == 0

    count1 = fakewatcher.fakeOcloudWatcherCounter
    # NOTE(review): assumes PollWorker exposes start() - confirm. The
    # bounded wait below is correct whether start() blocks until the worker
    # is stopped or returns immediately.
    testedworker.start()
    deadline = time.monotonic() + 20
    while (fakewatcher.fakeOcloudWatcherCounter <= 2
           and time.monotonic() < deadline):
        time.sleep(0.1)
    assert fakewatcher.fakeOcloudWatcherCounter > count1

    # assumed hacking: probe has stopped the sched task
    count3 = fakewatcher.fakeOcloudWatcherCounter
    # Wait a few polling intervals (interval is 1) to see it stay put.
    time.sleep(3)
    assert fakewatcher.fakeOcloudWatcherCounter == count3