1 # vim: ts=4 sw=4 expandtab:
2 # =================================================================================2
3 # Copyright (c) 2019 Nokia
4 # Copyright (c) 2018-2019 AT&T Intellectual Property.
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 # ==================================================================================
import time

from rmr import rmr, helpers, exceptions
def _init_rmr_context(port, flags):
    """
    Initialise an RMR context on ``port`` and block until RMR reports it ready.

    Every context is polled through its own handle, so one context can never
    be declared ready on the strength of another.

    NOTE(review): the bodies of the original wait loops are not visible in
    this listing; a one second poll interval is assumed -- confirm.
    """
    mrc = rmr.rmr_init(port, rmr.RMR_MAX_RCV_BYTES, flags)
    while rmr.rmr_ready(mrc) == 0:
        time.sleep(1)
    return mrc


# context used by the tests to send messages
MRC_SEND = _init_rmr_context(b"4562", 0x00)

# context used by the tests to receive messages
MRC_RCV = _init_rmr_context(b"4563", 0x00)

# receiver handed to helpers.rmr_rcvall_msgs; previously this setup waited on
# MRC_RCV, so MRC_BUF_RCV itself was never checked for readiness
MRC_BUF_RCV = _init_rmr_context(b"4564", 0x02)
def teardown_module():
    """
    test rmr module teardown

    Closes every context the module opened, including the buffered receiver
    (MRC_BUF_RCV), which was previously left open.
    """
    for mrc in (MRC_SEND, MRC_RCV, MRC_BUF_RCV):
        rmr.rmr_close(mrc)
def _assert_new_sbuf(sbuf):
    """
    verify the initial state of an alloced message is what we expect
    """
    # field name -> value a freshly allocated message must report
    pristine = (
        ("payload", b""),
        ("payload length", 0),
        ("transaction id", b""),
        ("message state", 0),
        ("message status", "RMR_OK"),
        ("meid", b""),
        ("errno", 0),
    )
    summary = rmr.message_summary(sbuf)
    for field, expected in pristine:
        assert summary[field] == expected
def test_get_constants(expected_constants):
    """
    test getting constants. We don't care what values are returned as those
    should be meaningful only to RMR. We do care that all of the constants
    which are defined in expected_constants are returned. Further, we don't
    consider it to be an error if the returned list has more constants than
    we expect.

    To avoid frustration, this lists all missing keys rather than failing on
    the first one: every missing constant is printed, then a single assertion
    fails the test if anything was missing.
    """
    rconst = rmr._get_constants()

    # gather every expected constant that RMR did not report
    missing = [key for key in expected_constants if key not in rconst]
    for key in missing:
        print("did not find required constant in list from RMR: %s" % key)

    assert not missing, "constants missing from RMR: %s" % ", ".join(str(key) for key in missing)
def test_get_mapping_dict(expected_states):
    """
    test getting mapping string
    """
    assert rmr._get_mapping_dict() == expected_states

    # state number -> status string, including a number RMR does not define
    known_translations = (
        (0, "RMR_OK"),
        (12, "RMR_ERR_TIMEOUT"),
        (666, "UNKNOWN STATE"),
    )
    for state, status in known_translations:
        assert rmr._state_to_status(state) == status
def test_meid():
    """
    test meid stringification

    NOTE(review): the def line of this test is absent from the listing; the
    name test_meid is inferred from the docstring -- confirm.
    """
    sbuf = rmr.rmr_alloc_msg(MRC_SEND, SIZE)

    def current_meid():
        """Return the meid, checking the getter and the summary agree."""
        meid = rmr.rmr_get_meid(sbuf)
        assert meid == rmr.message_summary(sbuf)["meid"]
        return meid

    rmr.rmr_set_meid(sbuf, b"\x01\x02")
    assert current_meid() == b"\x01\x02"
    assert len(rmr.rmr_get_meid(sbuf)) == 2

    rmr.rmr_set_meid(sbuf, b"\x00" * 32)
    assert current_meid() == b""  # NULL bytes get truncated

    rmr.rmr_set_meid(sbuf, b"6" * 32)
    assert current_meid() == b"6" * 32  # string in string out

    # this comparison was previously a bare expression (its assert was
    # missing), so it never actually checked anything
    rmr.rmr_set_meid(sbuf, b"\x01\x02")
    assert current_meid() == b"\x01\x02" + b"6" * 30  # bytes in string out, 6s left over
    assert len(rmr.rmr_get_meid(sbuf)) == 32
