1 # Copyright (C) 2021 Wind River Systems, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
18 from datetime import datetime
19 from unittest.mock import MagicMock
20 from typing import Callable
22 from o2common.service.watcher import worker
23 from o2common.service.unit_of_work import AbstractUnitOfWork
24 from o2common.service.client.base_client import BaseClient
25 from o2common.service.watcher.base import BaseWatcher, WatcherTree
26 from o2common.service import messagebus
27 from o2common.config import config
29 from o2ims.domain.resource_type import ResourceTypeEnum
30 from o2ims.domain import alarm_obj
31 from o2ims.domain import commands
32 from o2ims.views import alarm_view
33 from o2ims.service.watcher.alarm_watcher import AlarmWatcher
35 from o2app.service import handlers
36 from o2app import bootstrap
def test_new_alarm_event_record():
    """A freshly built AlarmEventRecord exposes the id it was created with."""
    record_id = str(uuid.uuid4())
    severity = alarm_obj.PerceivedSeverityEnum.CRITICAL
    record = alarm_obj.AlarmEventRecord(record_id, '', '', '', '', severity)

    assert record_id is not None
    assert record.alarmEventRecordId == record_id
def test_view_alarm_event_records(mock_uow):
    """The list view must surface the id produced by a record's serialize()."""
    session, uow = mock_uow

    expected_id = str(uuid.uuid4())

    # One fake row whose serialize() yields a dict carrying the expected id.
    fake_record = MagicMock()
    fake_record.serialize.return_value = {"alarmEventRecordId": expected_id}
    session.return_value.query.return_value = [fake_record]

    records = alarm_view.alarm_event_records(uow)
    first_entry = records[0]
    assert str(first_entry.get("alarmEventRecordId")) == expected_id
def test_view_alarm_event_record_one(mock_uow):
    """The single-record view returns None, then a dict, as serialize() does."""
    session, uow = mock_uow
    record_id = str(uuid.uuid4())

    # MagicMock caches child mocks, so this is the very same serialize mock
    # that the query(...).filter_by(...).first() chain will reach.
    serialize = session.return_value.query.return_value.\
        filter_by.return_value.first.return_value.serialize

    # Case 1: serialize() yields None -> the view result is None.
    serialize.return_value = None
    result = alarm_view.alarm_event_record_one(record_id, uow)
    assert result is None

    # Case 2: serialize() yields a dict -> the view result carries the id.
    serialize.return_value = {"alarmEventRecordId": record_id}
    result = alarm_view.alarm_event_record_one(record_id, uow)
    assert str(result.get("alarmEventRecordId")) == record_id
def test_alarm_dictionary(mock_uow):
    """An AlarmDictionary added to the repository can be fetched and listed."""
    _, uow = mock_uow
    repo_key = 'test1'

    created = alarm_obj.AlarmDictionary(repo_key)
    created.entityType = 'test1'
    uow.alarm_dictionaries.add(created)

    fetched = uow.alarm_dictionaries.get(repo_key)
    assert created == fetched

    # Explicit len() check: the listing must report at least one entry.
    listed = uow.alarm_dictionaries.list()
    assert len(listed) > 0
def test_flask_get_list(mock_flask_uow):
    """Both collection endpoints answer with an empty JSON list."""
    session, app = mock_flask_uow
    # NOTE(review): the stub is set on `session.query`, whereas sibling tests
    # stub `session.return_value...` — confirm which object the views query.
    session.query.return_value = []
    apibase = config.get_o2ims_monitoring_api_base()

    with app.test_client() as client:
        # Get list and return empty list
        for endpoint in ("/alarms", "/alarmSubscriptions"):
            resp = client.get(apibase + endpoint)
            assert resp.get_data() == b'[]\n'
def test_flask_get_one(mock_flask_uow):
    """Fetching an unknown alarm or subscription id yields HTTP 404."""
    session, app = mock_flask_uow

    # Make query(...).filter_by(...).first() come back empty.
    first_call = session.return_value.query.return_value.\
        filter_by.return_value.first
    first_call.return_value = None
    apibase = config.get_o2ims_monitoring_api_base()

    with app.test_client() as client:
        # Get one and return 404; a fresh random id is used per endpoint.
        for collection in ("/alarms/", "/alarmSubscriptions/"):
            random_id = str(uuid.uuid4())
            resp = client.get(apibase + collection + random_id)
            assert resp.status_code == 404
