1 # ==================================================================================
2 # Copyright (c) 2019 Nokia
3 # Copyright (c) 2018-2019 AT&T Intellectual Property.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 # ==================================================================================
import time  # used only by the readiness polls in this setup section

# Sender-side RMR context; b"4562" is passed as rmr_init's port argument.
MRC_SEND = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, 0x00)
# Wait while rmr_ready() still reports 0, sleeping between polls rather than
# spinning on the CPU.
while rmr.rmr_ready(MRC_SEND) == 0:
    time.sleep(1)

# Receiver-side RMR context on a second port so the tests can exchange
# messages between the two contexts.
MRC_RCV = rmr.rmr_init(b"4563", rmr.RMR_MAX_RCV_BYTES, 0x00)
while rmr.rmr_ready(MRC_RCV) == 0:
    time.sleep(1)
def teardown_module():
    """
    test rmr module teardown: close both RMR contexts used by this module
    """
    # sender first, then receiver
    for context in (MRC_SEND, MRC_RCV):
        rmr.rmr_close(context)
def _assert_new_sbuf(sbuf):
    """
    verify the initial state of an alloced message is what we expect

    sbuf is the message buffer to inspect; its summary is read through
    rmr.message_summary and every listed field must hold its pristine value.
    """
    pristine = (
        ("payload", b""),
        ("payload length", 0),
        ("transaction id", b""),
        ("message state", 0),
        ("message status", "RMR_OK"),
        ("meid", ""),
        ("errno", 0),
    )
    observed = rmr.message_summary(sbuf)
    for field, expected in pristine:
        assert observed[field] == expected
def test_get_constants(expected_constants):
    """
    test getting constants

    expected_constants is supplied by the caller (presumably a pytest
    fixture defined outside this file -- confirm in conftest).
    """
    actual_constants = rmr._get_constants()
    assert actual_constants == expected_constants
def test_get_mapping_dict(expected_states):
    """
    test getting the state mapping dict and the state-to-status lookup

    expected_states is supplied by the caller (presumably a pytest fixture
    defined outside this file -- confirm in conftest).
    """
    mapping = rmr._get_mapping_dict()
    assert mapping == expected_states

    # two known state codes plus one code the lookup is expected not to know
    lookups = (
        (0, "RMR_OK"),
        (12, "RMR_ERR_TIMEOUT"),
        (666, "UNKNOWN STATE"),
    )
    for code, status in lookups:
        assert rmr._state_to_status(code) == status
def test_meid():
    """
    test meid stringification

    Sets the meid several times on one message and checks that
    rmr.rmr_get_meid and the "meid" entry of rmr.message_summary agree on
    the resulting string each time.
    """
    sbuf = rmr.rmr_alloc_msg(MRC_SEND, SIZE)

    # two raw bytes come back as a two-character string
    rmr.rmr_set_meid(sbuf, b"\x01\x02", 2)
    assert rmr.rmr_get_meid(sbuf) == rmr.message_summary(sbuf)["meid"] == "\x01\x02"
    assert len(rmr.rmr_get_meid(sbuf)) == 2

    rmr.rmr_set_meid(sbuf, b"\x00" * 32, 32)
    assert rmr.rmr_get_meid(sbuf) == rmr.message_summary(sbuf)["meid"] == ""  # NULL bytes get truncated

    rmr.rmr_set_meid(sbuf, b"6" * 32, 32)
    assert rmr.rmr_get_meid(sbuf) == rmr.message_summary(sbuf)["meid"] == "6" * 32  # string in string out

    # a shorter write only replaces the leading bytes of the previous value
    rmr.rmr_set_meid(sbuf, b"\x01\x02", 2)
    assert (
        rmr.rmr_get_meid(sbuf) == rmr.message_summary(sbuf)["meid"] == "\x01\x02" + "6" * 30
    )  # bytes in string out, 6s left over
    assert len(rmr.rmr_get_meid(sbuf)) == 32