def test_rmr_set_get():
    """
    test the payload, transaction id and meid setters by reading each value
    back through message_summary
    """
    msg = rmr.rmr_alloc_msg(MRC_SEND, SIZE)
    _assert_new_sbuf(msg)

    # payload: three raw bytes in, the same bytes and a length of 3 out
    raw = b"\x01\x00\x80"
    rmr.set_payload_and_length(raw, msg)
    info = rmr.message_summary(msg)
    assert info["payload"] == raw
    assert info["payload length"] == 3

    # transaction id: empty until generated; the generated value is random,
    # so only its presence and length can be checked
    assert info["transaction id"] == b""
    assert len(info["transaction id"]) == 0
    rmr.generate_and_set_transaction_id(msg)
    info = rmr.message_summary(msg)
    assert info["transaction id"] != b""
    assert len(info["transaction id"]) == 32

    # meid: empty at first; after setting, the value read back stops at the
    # first NULL byte
    assert rmr.rmr_get_meid(msg) == info["meid"] == b""
    rmr.rmr_set_meid(msg, b"666\x01\x00\x01")
    info = rmr.message_summary(msg)
    assert rmr.rmr_get_meid(msg) == info["meid"] == b"666\x01"
    assert len(info["meid"]) == 4
def test_rcv_timeout():
    """
    test torcv; this is a scary test because if it fails... it doesn't fail, it will run forever!
    We receive a message (though nothing has been sent) and make sure the function doesn't block forever.

    There is no unit test for rmr_rcv_msg; too dangerous, that is a blocking call that may never return.
    """
    empty_buf = rmr.rmr_alloc_msg(MRC_RCV, SIZE)
    timed_out = rmr.rmr_torcv_msg(MRC_RCV, empty_buf, 50)  # should time out after 50ms
    outcome = rmr.message_summary(timed_out)
    assert outcome["message state"] == 12
    assert outcome["message status"] == "RMR_ERR_TIMEOUT"
def test_send_rcv():
    """
    test send and receive

    Sends one message from MRC_SEND, receives it on MRC_RCV, returns an ack
    to the sender and checks the sender receives that ack.

    NOTE(review): the def line of this test is absent from the listing; the
    name test_send_rcv is inferred from the docstring -- confirm.
    """
    pay = b"\x01\x00\x80"

    # send a message
    sbuf_send = rmr.rmr_alloc_msg(MRC_SEND, SIZE)
    _assert_new_sbuf(sbuf_send)
    rmr.set_payload_and_length(pay, sbuf_send)
    sbuf_send.contents.mtype = 0
    sbuf_send = rmr.rmr_send_msg(MRC_SEND, sbuf_send)
    send_summary = rmr.message_summary(sbuf_send)
    assert send_summary["message state"] == 0  # if send fails don't attempt receive
    assert send_summary["message status"] == "RMR_OK"

    # receive it in other context
    sbuf_rcv = rmr.rmr_alloc_msg(MRC_RCV, SIZE)
    sbuf_rcv = rmr.rmr_torcv_msg(MRC_RCV, sbuf_rcv, 2000)
    rcv_summary = rmr.message_summary(sbuf_rcv)
    assert rcv_summary["message state"] == 0
    assert rcv_summary["message status"] == "RMR_OK"
    assert rcv_summary["message type"] == 0
    assert rcv_summary["payload"] == pay

    # return an ack to the sender, reusing the received buffer
    ack_pay = b"message received"
    rmr.set_payload_and_length(ack_pay, sbuf_rcv)
    sbuf_rcv = rmr.rmr_rts_msg(MRC_RCV, sbuf_rcv)
    rcv_ack_summary = rmr.message_summary(sbuf_rcv)

    # have the sender receive it
    sbuf_send = rmr.rmr_torcv_msg(MRC_SEND, sbuf_send, 2000)
    send_ack_summary = rmr.message_summary(sbuf_send)

    assert send_ack_summary["message state"] == rcv_ack_summary["message state"] == 0
    assert send_ack_summary["message status"] == rcv_ack_summary["message status"] == "RMR_OK"
    assert send_ack_summary["payload"] == ack_pay