def test_flask_post(mock_flask_uow):
    """POSTing a subscription returns 201 and an 'alarmSubscriptionId'."""
    session, app = mock_flask_uow
    apibase = config.get_o2ims_monitoring_api_base()

    with app.test_client() as client:
        # Stub execute() on the session instance to return an empty result.
        session.return_value.execute.return_value = []

        sub_callback = 'http://subscription/callback/url'
        # The JSON payload and the post() call are explicitly terminated so
        # the assertions below are separate statements.
        # NOTE(review): only the two visible payload fields are sent; confirm
        # against the API whether further fields are expected.
        resp = client.post(apibase+'/alarmSubscriptions', json={
            'callback': sub_callback,
            'consumerSubscriptionId': 'consumerSubId1',
        })
        assert resp.status_code == 201
        assert 'alarmSubscriptionId' in resp.get_json()
def test_flask_delete(mock_flask_uow):
    """Deleting an alarm subscription answers with HTTP 204."""
    session, app = mock_flask_uow
    apibase = config.get_o2ims_monitoring_api_base()

    with app.test_client() as client:
        # execute(...).first() on the session instance yields an empty dict.
        execute_result = session.return_value.execute.return_value
        execute_result.first.return_value = {}

        target_id = str(uuid.uuid4())
        target_uri = apibase + "/alarmSubscriptions/" + target_id
        resp = client.delete(target_uri)
        assert resp.status_code == 204
def test_flask_not_allowed(mock_flask_uow):
    """POST, PUT, PATCH and DELETE on the alarms collection all return 405."""
    _, app = mock_flask_uow
    apibase = config.get_o2ims_monitoring_api_base()

    with app.test_client() as client:
        # Testing resource type not support method
        uri = apibase + "/alarms"
        for verb in ("post", "put", "patch", "delete"):
            send = getattr(client, verb)
            resp = send(uri)
            assert resp.status == '405 METHOD NOT ALLOWED'