def test_rmr_set_get():
    """
    test setting and reading back the payload, transaction id and meid of a message
    """
    msg = rmr.rmr_alloc_msg(MRC_SEND, SIZE)
    _assert_new_sbuf(msg)

    # payload: the bytes written must be reported back unchanged, with their length
    payload = b"\x01\x00\x80"
    rmr.set_payload_and_length(payload, msg)
    info = rmr.message_summary(msg)
    assert info["payload"] == payload
    assert info["payload length"] == 3

    # transaction id: the generated value is random, so only check that it
    # goes from empty to a 32-byte value
    txid_before = info["transaction id"]
    assert txid_before == b""
    assert len(txid_before) == 0
    rmr.generate_and_set_transaction_id(msg)
    info = rmr.message_summary(msg)
    txid_after = info["transaction id"]
    assert txid_after != b""
    assert len(txid_after) == 32

    # meid: empty at first; after setting, the string stops at the first NUL byte
    assert rmr.rmr_get_meid(msg) == info["meid"] == ""
    rmr.rmr_set_meid(msg, b"666\x01\x00\x01", 6)
    info = rmr.message_summary(msg)
    assert rmr.rmr_get_meid(msg) == info["meid"] == "666\x01"
    assert len(info["meid"]) == 4
def test_rcv_timeout():
    """
    test torcv; this is a scary test because if it fails... it doesn't fail, it will run forever!
    We receive a message (though nothing has been sent) and make sure the function doesn't block forever.

    There is no unit test for rmr_rcv_msg; too dangerous, that is a blocking call that may never return.
    """
    empty_msg = rmr.rmr_alloc_msg(MRC_RCV, SIZE)
    timed_out = rmr.rmr_torcv_msg(MRC_RCV, empty_msg, 50)  # should time out after 50ms
    outcome = rmr.message_summary(timed_out)
    # state 12 is the timeout state
    assert outcome["message state"] == 12
    assert outcome["message status"] == "RMR_ERR_TIMEOUT"
def test_send_rcv():
    """
    test send and receive

    Sends a payload from the MRC_SEND context, receives it on MRC_RCV,
    replies from the receiver with rmr_rts_msg, and checks that the reply
    arrives back on MRC_SEND.
    """
    pay = b"\x01\x00\x80"

    # send a message
    # NOTE(review): delivery relies on message type 0 being routed to the
    # receiver context; that routing is configured outside this view.
    sbuf_send = rmr.rmr_alloc_msg(MRC_SEND, SIZE)
    _assert_new_sbuf(sbuf_send)
    rmr.set_payload_and_length(pay, sbuf_send)
    sbuf_send.contents.mtype = 0
    sbuf_send = rmr.rmr_send_msg(MRC_SEND, sbuf_send)
    send_summary = rmr.message_summary(sbuf_send)

    # receive it in other context (bounded wait so a lost message cannot hang the run)
    sbuf_rcv = rmr.rmr_alloc_msg(MRC_RCV, SIZE)
    sbuf_rcv = rmr.rmr_torcv_msg(MRC_RCV, sbuf_rcv, 2000)
    rcv_summary = rmr.message_summary(sbuf_rcv)
    assert rcv_summary["payload"] == pay
    assert rcv_summary["message type"] == 0
    assert send_summary["message state"] == rcv_summary["message state"] == 0
    assert send_summary["message status"] == rcv_summary["message status"] == "RMR_OK"

    # send an ack back, reusing the received buffer
    ack_pay = b"message recieved"
    rmr.set_payload_and_length(ack_pay, sbuf_rcv)
    sbuf_rcv = rmr.rmr_rts_msg(MRC_RCV, sbuf_rcv)
    rcv_ack_summary = rmr.message_summary(sbuf_rcv)

    # have the sender receive it
    sbuf_send = rmr.rmr_torcv_msg(MRC_SEND, sbuf_send, 2000)
    send_ack_summary = rmr.message_summary(sbuf_send)

    assert send_ack_summary["payload"] == ack_pay
    assert send_ack_summary["message state"] == rcv_ack_summary["message state"] == 0
    assert send_ack_summary["message status"] == rcv_ack_summary["message status"] == "RMR_OK"