def send_burst(mrc, fmt, mtype=1, num=13, counter=0):
    """
    Internal function to support test_rcv_all.
    Send a burst of messages optionally giving the type, payload
    and number to send.

    mrc     -- RMR context used both to allocate the buffer and to send
    fmt     -- payload format string taking one integer (the counter)
    mtype   -- message type stamped on every message
    num     -- number of messages to send
    counter -- value formatted into the first payload; each following
               message uses the next integer

    Fails (AssertionError) if a message does not end in state 0 or if the
    retry budget is used up.
    """
    # allocate from the caller's context rather than the global MRC_SEND so
    # the mrc argument is honoured throughout
    sbuf_send = rmr.rmr_alloc_msg(mrc, SIZE)  # seed message buffer

    for sequence in range(counter, counter + num):
        payload = bytes(fmt % sequence, "UTF-8")

        rmr.set_payload_and_length(payload, sbuf_send)
        sbuf_send.contents.mtype = mtype

        # NOTE(review): the retry budget and the pause between attempts are
        # not visible in the listing; 5 attempts and 0.75s are assumed -- confirm
        max_retries = 5
        while max_retries > 0:
            sbuf_send = rmr.rmr_send_msg(mrc, sbuf_send)
            ms = rmr.message_summary(sbuf_send)
            if ms["message state"] != 10:  # 10 is retry
                break
            max_retries -= 1
            time.sleep(0.75)

        assert ms["message state"] == 0
        assert max_retries > 0
def test_rcv_all():
    """
    test the ability to receive a batch of queued messages.

    First round: one burst, read back in a single call, order preserved.
    Second round: interleaved bursts of message types 1 and 2, read back with
    a filter so that only the type 2 messages are returned.
    """
    pay_fmt = "send to ring msg: %d"  # dynamic message format with counter

    send_burst(MRC_SEND, pay_fmt)  # send a bunch of 13 messages that should queue
    time.sleep(1)  # ensure underlying transport gets cycles to send/receive

    bundle = helpers.rmr_rcvall_msgs(MRC_BUF_RCV)  # use the buffered receiver to read all with a single call
    assert len(bundle) == 13

    for i, ms in enumerate(bundle):  # validate each summary returned, and ordering preserved
        assert ms["message state"] == 0
        expected_pay = bytes(pay_fmt % i, "UTF-8")
        assert ms["payload"] == expected_pay

    send_burst(MRC_SEND, pay_fmt, mtype=1, num=10)  # send a second round with msg types 1 and 2 to test filter
    send_burst(MRC_SEND, pay_fmt, mtype=2, num=8)
    send_burst(MRC_SEND, pay_fmt, mtype=1, num=5)
    send_burst(MRC_SEND, pay_fmt, mtype=2, num=4, counter=8)  # total of 12 messages with type 2 should be queued
    time.sleep(1)  # ensure underlying transport gets cycles to send/receive

    bundle = helpers.rmr_rcvall_msgs(MRC_BUF_RCV, [2])  # receive only message type 2 messages
    assert len(bundle) == 12  # we should only get the 12 type 2 messages

    for i, ms in enumerate(bundle):  # validate each summary
        assert ms["message state"] == 0  # all should be OK
        assert ms["message type"] == 2  # only mtype 2 should have been received
        expected_pay = bytes(pay_fmt % i, "UTF-8")  # ordering should still jive with the counter
        assert ms["payload"] == expected_pay
def test_bad_buffer():
    """allocating a message against a None context must raise BadBufferAllocation"""
    # callable form of pytest.raises: the call is made by pytest itself
    pytest.raises(exceptions.BadBufferAllocation, rmr.rmr_alloc_msg, None, 4096)
def test_alloc_fancy():
    """allocate a message with payload, transaction id, mtype and meid set in a single call"""
    body = b"yoo\x01\x00\x80"
    msg = rmr.rmr_alloc_msg(MRC_SEND, SIZE, payload=body, gen_transaction_id=True, mtype=14, meid=b"asdf")
    info = rmr.message_summary(msg)

    assert info["payload"] == body
    assert info["payload length"] == 6
    # the generated id cannot be predicted, only that one was set
    assert info["transaction id"] != b""
    for field, expected in (("message state", 0), ("message type", 14), ("meid", b"asdf")):
        assert info[field] == expected
def test_resize_payload():
    """test the ability to insert a larger payload into an existing message"""
    # NOTE(review): the assignments of mtype and subid are absent from the
    # listing, leaving both names undefined; two arbitrary distinct values
    # are used here -- confirm the intended ones
    mtype = 99
    subid = 100

    mbuf = rmr.rmr_alloc_msg(MRC_SEND, 25)  # allocate buffer with small payload
    mbuf.contents.mtype = mtype  # type and sub-id should not change
    mbuf.contents.sub_id = subid

    long_payload = b"This is a long payload that should force the message buffer to be reallocated"
    rmr.set_payload_and_length(long_payload, mbuf)
    summary = rmr.message_summary(mbuf)
    assert summary["payload max size"] >= len(long_payload)  # RMR may allocate a larger payload space
    assert summary["payload length"] == len(long_payload)  # however, the length must be exactly the same
    assert summary["message type"] == mtype  # both mtype and sub-id should be preserved in new
    assert summary["subscription id"] == subid