# Test double derived from BaseClient that serves one canned alarm object.
# NOTE(review): this listing looks incomplete. No `def` header is visible
# above the statements that end in `self.fakeAlarm = fakeAlarm`, none above
# `return [self.fakeAlarm]`, and `_set_stx_client` has no visible body.
# Confirm against the complete file before changing any code here.
179 class FakeAlarmClient(BaseClient):
# Build the canned record: a FaultGenericModel typed ResourceTypeEnum.OCLOUD
# with a random UUID id, the name 'alarm', an empty JSON object as content
# and the current time as both create and update timestamps.
182 fakeAlarm = alarm_obj.FaultGenericModel(ResourceTypeEnum.OCLOUD)
183 fakeAlarm.id = str(uuid.uuid4())
184 fakeAlarm.name = 'alarm'
185 fakeAlarm.content = json.dumps({})
186 fakeAlarm.createtime = datetime.now()
187 fakeAlarm.updatetime = datetime.now()
# The hash string is derived from the (id, updatetime) pair.
188 fakeAlarm.hash = str(hash((fakeAlarm.id, fakeAlarm.updatetime)))
189 self.fakeAlarm = fakeAlarm
# The requested id is ignored; the canned record is always returned.
191 def _get(self, id) -> alarm_obj.FaultGenericModel:
192 return self.fakeAlarm
# Returns a one-element list holding the same canned record.
195 return [self.fakeAlarm]
197 def _set_stx_client(self):
201 # class FakeStxObjRepo(StxObjectRepository):
202 # def __init__(self):
206 # def _add(self, alarm: alarm_obj.AlarmEventRecord):
207 # self.alarms.append(alarm)
209 # def _get(self, alarmid) -> alarm_obj.AlarmEventRecord:
210 # filtered = [a for a in self.alarms if a.id == alarmid]
211 # return filtered.pop()
213 # def _list(self) -> List[alarm_obj.AlarmEventRecord]:
214 # return [x for x in self.oclouds]
216 # def _update(self, alarm: alarm_obj.AlarmEventRecord):
217 # filtered = [a for a in self.alarms if a.id == alarm.id]
218 # assert len(filtered) == 1
219 # ocloud1 = filtered.pop()
220 # ocloud1.update_by(alarm)
# Unit-of-work test double derived from AbstractUnitOfWork.
# NOTE(review): this listing looks incomplete. The statements after __init__
# have no visible `def` header, the commented-out commit/rollback lines have
# no visible enclosing methods, and `collect_new_events` has no visible body.
# Confirm against the complete file before changing any code here.
223 class FakeUnitOfWork(AbstractUnitOfWork):
# session_factory is optional and is stored as given.
224 def __init__(self, session_factory=None):
225 self.session_factory = session_factory
# The factory object itself is used as the session (it is not called), then
# control is handed to the parent's __enter__.
228 self.session = self.session_factory
229 # self.stxobjects = FakeStxObjRepo()
230 return super().__enter__()
# Delegates to the parent's __exit__; the session is deliberately not closed.
232 def __exit__(self, *args):
233 super().__exit__(*args)
234 # self.session.close()
238 # self.session.commit()
242 # self.session.rollback()
244 def collect_new_events(self):
246 # return super().collect_new_events()
# Replaces the handler tables on the `handlers` module (no event handlers,
# one command handler for commands.UpdateAlarm) and builds a bus through
# bootstrap.bootstrap(False, uow).
# NOTE(review): this listing looks incomplete. The header of the
# `update_alarm` callable that owns the two annotated parameters, the close
# of the COMMAND_HANDLERS dict and any return statement are not visible.
# Callers in this file use the function's result as the bus.
249 def create_alarm_fake_bus(uow):
251 cmd: commands.UpdateAlarm,
252 uow: AbstractUnitOfWork,
# These assignments overwrite module-level attributes on `handlers`.
256 handlers.EVENT_HANDLERS = {}
257 handlers.COMMAND_HANDLERS = {
258 commands.UpdateAlarm: update_alarm,
260 bus = bootstrap.bootstrap(False, uow)
def test_probe_new_alarm():
    """Probing through AlarmWatcher yields one command named "alarm"."""
    # Session factory whose session returns an empty result from execute().
    session_factory = MagicMock()
    session_factory.return_value.execute.return_value = []

    fake_uow = FakeUnitOfWork(session_factory)
    fake_bus = create_alarm_fake_bus(fake_uow)
    watcher = AlarmWatcher(FakeAlarmClient(), fake_bus)

    produced = watcher.probe()
    assert produced is not None
    assert len(produced) == 1
    assert produced[0].data.name == "alarm"
# Drives a worker.PollWorker with a counting fake watcher and checks that the
# probe counter first grows and later stays constant.
# NOTE(review): this listing looks incomplete. The `if` in `_probe` has no
# visible body, `_probe` has no visible return, and nothing is visible
# between reading count1/count3 and the asserts that compare against them.
# Confirm against the complete file before changing any code here.
279 def test_watchers_worker():
280 testedworker = worker.PollWorker()
# Local watcher whose only job is to count how often _probe is invoked.
282 class FakeAlarmWatcher(BaseWatcher):
283 def __init__(self, client: BaseClient,
284 bus: messagebus) -> None:
# The parent initializer receives None as its second argument, not `bus`.
285 super().__init__(client, None)
286 self.fakeOcloudWatcherCounter = 0
287 self._client = client
290 def _targetname(self):
291 return "fakealarmwatcher"
# Each call bumps the counter; past two calls the stop branch is taken.
293 def _probe(self, parent: object = None, tags=None):
294 # import pdb; pdb.set_trace()
295 self.fakeOcloudWatcherCounter += 1
296 # hacking to stop the blocking sched task
297 if self.fakeOcloudWatcherCounter > 2:
301 # fakeRepo = FakeOcloudRepo()
# FakeUnitOfWork is built without a session factory here.
302 fakeuow = FakeUnitOfWork()
303 bus = create_alarm_fake_bus(fakeuow)
305 fakeClient = FakeAlarmClient()
306 fakewatcher = FakeAlarmWatcher(fakeClient, bus)
# The fake watcher is wrapped in a WatcherTree before being registered.
308 root = WatcherTree(fakewatcher)
310 testedworker.set_interval(1)
311 testedworker.add_watcher(root)
# Registering the watcher alone must not trigger a probe.
312 assert fakewatcher.fakeOcloudWatcherCounter == 0
314 count1 = fakewatcher.fakeOcloudWatcherCounter
317 assert fakewatcher.fakeOcloudWatcherCounter > count1
319 # assumed hacking: probe has stopped the sched task
320 count3 = fakewatcher.fakeOcloudWatcherCounter
322 assert fakewatcher.fakeOcloudWatcherCounter == count3