1 # vim: ts=4 sw=4 expandtab:
2 # =================================================================================2
3 # Copyright (c) 2019 Nokia
4 # Copyright (c) 2018-2019 AT&T Intellectual Property.
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 # ==================================================================================
33 MRC_SEND = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, 0x00)
34 while rmr.rmr_ready(MRC_SEND) == 0:
38 MRC_RCV = rmr.rmr_init(b"4563", rmr.RMR_MAX_RCV_BYTES, 0x00)
39 while rmr.rmr_ready(MRC_RCV) == 0:
43 def teardown_module():
45 test rmr module teardown
47 rmr.rmr_close(MRC_SEND)
48 rmr.rmr_close(MRC_RCV)
51 def _assert_new_sbuf(sbuf):
53 verify the initial state of an alloced message is what we expect
55 summary = rmr.message_summary(sbuf)
56 assert summary["payload"] == b""
57 assert summary["payload length"] == 0
58 assert summary["transaction id"] == b""
59 assert summary["message state"] == 0
60 assert summary["message status"] == "RMR_OK"
61 assert summary["meid"] == ""
62 assert summary["errno"] == 0
65 def test_get_constants(expected_constants):
67 test getting constants. We don't care what values are returned as those
68 should be meaningful only to RMR. We do care that all of the constants
69 which are defined in expected_contents are returned. Further, we don't
70 consider it to be an error if the returned list has more constants than
73 To avoid frustration, this should list all missing keys, not fail on the
77 econst = expected_constants
78 rconst = rmr._get_constants()
79 for key in econst: # test all expected constants
80 if key not in rconst: # expected value not listed by rmr
82 print( "did not find required constant in list from RMR: %s" % key )
87 def test_get_mapping_dict(expected_states):
89 test getting mapping string
91 assert rmr._get_mapping_dict() == expected_states
92 assert rmr._state_to_status(0) == "RMR_OK"
93 assert rmr._state_to_status(12) == "RMR_ERR_TIMEOUT"
94 assert rmr._state_to_status(666) == "UNKNOWN STATE"
99 test meid stringification
101 sbuf = rmr.rmr_alloc_msg(MRC_SEND, SIZE)
103 rmr.rmr_set_meid(sbuf, b"\x01\x02", 2)
104 assert rmr.rmr_get_meid(sbuf) == rmr.message_summary(sbuf)["meid"] == "\x01\x02"
105 assert len(rmr.rmr_get_meid(sbuf)) == 2
107 rmr.rmr_set_meid(sbuf, b"\x00" * 32, 32)
108 assert rmr.rmr_get_meid(sbuf) == rmr.message_summary(sbuf)["meid"] == "" # NULL bytes get truncated
110 rmr.rmr_set_meid(sbuf, b"6" * 32, 32)
111 assert rmr.rmr_get_meid(sbuf) == rmr.message_summary(sbuf)["meid"] == "6" * 32 # string in string out
113 rmr.rmr_set_meid(sbuf, b"\x01\x02", 2)
115 rmr.rmr_get_meid(sbuf) == rmr.message_summary(sbuf)["meid"] == "\x01\x02" + "6" * 30
116 ) # bytes in string out, 6s left over
117 assert len(rmr.rmr_get_meid(sbuf)) == 32
120 def test_rmr_set_get():
124 sbuf = rmr.rmr_alloc_msg(MRC_SEND, SIZE)
125 _assert_new_sbuf(sbuf)
128 pay = b"\x01\x00\x80"
129 rmr.set_payload_and_length(pay, sbuf)
130 summary = rmr.message_summary(sbuf)
131 assert summary["payload"] == pay
132 assert summary["payload length"] == 3
134 # test transid (note we cant test payload because it's randomly gen)
135 assert summary["transaction id"] == b""
136 assert len(summary["transaction id"]) == 0
137 rmr.generate_and_set_transaction_id(sbuf)
138 summary = rmr.message_summary(sbuf)
139 assert summary["transaction id"] != b""
140 assert len(summary["transaction id"]) == 32
143 assert rmr.rmr_get_meid(sbuf) == summary["meid"] == ""
144 rmr.rmr_set_meid(sbuf, b"666\x01\x00\x01", 6)
145 summary = rmr.message_summary(sbuf)
146 assert rmr.rmr_get_meid(sbuf) == summary["meid"] == "666\x01"
147 assert (len(summary["meid"])) == 4
150 def test_rcv_timeout():
152 test torcv; this is a scary test because if it fails... it doesn't fail, it will run forever!
153 We recieve a message (though nothing has been sent) and make sure the function doesn't block forever.
155 There is no unit test for rmr_rcv_msg; too dangerous, that is a blocking call that may never return.
157 sbuf_rcv = rmr.rmr_alloc_msg(MRC_RCV, SIZE)
158 sbuf_rcv = rmr.rmr_torcv_msg(MRC_RCV, sbuf_rcv, 50) # should time out after 50ms
159 summary = rmr.message_summary(sbuf_rcv)
160 assert summary["message state"] == 12
161 assert summary["message status"] == "RMR_ERR_TIMEOUT"
166 test send and receive
168 pay = b"\x01\x00\x80"
171 sbuf_send = rmr.rmr_alloc_msg(MRC_SEND, SIZE)
172 _assert_new_sbuf(sbuf_send)
173 rmr.set_payload_and_length(pay, sbuf_send)
174 sbuf_send.contents.mtype = 0
175 sbuf_send = rmr.rmr_send_msg(MRC_SEND, sbuf_send)
176 send_summary = rmr.message_summary(sbuf_send)
178 # receive it in other context
179 sbuf_rcv = rmr.rmr_alloc_msg(MRC_RCV, SIZE)
180 sbuf_rcv = rmr.rmr_torcv_msg(MRC_RCV, sbuf_rcv, 2000)
181 rcv_summary = rmr.message_summary(sbuf_rcv)
182 assert rcv_summary["payload"] == pay
183 assert rcv_summary["message type"] == 0
184 assert send_summary["message state"] == rcv_summary["message state"] == 0
185 assert send_summary["message status"] == rcv_summary["message status"] == "RMR_OK"
188 ack_pay = b"message recieved"
189 rmr.set_payload_and_length(ack_pay, sbuf_rcv)
190 sbuf_rcv = rmr.rmr_rts_msg(MRC_RCV, sbuf_rcv)
191 rcv_ack_summary = rmr.message_summary(sbuf_rcv)
193 # have the sender recieve it
194 sbuf_send = rmr.rmr_torcv_msg(MRC_SEND, sbuf_send, 2000)
195 send_ack_summary = rmr.message_summary(sbuf_send)
197 assert send_ack_summary["payload"] == ack_pay
198 assert send_ack_summary["message state"] == rcv_ack_summary["message state"] == 0
199 assert send_ack_summary["message status"] == rcv_ack_summary["message status"] == "RMR_OK"