1 # vim: ts=4 sw=4 expandtab:
# ==================================================================================
3 # Copyright (c) 2019 Nokia
4 # Copyright (c) 2018-2019 AT&T Intellectual Property.
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 # ==================================================================================
import time

from rmr import rmr, helpers, exceptions
# Message-router contexts shared by the tests in this module.  Each one is
# initialised with a distinct first argument (presumably the listen port --
# confirm) and is polled with rmr_ready() until it reports non-zero, so no
# test touches a context before it is usable.
# NOTE(review): the one-second poll interval is an assumption; the loop
# bodies were not present in the reviewed source -- confirm.

# context used by the tests to allocate and send messages
MRC_SEND = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, 0x00)
while rmr.rmr_ready(MRC_SEND) == 0:
    time.sleep(1)

# context used by the tests to receive messages and return acknowledgements
MRC_RCV = rmr.rmr_init(b"4563", rmr.RMR_MAX_RCV_BYTES, 0x00)
while rmr.rmr_ready(MRC_RCV) == 0:
    time.sleep(1)

# context initialised with flag 0x02 instead of 0x00; the receive-all test
# refers to it as the buffered receiver
MRC_BUF_RCV = rmr.rmr_init(b"4564", rmr.RMR_MAX_RCV_BYTES, 0x02)
while rmr.rmr_ready(MRC_BUF_RCV) == 0:
    time.sleep(1)
def teardown_module():
    """
    test rmr module teardown

    Close every message-router context this module initialises.  The
    buffered-receive context is closed as well; it used to be left open.
    """
    rmr.rmr_close(MRC_SEND)
    rmr.rmr_close(MRC_RCV)
    rmr.rmr_close(MRC_BUF_RCV)
def _assert_new_sbuf(sbuf):
    """
    Check that a freshly allocated message buffer is in its pristine state.

    The summary of ``sbuf`` must report an empty payload, transaction id and
    meid, a subscription id of -1, and an OK state/status with errno 0.
    """
    pristine = {
        "payload": b"",
        "payload length": 0,
        "subscription id": -1,
        "transaction id": b"",
        "message state": 0,
        "message status": "RMR_OK",
        "meid": b"",
        "errno": 0,
    }
    observed = rmr.message_summary(sbuf)
    # fields are checked in declaration order; the first mismatch fails
    for field, want in pristine.items():
        assert observed[field] == want
def test_get_constants(expected_constants):
    """
    test getting constants. We don't care what values are returned as those
    should be meaningful only to RMR. We do care that all of the constants
    which are defined in expected_constants are returned. Further, we don't
    consider it to be an error if the returned list has more constants than
    expected.

    To avoid frustration, this lists all missing keys before failing, instead
    of stopping at the first one.

    expected_constants: collection whose keys are the constant names RMR must
        expose (presumably supplied by a pytest fixture defined outside this
        file -- confirm).
    """
    econst = expected_constants
    rconst = rmr._get_constants()

    # gather every expected constant that RMR did not report
    missing = [key for key in econst if key not in rconst]
    for key in missing:
        print("did not find required constant in list from RMR: %s" % key)

    # fail only after the complete list has been printed
    assert not missing
def test_get_mapping_dict(expected_states):
    """
    Check the state mapping dictionary and the state-to-status translation.

    The full mapping must equal ``expected_states``; two known states must
    translate to their names, and an unmapped state must yield the
    "UNKNOWN STATE" placeholder.
    """
    mapping = rmr._get_mapping_dict()
    assert mapping == expected_states

    translations = (
        (0, "RMR_OK"),
        (12, "RMR_ERR_TIMEOUT"),
        (666, "UNKNOWN STATE"),  # not a mapped state
    )
    for state, status in translations:
        assert rmr._state_to_status(state) == status
def test_meid():
    """
    test meid stringification

    Sets several meid values on one message buffer and checks that
    rmr_get_meid() and the message summary agree on what is read back.
    """
    # NOTE(review): the function header was absent from the reviewed source;
    # the name is inferred from the docstring -- confirm.
    sbuf = rmr.rmr_alloc_msg(MRC_SEND, SIZE)

    rmr.rmr_set_meid(sbuf, b"\x01\x02")
    assert rmr.rmr_get_meid(sbuf) == rmr.message_summary(sbuf)["meid"] == b"\x01\x02"
    assert len(rmr.rmr_get_meid(sbuf)) == 2

    rmr.rmr_set_meid(sbuf, b"\x00" * 32)
    assert rmr.rmr_get_meid(sbuf) == rmr.message_summary(sbuf)["meid"] == b""  # NULL bytes get truncated

    rmr.rmr_set_meid(sbuf, b"6" * 32)
    assert rmr.rmr_get_meid(sbuf) == rmr.message_summary(sbuf)["meid"] == b"6" * 32  # string in string out

    # a short value written over the 32 sixes only replaces the leading bytes
    rmr.rmr_set_meid(sbuf, b"\x01\x02")
    assert (
        rmr.rmr_get_meid(sbuf) == rmr.message_summary(sbuf)["meid"] == b"\x01\x02" + b"6" * 30
    )  # bytes in string out, 6s left over
    assert len(rmr.rmr_get_meid(sbuf)) == 32
