0fad272c7cf8c8a965e6526983663c036081853a
[ric-plt/xapp-frame-py.git] / tests / test_alarm.py
1 # =================================================================================2
2 #       Copyright (c) 2020 AT&T Intellectual Property.
3 #       Copyright (c) 2020 Nokia
4 #
5 #   Licensed under the Apache License, Version 2.0 (the "License");
6 #   you may not use this file except in compliance with the License.
7 #   You may obtain a copy of the License at
8 #
9 #          http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #   Unless required by applicable law or agreed to in writing, software
12 #   distributed under the License is distributed on an "AS IS" BASIS,
13 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #   See the License for the specific language governing permissions and
15 #   limitations under the License.
16 # ==================================================================================
17 import json
18 import time
19 from ricxappframe.alarm import alarm
20 from ricxappframe.alarm.alarm import AlarmAction, AlarmDetail, AlarmManager, AlarmSeverity
21 from ricxappframe.rmr import rmr
22
23 MRC_SEND = None
24 MRC_RCV = None
25 SIZE = 256
26
27
28 def setup_module():
29     """
30     test alarm module setup
31     """
32     global MRC_SEND
33     MRC_SEND = rmr.rmr_init(b"4566", rmr.RMR_MAX_RCV_BYTES, 0x00)
34     while rmr.rmr_ready(MRC_SEND) == 0:
35         time.sleep(1)
36
37     global MRC_RCV
38     MRC_RCV = rmr.rmr_init(b"4567", rmr.RMR_MAX_RCV_BYTES, 0x00)
39     while rmr.rmr_ready(MRC_RCV) == 0:
40         time.sleep(1)
41
42
43 def teardown_module():
44     """
45     test alarm module teardown
46     """
47     rmr.rmr_close(MRC_SEND)
48
49
50 def test_alarm_set_get():
51     """
52     test set functions
53     """
54     act = AlarmAction.RAISE
55     assert act is not None
56
57     sev = AlarmSeverity.CRITICAL
58     assert sev is not None
59
60     det = AlarmDetail("1", "2", 3, AlarmSeverity.MINOR, "4", "5")
61     assert det[alarm.KEY_MANAGED_OBJECT_ID] == "1"
62     assert det[alarm.KEY_APPLICATION_ID] == "2"
63     assert det[alarm.KEY_SPECIFIC_PROBLEM] == 3
64     assert det[alarm.KEY_PERCEIVED_SEVERITY] == AlarmSeverity.MINOR.name
65     assert det[alarm.KEY_IDENTIFYING_INFO] == "4"
66     assert det[alarm.KEY_ADDITIONAL_INFO] == "5"
67
68     mgr = alarm.AlarmManager(MRC_SEND, "moid2", "appid2")
69     assert mgr is not None
70     assert mgr.managed_object_id == "moid2"
71     assert mgr.application_id == "appid2"
72
73
74 def _receive_alarm_msg(action: AlarmAction):
75     """
76     delays briefly, receives a message, checks the message type and action
77     """
78     time.sleep(0.5)
79     sbuf_rcv = rmr.rmr_alloc_msg(MRC_RCV, SIZE)
80     sbuf_rcv = rmr.rmr_torcv_msg(MRC_RCV, sbuf_rcv, 2000)
81     rcv_summary = rmr.message_summary(sbuf_rcv)
82     assert rcv_summary[rmr.RMR_MS_MSG_STATE] == rmr.RMR_OK
83     assert rcv_summary[rmr.RMR_MS_MSG_TYPE] == alarm.RIC_ALARM_UPDATE
84     # parse JSON
85     data = json.loads(rcv_summary[rmr.RMR_MS_PAYLOAD].decode())
86     assert data[alarm.KEY_ALARM_ACTION] == action.name
87
88
89 def test_alarm_manager():
90     """
91     test send functions and ensure a message arrives
92     """
93     mgr = AlarmManager(MRC_SEND, "moid", "appid")
94     assert mgr is not None
95
96     det = mgr.create_alarm(3, AlarmSeverity.DEFAULT, "identifying", "additional")
97     assert det is not None
98
99     success = mgr.raise_alarm(det)
100     assert success
101     _receive_alarm_msg(AlarmAction.RAISE)
102
103     success = mgr.clear_alarm(det)
104     assert success
105     _receive_alarm_msg(AlarmAction.CLEAR)
106
107     success = mgr.reraise_alarm(det)
108     assert success
109     _receive_alarm_msg(AlarmAction.CLEAR)
110     _receive_alarm_msg(AlarmAction.RAISE)
111
112     success = mgr.clear_all_alarms()
113     assert success
114     _receive_alarm_msg(AlarmAction.CLEARALL)