# ==================================================================================
#       Copyright (c) 2020 AT&T Intellectual Property.
#       Copyright (c) 2020 Nokia
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#          http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
# ==================================================================================
import json
import time

import pytest

from ricxappframe.alarm import alarm
from ricxappframe.alarm.alarm import AlarmAction, AlarmDetail, AlarmManager, AlarmSeverity, ALARM_MGR_SERVICE_NAME_ENV, ALARM_MGR_SERVICE_PORT_ENV
from ricxappframe.alarm.exceptions import InitFailed
from ricxappframe.rmr import rmr
def setup_module():
    """
    test alarm module setup

    Creates two RMR contexts and stores them in the module globals
    MRC_SEND (initialized on port 4566) and MRC_RCV (initialized on
    port 4567), blocking until rmr_ready reports each one as ready.
    """
    global MRC_SEND, MRC_RCV

    MRC_SEND = rmr.rmr_init(b"4566", rmr.RMR_MAX_RCV_BYTES, 0x00)
    while rmr.rmr_ready(MRC_SEND) == 0:
        # poll until the context reports ready
        # NOTE(review): the poll interval is not visible in this view - confirm
        time.sleep(1)

    MRC_RCV = rmr.rmr_init(b"4567", rmr.RMR_MAX_RCV_BYTES, 0x00)
    while rmr.rmr_ready(MRC_RCV) == 0:
        time.sleep(1)
def teardown_module():
    """
    test alarm module teardown

    Closes the sender and receiver RMR contexts held in the module
    globals MRC_SEND and MRC_RCV.
    """
    try:
        rmr.rmr_close(MRC_SEND)
    finally:
        # close the receiver even if closing the sender raised
        rmr.rmr_close(MRC_RCV)
def test_alarm_set_get(monkeypatch):
    """
    test the alarm enums, the AlarmDetail fields and AlarmManager construction
    with missing, invalid and valid environment variables
    """
    act = AlarmAction.RAISE
    assert act is not None

    sev = AlarmSeverity.CRITICAL
    assert sev is not None

    det = AlarmDetail("1", "2", 3, AlarmSeverity.MINOR, "4", "5")
    assert det[alarm.KEY_MANAGED_OBJECT_ID] == "1"
    assert det[alarm.KEY_APPLICATION_ID] == "2"
    assert det[alarm.KEY_SPECIFIC_PROBLEM] == 3
    assert det[alarm.KEY_PERCEIVED_SEVERITY] == AlarmSeverity.MINOR.name
    assert det[alarm.KEY_IDENTIFYING_INFO] == "4"
    assert det[alarm.KEY_ADDITIONAL_INFO] == "5"

    # missing environment variables
    # remove any values inherited from the calling environment so this
    # case does not depend on how the test run was launched
    monkeypatch.delenv(ALARM_MGR_SERVICE_NAME_ENV, raising=False)
    monkeypatch.delenv(ALARM_MGR_SERVICE_PORT_ENV, raising=False)
    with pytest.raises(InitFailed):
        alarm.AlarmManager(MRC_SEND, "missing", "envvars")

    # invalid environment variables
    monkeypatch.setenv(ALARM_MGR_SERVICE_NAME_ENV, "0")
    monkeypatch.setenv(ALARM_MGR_SERVICE_PORT_ENV, "a")
    with pytest.raises(InitFailed):
        alarm.AlarmManager(MRC_SEND, "bogus", "envvars")

    # good environment variables
    monkeypatch.setenv(ALARM_MGR_SERVICE_NAME_ENV, "127.0.0.1")  # do NOT use localhost
    monkeypatch.setenv(ALARM_MGR_SERVICE_PORT_ENV, "4567")  # any int is ok here
    mgr = alarm.AlarmManager(MRC_SEND, "moid2", "appid2")
    assert mgr is not None
    assert mgr.managed_object_id == "moid2"
    assert mgr.application_id == "appid2"
def _receive_alarm_msg(action: AlarmAction):
    """
    receives a message on MRC_RCV (timed receive, timeout argument 2000),
    then checks the message state, the message type and the alarm action

    action: the AlarmAction whose name must appear in the received payload
    """
    # NOTE(review): SIZE is a module-level constant that is not defined in
    # the visible part of this file - confirm it exists
    sbuf_rcv = rmr.rmr_alloc_msg(MRC_RCV, SIZE)
    sbuf_rcv = rmr.rmr_torcv_msg(MRC_RCV, sbuf_rcv, 2000)
    try:
        rcv_summary = rmr.message_summary(sbuf_rcv)
    finally:
        # the summary is all that is used below, so release the buffer
        # here instead of leaking one buffer per received message
        rmr.rmr_free_msg(sbuf_rcv)

    assert rcv_summary[rmr.RMR_MS_MSG_STATE] == rmr.RMR_OK
    assert rcv_summary[rmr.RMR_MS_MSG_TYPE] == alarm.RIC_ALARM_UPDATE

    # the payload is decoded as JSON; the action is carried by name
    data = json.loads(rcv_summary[rmr.RMR_MS_PAYLOAD].decode())
    assert data[alarm.KEY_ALARM_ACTION] == action.name
def test_alarm_manager(monkeypatch):
    """
    test send functions and ensure a message arrives
    """
    monkeypatch.setenv(ALARM_MGR_SERVICE_NAME_ENV, "127.0.0.1")  # do NOT use localhost
    monkeypatch.setenv(ALARM_MGR_SERVICE_PORT_ENV, "4567")  # must match rcv port above
    mgr = AlarmManager(MRC_SEND, "moid", "appid")
    assert mgr is not None

    det = mgr.create_alarm(3, AlarmSeverity.DEFAULT, "identifying", "additional")
    assert det is not None

    # check every send result so a failed send is reported at the call
    # that failed rather than only as a missing message afterwards
    success = mgr.raise_alarm(det)
    assert success
    _receive_alarm_msg(AlarmAction.RAISE)

    success = mgr.clear_alarm(det)
    assert success
    _receive_alarm_msg(AlarmAction.CLEAR)

    # a re-raise is expected to arrive as a clear followed by a raise
    success = mgr.reraise_alarm(det)
    assert success
    _receive_alarm_msg(AlarmAction.CLEAR)
    _receive_alarm_msg(AlarmAction.RAISE)

    success = mgr.clear_all_alarms()
    assert success
    _receive_alarm_msg(AlarmAction.CLEARALL)