def test_rmr_set_get():
    """
    Exercise the payload, transaction id and meid setters on a new message
    and confirm each value through the message summary.
    """
    msg = rmr.rmr_alloc_msg(MRC_SEND, SIZE)
    _assert_new_sbuf(msg)

    # payload: three bytes including an embedded NUL must be reported as set
    body = b"\x01\x00\x80"
    rmr.set_payload_and_length(body, msg)
    after_payload = rmr.message_summary(msg)
    assert after_payload["payload"] == body
    assert after_payload["payload length"] == 3

    # transaction id: empty until generated; the generated value is random,
    # so only its presence and length are checked
    assert after_payload["transaction id"] == b""
    assert len(after_payload["transaction id"]) == 0
    rmr.generate_and_set_transaction_id(msg)
    after_xid = rmr.message_summary(msg)
    assert after_xid["transaction id"] != b""
    assert len(after_xid["transaction id"]) == 32

    # meid: empty until set; the value read back stops before the first NUL
    assert rmr.rmr_get_meid(msg) == after_xid["meid"] == b""
    rmr.rmr_set_meid(msg, b"666\x01\x00\x01")
    after_meid = rmr.message_summary(msg)
    assert rmr.rmr_get_meid(msg) == after_meid["meid"] == b"666\x01"
    assert (len(after_meid["meid"])) == 4
def test_alloc_fancy():
    """test allocation with payload, generated transaction id, mtype, meid and sub_id set in one call"""
    body = b"yoo\x01\x00\x80"
    msg = rmr.rmr_alloc_msg(
        MRC_SEND,
        SIZE,
        payload=body,
        gen_transaction_id=True,
        mtype=14,
        meid=b"asdf",
        sub_id=654321,
    )
    info = rmr.message_summary(msg)

    assert info["payload"] == body
    assert info["payload length"] == 6
    # the generated id cannot be predicted; require only that it is present
    assert info["transaction id"] != b""
    assert len(info["transaction id"]) == 32
    assert info["message state"] == 0
    # the summary and the raw buffer fields must agree
    assert info["message type"] == msg.contents.mtype == 14
    assert rmr.rmr_get_meid(msg) == info["meid"] == b"asdf"
    assert msg.contents.sub_id == info["subscription id"] == 654321
def test_alloc_overlapping_flags():
    """test allocation when both a generated and a fixed transaction id are requested"""
    fixed_xid = b"6" * 32
    msg = rmr.rmr_alloc_msg(MRC_SEND, SIZE, gen_transaction_id=True, fixed_transaction_id=fixed_xid)
    reported = rmr.message_summary(msg)["transaction id"]
    # the explicitly supplied id is the one that must be reported
    assert reported == b"66666666666666666666666666666666"
def test_rcv_timeout():
    """
    Check that the timed receive gives up when nothing has been sent.

    If the timeout were not honoured this test would hang instead of failing.
    The untimed rmr_rcv_msg has no unit test for that reason: it is a
    blocking call that may never return.
    """
    timeout_ms = 50
    empty = rmr.rmr_alloc_msg(MRC_RCV, SIZE)
    result = rmr.rmr_torcv_msg(MRC_RCV, empty, timeout_ms)  # should time out after 50ms
    report = rmr.message_summary(result)
    assert report["message state"] == 12
    assert report["message status"] == "RMR_ERR_TIMEOUT"
