1 # vim: ts=4 sw=4 expandtab:
2 # =================================================================================2
3 # Copyright (c) 2019 Nokia
4 # Copyright (c) 2018-2019 AT&T Intellectual Property.
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 # ==================================================================================
20 from rmr import rmr, helpers, exceptions
33 MRC_SEND = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, 0x00)
34 while rmr.rmr_ready(MRC_SEND) == 0:
38 MRC_RCV = rmr.rmr_init(b"4563", rmr.RMR_MAX_RCV_BYTES, 0x00)
39 while rmr.rmr_ready(MRC_RCV) == 0:
43 MRC_BUF_RCV = rmr.rmr_init(b"4564", rmr.RMR_MAX_RCV_BYTES, 0x02)
44 while rmr.rmr_ready(MRC_BUF_RCV) == 0:
48 def teardown_module():
50 test rmr module teardown
52 rmr.rmr_close(MRC_SEND)
53 rmr.rmr_close(MRC_RCV)
56 def _assert_new_sbuf(sbuf):
58 verify the initial state of an alloced message is what we expect
60 summary = rmr.message_summary(sbuf)
61 assert summary["payload"] == b""
62 assert summary["payload length"] == 0
63 assert summary["subscription id"] == -1
64 assert summary["transaction id"] == b""
65 assert summary["message state"] == 0
66 assert summary["message status"] == "RMR_OK"
67 assert summary["meid"] == b""
68 assert summary["errno"] == 0
71 def test_get_constants(expected_constants):
73 test getting constants. We don't care what values are returned as those
74 should be meaningful only to RMR. We do care that all of the constants
75 which are defined in expected_contents are returned. Further, we don't
76 consider it to be an error if the returned list has more constants than
79 To avoid frustration, this should list all missing keys, not fail on the
83 econst = expected_constants
84 rconst = rmr._get_constants()
85 for key in econst: # test all expected constants
86 if key not in rconst: # expected value not listed by rmr
88 print("did not find required constant in list from RMR: %s" % key)
93 def test_get_mapping_dict(expected_states):
95 test getting mapping string
97 assert rmr._get_mapping_dict() == expected_states
98 assert rmr._state_to_status(0) == "RMR_OK"
99 assert rmr._state_to_status(12) == "RMR_ERR_TIMEOUT"
100 assert rmr._state_to_status(666) == "UNKNOWN STATE"
105 test meid stringification
107 sbuf = rmr.rmr_alloc_msg(MRC_SEND, SIZE)
109 rmr.rmr_set_meid(sbuf, b"\x01\x02")
110 assert rmr.rmr_get_meid(sbuf) == rmr.message_summary(sbuf)["meid"] == b"\x01\x02"
111 assert len(rmr.rmr_get_meid(sbuf)) == 2
113 rmr.rmr_set_meid(sbuf, b"\x00" * 32)
114 assert rmr.rmr_get_meid(sbuf) == rmr.message_summary(sbuf)["meid"] == b"" # NULL bytes get truncated
116 rmr.rmr_set_meid(sbuf, b"6" * 32)
117 assert rmr.rmr_get_meid(sbuf) == rmr.message_summary(sbuf)["meid"] == b"6" * 32 # string in string out
119 rmr.rmr_set_meid(sbuf, b"\x01\x02")
121 rmr.rmr_get_meid(sbuf) == rmr.message_summary(sbuf)["meid"] == b"\x01\x02" + b"6" * 30
122 ) # bytes in string out, 6s left over
123 assert len(rmr.rmr_get_meid(sbuf)) == 32
126 def test_rmr_set_get():
130 sbuf = rmr.rmr_alloc_msg(MRC_SEND, SIZE)
131 _assert_new_sbuf(sbuf)
134 pay = b"\x01\x00\x80"
135 rmr.set_payload_and_length(pay, sbuf)
136 summary = rmr.message_summary(sbuf)
137 assert summary["payload"] == pay
138 assert summary["payload length"] == 3
140 # test transid (note we cant test payload because it's randomly gen)
141 assert summary["transaction id"] == b""
142 assert len(summary["transaction id"]) == 0
143 rmr.generate_and_set_transaction_id(sbuf)
144 summary = rmr.message_summary(sbuf)
145 assert summary["transaction id"] != b""
146 assert len(summary["transaction id"]) == 32
149 assert rmr.rmr_get_meid(sbuf) == summary["meid"] == b""
150 rmr.rmr_set_meid(sbuf, b"666\x01\x00\x01")
151 summary = rmr.message_summary(sbuf)
152 assert rmr.rmr_get_meid(sbuf) == summary["meid"] == b"666\x01"
153 assert (len(summary["meid"])) == 4
156 def test_alloc_fancy():
157 """test allocation with setting payload, trans, mtype, subid"""
158 pay = b"yoo\x01\x00\x80"
159 sbuf = rmr.rmr_alloc_msg(MRC_SEND, SIZE, payload=pay, gen_transaction_id=True, mtype=14, meid=b"asdf", sub_id=654321)
160 summary = rmr.message_summary(sbuf)
161 assert summary["payload"] == pay
162 assert summary["payload length"] == 6
163 assert summary["transaction id"] != b"" # hard to test what it will be, but make sure not empty
164 assert len(summary["transaction id"]) == 32
165 assert summary["message state"] == 0
166 assert summary["message type"] == sbuf.contents.mtype == 14
167 assert rmr.rmr_get_meid(sbuf) == summary["meid"] == b"asdf"
168 assert sbuf.contents.sub_id == summary["subscription id"] == 654321
171 def test_rcv_timeout():
173 test torcv; this is a scary test because if it fails... it doesn't fail, it will run forever!
174 We receive a message (though nothing has been sent) and make sure the function doesn't block forever.
176 There is no unit test for rmr_rcv_msg; too dangerous, that is a blocking call that may never return.
178 sbuf_rcv = rmr.rmr_alloc_msg(MRC_RCV, SIZE)
179 sbuf_rcv = rmr.rmr_torcv_msg(MRC_RCV, sbuf_rcv, 50) # should time out after 50ms
180 summary = rmr.message_summary(sbuf_rcv)
181 assert summary["message state"] == 12
182 assert summary["message status"] == "RMR_ERR_TIMEOUT"
187 test send and receive
189 pay = b"\x01\x00\x80"
192 sbuf_send = rmr.rmr_alloc_msg(MRC_SEND, SIZE)
193 _assert_new_sbuf(sbuf_send)
194 rmr.set_payload_and_length(pay, sbuf_send)
195 sbuf_send.contents.mtype = 0
196 sbuf_send = rmr.rmr_send_msg(MRC_SEND, sbuf_send)
197 send_summary = rmr.message_summary(sbuf_send)
198 assert send_summary["message state"] == 0 # if send fails don't attempt receive
199 assert send_summary["message status"] == "RMR_OK"
202 # receive it in other context
203 sbuf_rcv = rmr.rmr_alloc_msg(MRC_RCV, SIZE)
204 sbuf_rcv = rmr.rmr_torcv_msg(MRC_RCV, sbuf_rcv, 2000)
205 rcv_summary = rmr.message_summary(sbuf_rcv)
206 assert rcv_summary["message state"] == 0
207 assert rcv_summary["message status"] == "RMR_OK"
208 assert rcv_summary["message type"] == 0
209 assert rcv_summary["payload"] == pay
212 ack_pay = b"message received"
213 rmr.set_payload_and_length(ack_pay, sbuf_rcv)
214 sbuf_rcv = rmr.rmr_rts_msg(MRC_RCV, sbuf_rcv)
215 rcv_ack_summary = rmr.message_summary(sbuf_rcv)
217 # have the sender receive it
218 sbuf_send = rmr.rmr_torcv_msg(MRC_SEND, sbuf_send, 2000)
219 send_ack_summary = rmr.message_summary(sbuf_send)
221 assert send_ack_summary["message state"] == rcv_ack_summary["message state"] == 0
222 assert send_ack_summary["message status"] == rcv_ack_summary["message status"] == "RMR_OK"
223 assert send_ack_summary["payload"] == ack_pay
226 def test_send_rcv_subid_good():
228 test send and receive where subid is used for routing
230 pay = b"\x01\x00\x80"
235 sbuf_send = rmr.rmr_alloc_msg(MRC_SEND, 3, pay, mtype=test_mtype, sub_id=test_subid)
236 pre_send_summary = rmr.message_summary(sbuf_send)
237 sbuf_send = rmr.rmr_send_msg(MRC_SEND, sbuf_send)
238 send_summary = rmr.message_summary(sbuf_send)
240 # receive it in other context
242 sbuf_rcv = rmr.rmr_alloc_msg(MRC_RCV, 3)
243 sbuf_rcv = rmr.rmr_torcv_msg(MRC_RCV, sbuf_rcv, 2000)
244 rcv_summary = rmr.message_summary(sbuf_rcv)
247 assert send_summary["message state"] == rcv_summary["message state"] == 0
248 assert send_summary["message status"] == rcv_summary["message status"] == "RMR_OK"
249 assert pre_send_summary["payload"] == rcv_summary["payload"] == pay
250 assert pre_send_summary["message type"] == rcv_summary["message type"] == test_mtype
251 assert pre_send_summary["subscription id"] == rcv_summary["subscription id"] == test_subid
254 def test_send_rcv_subid_bad_subid():
256 test send and receive where subid is used for routing but nobody recieves this subid
258 sbuf_send = rmr.rmr_alloc_msg(MRC_SEND, 3, b"\x01\x00\x80", mtype=46656, sub_id=778)
259 sbuf_send = rmr.rmr_send_msg(MRC_SEND, sbuf_send)
260 assert rmr.message_summary(sbuf_send)["message state"] == 2
261 assert rmr.message_summary(sbuf_send)["message status"] == "RMR_ERR_NOENDPT"
264 def test_send_rcv_subid_bad_mtype():
266 test send and receive where subid is used for routing but nobody recieves this mtype
268 sbuf_send = rmr.rmr_alloc_msg(MRC_SEND, 3, b"\x01\x00\x80", mtype=46657, sub_id=777)
269 sbuf_send = rmr.rmr_send_msg(MRC_SEND, sbuf_send)
270 assert rmr.message_summary(sbuf_send)["message state"] == 2
271 assert rmr.message_summary(sbuf_send)["message status"] == "RMR_ERR_NOENDPT"
274 def send_burst(mrc, fmt, mtype=1, num=13, counter=0):
276 Internal function to support test_rcv_all.
277 Send a burst of messages optionally giving the type, payload
280 sbuf_send = rmr.rmr_alloc_msg(MRC_SEND, SIZE) # seed message buffer
283 payload = bytes(fmt % counter, "UTF-8")
286 rmr.set_payload_and_length(payload, sbuf_send)
287 sbuf_send.contents.mtype = mtype
290 while max_retries > 0:
291 sbuf_send = rmr.rmr_send_msg(mrc, sbuf_send)
292 ms = rmr.message_summary(sbuf_send)
293 if ms["message state"] != 10: # 10 is retry
298 assert ms["message state"] == 0
299 assert max_retries > 0
304 test the ability to receive a batch of queued messages.
306 pay_fmt = "send to ring msg: %d" # dynamic message format with counter
308 send_burst(MRC_SEND, pay_fmt) # send a bunch of 13 messages that should queue
309 time.sleep(1) # ensure underlying transport gets cycles to send/receive
311 bundle = helpers.rmr_rcvall_msgs(MRC_BUF_RCV) # use the buffered receiver to read all with a single call
312 assert len(bundle) == 13
314 for i in range(len(bundle)):
315 ms = bundle[i] # validate each summary returned, and ordering preserved
316 assert ms["message state"] == 0
317 expected_pay = bytes(pay_fmt % i, "UTF-8")
318 assert ms["payload"] == expected_pay
320 send_burst(MRC_SEND, pay_fmt, mtype=1, num=10) # send a second round with msg types 1 and 2 to test filter
321 send_burst(MRC_SEND, pay_fmt, mtype=2, num=8)
322 send_burst(MRC_SEND, pay_fmt, mtype=1, num=5)
323 send_burst(MRC_SEND, pay_fmt, mtype=2, num=4, counter=8) # total of 12 messages with type 2 should be queued
324 time.sleep(1) # ensure underlying transport gets cycles to send/receive
326 bundle = helpers.rmr_rcvall_msgs(MRC_BUF_RCV, [2]) # receive only message type 2 messages
327 assert len(bundle) == 12 # we should only get the second batch of 12 messages
329 for i in range(len(bundle)):
330 ms = bundle[i] # validate each summary
331 assert ms["message state"] == 0 # all should be OK
332 assert ms["message type"] == 2 # only mtype 2 should have been received
333 expected_pay = bytes(pay_fmt % i, "UTF-8") # ordering should still jive with the counter
334 assert ms["payload"] == expected_pay
337 def test_bad_buffer():
338 """test that we get a proper exception when the buffer has a null pointer"""
339 with pytest.raises(exceptions.BadBufferAllocation):
340 rmr.rmr_alloc_msg(None, 4096)
343 def test_resize_payload():
344 """test the ability to insert a larger payload into an existing message"""
348 mbuf = rmr.rmr_alloc_msg(MRC_SEND, 25) # allocate buffer with small payload
349 mbuf.contents.mtype = mtype # type and sub-id should not change
350 mbuf.contents.sub_id = subid
352 long_payload = b"This is a long payload that should force the message buffer to be reallocated"
353 rmr.set_payload_and_length(long_payload, mbuf)
354 summary = rmr.message_summary(mbuf)
355 assert summary["payload max size"] >= len(long_payload) # RMR may allocate a larger payload space
356 assert summary["payload length"] == len(long_payload) # however, the length must be exactly the same
357 assert summary["message type"] == mtype # both mtype and sub-id should be preserved in new
358 assert summary["subscription id"] == subid