Add helpers module to python wrapper
[ric-plt/lib/rmr.git] / src / bindings / rmr-python / tests / test_rmr.py
1 # vim: ts=4 sw=4 expandtab:
2 # =================================================================================2
3 #       Copyright (c) 2019 Nokia
4 #       Copyright (c) 2018-2019 AT&T Intellectual Property.
5 #
6 #   Licensed under the Apache License, Version 2.0 (the "License");
7 #   you may not use this file except in compliance with the License.
8 #   You may obtain a copy of the License at
9 #
10 #          http://www.apache.org/licenses/LICENSE-2.0
11 #
12 #   Unless required by applicable law or agreed to in writing, software
13 #   distributed under the License is distributed on an "AS IS" BASIS,
14 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 #   See the License for the specific language governing permissions and
16 #   limitations under the License.
17 # ==================================================================================
18 import time
19 import pytest
20 from rmr import rmr
21
22
23 SIZE = 256
24 MRC_SEND = None
25 MRC_RCV = None
26
27
28 def setup_module():
29     """
30     test_rmr module setup
31     """
32     global MRC_SEND
33     MRC_SEND = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, 0x00)
34     while rmr.rmr_ready(MRC_SEND) == 0:
35         time.sleep(1)
36
37     global MRC_RCV
38     MRC_RCV = rmr.rmr_init(b"4563", rmr.RMR_MAX_RCV_BYTES, 0x00)
39     while rmr.rmr_ready(MRC_RCV) == 0:
40         time.sleep(1)
41
42
43 def teardown_module():
44     """
45     test rmr module teardown
46     """
47     rmr.rmr_close(MRC_SEND)
48     rmr.rmr_close(MRC_RCV)
49
50
51 def _assert_new_sbuf(sbuf):
52     """
53     verify the initial state of an alloced message is what we expect
54     """
55     summary = rmr.message_summary(sbuf)
56     assert summary["payload"] == b""
57     assert summary["payload length"] == 0
58     assert summary["transaction id"] == b""
59     assert summary["message state"] == 0
60     assert summary["message status"] == "RMR_OK"
61     assert summary["meid"] == ""
62     assert summary["errno"] == 0
63
64
65 def test_get_constants(expected_constants):
66     """
67     test getting constants. We don't care what values are returned as those
68     should be meaningful only to RMR. We do care that all of the constants
69     which are defined in expected_contents are returned.  Further, we don't
70     consider it to be an error if the returned list has more constants than
71     what are in our list.
72
73     To avoid frustration, this should list all missing keys, not fail on the
74     first missing key.
75     """
76     errors = 0
77     econst = expected_constants
78     rconst = rmr._get_constants()
79     for key in econst:                   # test all expected constants
80         if key not in rconst:            # expected value not listed by rmr
81             errors += 1
82             print( "did not find required constant in list from RMR: %s" % key )
83
84     assert errors == 0
85
86
87 def test_get_mapping_dict(expected_states):
88     """
89     test getting mapping string
90     """
91     assert rmr._get_mapping_dict() == expected_states
92     assert rmr._state_to_status(0) == "RMR_OK"
93     assert rmr._state_to_status(12) == "RMR_ERR_TIMEOUT"
94     assert rmr._state_to_status(666) == "UNKNOWN STATE"
95
96
97 def test_meid():
98     """
99     test meid stringification
100     """
101     sbuf = rmr.rmr_alloc_msg(MRC_SEND, SIZE)
102
103     rmr.rmr_set_meid(sbuf, b"\x01\x02", 2)
104     assert rmr.rmr_get_meid(sbuf) == rmr.message_summary(sbuf)["meid"] == "\x01\x02"
105     assert len(rmr.rmr_get_meid(sbuf)) == 2
106
107     rmr.rmr_set_meid(sbuf, b"\x00" * 32, 32)
108     assert rmr.rmr_get_meid(sbuf) == rmr.message_summary(sbuf)["meid"] == ""  # NULL bytes get truncated
109
110     rmr.rmr_set_meid(sbuf, b"6" * 32, 32)
111     assert rmr.rmr_get_meid(sbuf) == rmr.message_summary(sbuf)["meid"] == "6" * 32  # string in string out
112
113     rmr.rmr_set_meid(sbuf, b"\x01\x02", 2)
114     assert (
115         rmr.rmr_get_meid(sbuf) == rmr.message_summary(sbuf)["meid"] == "\x01\x02" + "6" * 30
116     )  # bytes in string out, 6s left over
117     assert len(rmr.rmr_get_meid(sbuf)) == 32
118
119
120 def test_rmr_set_get():
121     """
122     test set functions
123     """
124     sbuf = rmr.rmr_alloc_msg(MRC_SEND, SIZE)
125     _assert_new_sbuf(sbuf)
126
127     # test payload
128     pay = b"\x01\x00\x80"
129     rmr.set_payload_and_length(pay, sbuf)
130     summary = rmr.message_summary(sbuf)
131     assert summary["payload"] == pay
132     assert summary["payload length"] == 3
133
134     # test transid (note we cant test payload because it's randomly gen)
135     assert summary["transaction id"] == b""
136     assert len(summary["transaction id"]) == 0
137     rmr.generate_and_set_transaction_id(sbuf)
138     summary = rmr.message_summary(sbuf)
139     assert summary["transaction id"] != b""
140     assert len(summary["transaction id"]) == 32
141
142     # test meid
143     assert rmr.rmr_get_meid(sbuf) == summary["meid"] == ""
144     rmr.rmr_set_meid(sbuf, b"666\x01\x00\x01", 6)
145     summary = rmr.message_summary(sbuf)
146     assert rmr.rmr_get_meid(sbuf) == summary["meid"] == "666\x01"
147     assert (len(summary["meid"])) == 4
148
149
150 def test_rcv_timeout():
151     """
152     test torcv; this is a scary test because if it fails... it doesn't fail, it will run forever!
153     We recieve a message (though nothing has been sent) and make sure the function doesn't block forever.
154
155     There is no unit test for rmr_rcv_msg; too dangerous, that is a blocking call that may never return.
156     """
157     sbuf_rcv = rmr.rmr_alloc_msg(MRC_RCV, SIZE)
158     sbuf_rcv = rmr.rmr_torcv_msg(MRC_RCV, sbuf_rcv, 50)  # should time out after 50ms
159     summary = rmr.message_summary(sbuf_rcv)
160     assert summary["message state"] == 12
161     assert summary["message status"] == "RMR_ERR_TIMEOUT"
162
163
164 def test_send_rcv():
165     """
166     test send and receive
167     """
168     pay = b"\x01\x00\x80"
169
170     # send a message
171     sbuf_send = rmr.rmr_alloc_msg(MRC_SEND, SIZE)
172     _assert_new_sbuf(sbuf_send)
173     rmr.set_payload_and_length(pay, sbuf_send)
174     sbuf_send.contents.mtype = 0
175     sbuf_send = rmr.rmr_send_msg(MRC_SEND, sbuf_send)
176     send_summary = rmr.message_summary(sbuf_send)
177
178     # receive it in other context
179     sbuf_rcv = rmr.rmr_alloc_msg(MRC_RCV, SIZE)
180     sbuf_rcv = rmr.rmr_torcv_msg(MRC_RCV, sbuf_rcv, 2000)
181     rcv_summary = rmr.message_summary(sbuf_rcv)
182     assert rcv_summary["payload"] == pay
183     assert rcv_summary["message type"] == 0
184     assert send_summary["message state"] == rcv_summary["message state"] == 0
185     assert send_summary["message status"] == rcv_summary["message status"] == "RMR_OK"
186
187     # send an ACK back
188     ack_pay = b"message recieved"
189     rmr.set_payload_and_length(ack_pay, sbuf_rcv)
190     sbuf_rcv = rmr.rmr_rts_msg(MRC_RCV, sbuf_rcv)
191     rcv_ack_summary = rmr.message_summary(sbuf_rcv)
192
193     # have the sender recieve it
194     sbuf_send = rmr.rmr_torcv_msg(MRC_SEND, sbuf_send, 2000)
195     send_ack_summary = rmr.message_summary(sbuf_send)
196
197     assert send_ack_summary["payload"] == ack_pay
198     assert send_ack_summary["message state"] == rcv_ack_summary["message state"] == 0
199     assert send_ack_summary["message status"] == rcv_ack_summary["message status"] == "RMR_OK"