def test_send_rcv():
    """
    test send and receive

    Sends one message from the send context, receives it on the receive
    context, returns an acknowledgement to the sender and checks that the
    sender receives that acknowledgement.
    """
    # NOTE(review): the function header was absent from the reviewed source;
    # the name follows the test_send_rcv_subid_* tests -- confirm.
    pay = b"\x01\x00\x80"

    # send a message
    sbuf_send = rmr.rmr_alloc_msg(MRC_SEND, SIZE)
    _assert_new_sbuf(sbuf_send)
    rmr.set_payload_and_length(pay, sbuf_send)
    sbuf_send.contents.mtype = 0
    sbuf_send = rmr.rmr_send_msg(MRC_SEND, sbuf_send)
    send_summary = rmr.message_summary(sbuf_send)
    assert send_summary["message state"] == 0  # if send fails don't attempt receive
    assert send_summary["message status"] == "RMR_OK"

    # receive it in other context
    sbuf_rcv = rmr.rmr_alloc_msg(MRC_RCV, SIZE)
    sbuf_rcv = rmr.rmr_torcv_msg(MRC_RCV, sbuf_rcv, 2000)
    rcv_summary = rmr.message_summary(sbuf_rcv)
    assert rcv_summary["message state"] == 0
    assert rcv_summary["message status"] == "RMR_OK"
    assert rcv_summary["message type"] == 0
    assert rcv_summary["payload"] == pay

    # return an acknowledgement with a new payload and message type
    ack_pay = b"message received"
    sbuf_rcv = rmr.rmr_rts_msg(MRC_RCV, sbuf_rcv, payload=ack_pay, mtype=6666)
    rcv_ack_summary = rmr.message_summary(sbuf_rcv)

    # have the sender receive it
    sbuf_send = rmr.rmr_torcv_msg(MRC_SEND, sbuf_send, 2000)
    send_ack_summary = rmr.message_summary(sbuf_send)

    assert send_ack_summary["message state"] == rcv_ack_summary["message state"] == 0
    assert send_ack_summary["message status"] == rcv_ack_summary["message status"] == "RMR_OK"
    assert send_ack_summary["payload"] == ack_pay
    assert send_ack_summary["message type"] == 6666
def test_send_rcv_subid_good():
    """
    test send and receive where subid is used for routing

    A message carrying a message type / subscription id pair is sent from
    the send context and must arrive on the receive context with payload,
    type and subscription id intact.
    """
    pay = b"\x01\x00\x80"
    # NOTE(review): these two values were not defined in the reviewed source.
    # They are inferred from the sibling tests, which treat (46656, 778) as an
    # unknown subid and (46657, 777) as an unknown mtype -- confirm against
    # the route table used by the test run.
    test_mtype = 46656
    test_subid = 777

    # send a message
    sbuf_send = rmr.rmr_alloc_msg(MRC_SEND, 3, pay, mtype=test_mtype, sub_id=test_subid)
    pre_send_summary = rmr.message_summary(sbuf_send)  # snapshot taken before the send call
    sbuf_send = rmr.rmr_send_msg(MRC_SEND, sbuf_send)
    send_summary = rmr.message_summary(sbuf_send)

    # receive it in other context
    sbuf_rcv = rmr.rmr_alloc_msg(MRC_RCV, 3)
    sbuf_rcv = rmr.rmr_torcv_msg(MRC_RCV, sbuf_rcv, 2000)
    rcv_summary = rmr.message_summary(sbuf_rcv)

    # both sides succeeded, and what arrived matches the pre-send snapshot
    assert send_summary["message state"] == rcv_summary["message state"] == 0
    assert send_summary["message status"] == rcv_summary["message status"] == "RMR_OK"
    assert pre_send_summary["payload"] == rcv_summary["payload"] == pay
    assert pre_send_summary["message type"] == rcv_summary["message type"] == test_mtype
    assert pre_send_summary["subscription id"] == rcv_summary["subscription id"] == test_subid
def test_send_rcv_subid_bad_subid():
    """
    Send with a subscription id that nobody receives and expect the send to
    be rejected with the no-endpoint status.
    """
    msg = rmr.rmr_alloc_msg(MRC_SEND, 3, b"\x01\x00\x80", mtype=46656, sub_id=778)
    msg = rmr.rmr_send_msg(MRC_SEND, msg)

    state = rmr.message_summary(msg)["message state"]
    assert state == 2
    status = rmr.message_summary(msg)["message status"]
    assert status == "RMR_ERR_NOENDPT"
def test_send_rcv_subid_bad_mtype():
    """
    Send with a message type that nobody receives and expect the send to be
    rejected with the no-endpoint status.
    """
    msg = rmr.rmr_alloc_msg(MRC_SEND, 3, b"\x01\x00\x80", mtype=46657, sub_id=777)
    msg = rmr.rmr_send_msg(MRC_SEND, msg)

    state = rmr.message_summary(msg)["message state"]
    assert state == 2
    status = rmr.message_summary(msg)["message status"]
    assert status == "RMR_ERR_NOENDPT"
def send_burst(mrc, fmt, mtype=1, num=13, counter=0):
    """
    Internal function to support test_rcv_all.
    Send a burst of messages optionally giving the type, payload
    format, number of messages and counter start.

    mrc: context the messages are allocated from and sent on
    fmt: %-style format string; each payload is ``fmt % counter`` encoded as UTF-8
    mtype: message type stamped on every message in the burst
    num: number of messages to send
    counter: value formatted into the first payload; incremented per message

    Asserts that every message is sent with state 0 within the retry budget.
    """
    # allocate from the context that will send (was hard-wired to MRC_SEND)
    sbuf_send = rmr.rmr_alloc_msg(mrc, SIZE)  # seed message buffer

    for _ in range(num):
        payload = bytes(fmt % counter, "UTF-8")
        counter += 1

        rmr.set_payload_and_length(payload, sbuf_send)
        sbuf_send.contents.mtype = mtype

        # NOTE(review): the retry budget and the pause between attempts were
        # not present in the reviewed source; both values are assumptions --
        # confirm.
        max_retries = 5
        while max_retries > 0:
            sbuf_send = rmr.rmr_send_msg(mrc, sbuf_send)
            ms = rmr.message_summary(sbuf_send)
            if ms["message state"] != 10:  # 10 is retry
                break
            max_retries -= 1
            time.sleep(0.75)

        assert ms["message state"] == 0
        assert max_retries > 0
def test_rcv_all():
    """
    test the ability to receive a batch of queued messages.

    Sends bursts with send_burst() and drains them from the buffered receive
    context, first as summaries only and then as (summary, buffer) pairs
    filtered by message type.
    """
    pay_fmt = "send to ring msg: %d"  # dynamic message format with counter

    send_burst(MRC_SEND, pay_fmt)  # send a bunch of 13 messages that should queue
    time.sleep(1)  # ensure underlying transport gets cycles to send/receive

    bundle = helpers.rmr_rcvall_msgs(MRC_BUF_RCV)  # use the buffered receiver to read all with a single call
    assert len(bundle) == 13

    for i, ms in enumerate(bundle):  # validate each summary returned, and ordering preserved
        assert ms["message state"] == 0
        expected_pay = bytes(pay_fmt % i, "UTF-8")
        assert ms["payload"] == expected_pay

    send_burst(MRC_SEND, pay_fmt, mtype=1, num=10)  # send a second round with msg types 1 and 2 to test filter
    send_burst(MRC_SEND, pay_fmt, mtype=2, num=8)
    send_burst(MRC_SEND, pay_fmt, mtype=1, num=5)
    send_burst(MRC_SEND, pay_fmt, mtype=2, num=4, counter=8)  # total of 12 messages with type 2 should be queued
    time.sleep(1)  # ensure underlying transport gets cycles to send/receive

    bundle = helpers.rmr_rcvall_msgs_raw(MRC_BUF_RCV, [2])  # receive only message type 2 messages
    assert len(bundle) == 12  # we should only get the second batch of 12 messages

    for i, (ms, sbuf) in enumerate(bundle):  # test the raw version
        test_summary = rmr.message_summary(sbuf)
        assert test_summary == ms  # the returned summary must match the buffer it came with
        assert ms["message state"] == 0  # all should be OK
        assert ms["message type"] == 2  # only mtype 2 should have been received
        expected_pay = bytes(pay_fmt % i, "UTF-8")  # ordering should still jive with the counter
        assert ms["payload"] == expected_pay
        rmr.rmr_free_msg(sbuf)  # the raw variant hands back buffers, which are freed here
def test_bad_buffer():
    """test that allocating a message with None in place of a context raises BadBufferAllocation"""
    missing_context = None
    with pytest.raises(exceptions.BadBufferAllocation):
        rmr.rmr_alloc_msg(missing_context, 4096)
def test_resize_payload():
    """test the ability to insert a larger payload into an existing message"""
    # NOTE(review): these two values were not defined in the reviewed source.
    # The test only checks that they survive the resize, so any pair of
    # recognisable values serves -- confirm.
    mtype = 99
    subid = 100

    mbuf = rmr.rmr_alloc_msg(MRC_SEND, 25)  # allocate buffer with small payload
    mbuf.contents.mtype = mtype  # type and sub-id should not change
    mbuf.contents.sub_id = subid

    long_payload = b"This is a long payload that should force the message buffer to be reallocated"
    rmr.set_payload_and_length(long_payload, mbuf)
    summary = rmr.message_summary(mbuf)
    assert summary["payload max size"] >= len(long_payload)  # RMR may allocate a larger payload space
    assert summary["payload length"] == len(long_payload)  # however, the length must be exactly the same
    assert summary["message type"] == mtype  # both mtype and sub-id should be preserved in new
    assert summary["subscription id"] == subid