1 # vim: ts=4 sw=4 expandtab:
2 # =================================================================================2
3 # Copyright (c) 2019-2020 Nokia
4 # Copyright (c) 2018-2020 AT&T Intellectual Property.
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 # ==================================================================================
import time

from rmr import rmr, helpers, exceptions
# RMR contexts shared by the tests in this module: a sender, a receiver, and a
# second receiver created with flag 0x02 that the receive-all test reads from.
# Each context is polled until RMR reports it ready before moving on.
MRC_SEND = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, 0x00)
while rmr.rmr_ready(MRC_SEND) == 0:
    # NOTE(review): the loop bodies were missing from the source; a 1s poll is assumed - confirm
    time.sleep(1)

MRC_RCV = rmr.rmr_init(b"4563", rmr.RMR_MAX_RCV_BYTES, 0x00)
while rmr.rmr_ready(MRC_RCV) == 0:
    time.sleep(1)

MRC_BUF_RCV = rmr.rmr_init(b"4564", rmr.RMR_MAX_RCV_BYTES, 0x02)
while rmr.rmr_ready(MRC_BUF_RCV) == 0:
    time.sleep(1)
def teardown_module():
    """
    test rmr module teardown: close every RMR context this module opened
    """
    # MRC_BUF_RCV is initialised next to the other two contexts, so it has to
    # be released here as well or it stays open after the module finishes.
    for mrc in (MRC_SEND, MRC_RCV, MRC_BUF_RCV):
        rmr.rmr_close(mrc)
def _assert_new_sbuf(sbuf):
    """
    check that the summary of a freshly allocated message shows the pristine values we expect
    """
    fresh = rmr.message_summary(sbuf)
    pristine = {
        "payload": b"",
        "payload length": 0,
        "subscription id": -1,
        "transaction id": b"",
        "message state": 0,
        "message status": "RMR_OK",
        "meid": b"",
        "errno": 0,
    }
    # fields are checked in the order listed above
    for field, want in pristine.items():
        assert fresh[field] == want
def test_get_constants(expected_constants):
    """
    test getting constants. We don't care what values are returned as those
    should be meaningful only to RMR. We do care that all of the constants
    which are defined in expected_constants are returned. Further, we don't
    consider it to be an error if the returned list has more constants than
    we expect.

    To avoid frustration, this lists all missing keys before failing, rather
    than failing on the first missing key.
    """
    rconst = rmr._get_constants()

    # gather every expected constant that rmr did not list
    missing = [key for key in expected_constants if key not in rconst]
    for key in missing:
        print("did not find required constant in list from RMR: %s" % key)

    # printing alone never fails the test; fail once all gaps have been reported
    assert not missing
def test_get_mapping_dict(expected_states):
    """
    check the state mapping dictionary and a few state-to-status translations
    """
    assert rmr._get_mapping_dict() == expected_states

    # two known states, then one that has no mapping
    translations = ((0, "RMR_OK"), (12, "RMR_ERR_TIMEOUT"), (666, "UNKNOWN STATE"))
    for state, status in translations:
        assert rmr._state_to_status(state) == status
def test_meid():
    """
    test meid stringification
    """
    # NOTE(review): the def line was absent from the source; the name test_meid is inferred - confirm
    sbuf = rmr.rmr_alloc_msg(MRC_SEND, SIZE)

    rmr.rmr_set_meid(sbuf, b"\x01\x02")
    assert rmr.rmr_get_meid(sbuf) == rmr.message_summary(sbuf)["meid"] == b"\x01\x02"
    assert len(rmr.rmr_get_meid(sbuf)) == 2

    rmr.rmr_set_meid(sbuf, b"\x00" * 31)
    assert rmr.rmr_get_meid(sbuf) == rmr.message_summary(sbuf)["meid"] == b""  # NULL bytes get truncated

    rmr.rmr_set_meid(sbuf, b"6" * 31)
    assert rmr.rmr_get_meid(sbuf) == rmr.message_summary(sbuf)["meid"] == b"6" * 31  # string in string out

    rmr.rmr_set_meid(sbuf, b"\x01\x02")
    # this comparison must be asserted; as a bare expression its result is thrown away
    assert (
        rmr.rmr_get_meid(sbuf) == rmr.message_summary(sbuf)["meid"] == b"\x01\x02"
    )  # Ctypes will chop at first nil, so expect only 2 bytes back

    assert len(rmr.rmr_get_meid(sbuf)) == 2

    # test that an exception is raised for buffers which are too long
    with pytest.raises(exceptions.MeidSizeOutOfRange):
        rmr.rmr_set_meid(sbuf, b"8" * 32)
def test_rmr_set_get():
    """
    exercise the payload, transaction id and meid setters on a fresh buffer and
    confirm that each change shows up in the message summary
    """
    sbuf = rmr.rmr_alloc_msg(MRC_SEND, SIZE)
    _assert_new_sbuf(sbuf)

    # payload: bytes and length must come back as set, embedded nil included
    payload = b"\x01\x00\x80"
    rmr.set_payload_and_length(payload, sbuf)
    after_payload = rmr.message_summary(sbuf)
    assert after_payload["payload"] == payload
    assert after_payload["payload length"] == 3

    # transaction id: empty until generated; the generated value is random,
    # so only its presence and length can be checked
    assert after_payload["transaction id"] == b""
    assert len(after_payload["transaction id"]) == 0
    rmr.generate_and_set_transaction_id(sbuf)
    after_xid = rmr.message_summary(sbuf)
    assert after_xid["transaction id"] != b""
    assert len(after_xid["transaction id"]) == 32

    # meid: empty to start with; what comes back stops at the first nil byte
    assert rmr.rmr_get_meid(sbuf) == after_xid["meid"] == b""
    rmr.rmr_set_meid(sbuf, b"666\x01\x00\x01")
    after_meid = rmr.message_summary(sbuf)
    assert rmr.rmr_get_meid(sbuf) == after_meid["meid"] == b"666\x01"
    assert len(after_meid["meid"]) == 4
def test_alloc_fancy():
    """test that payload, transaction id, mtype, meid and subid passed at allocation time end up in the message"""
    payload = b"yoo\x01\x00\x80"
    sbuf = rmr.rmr_alloc_msg(
        MRC_SEND,
        SIZE,
        payload=payload,
        gen_transaction_id=True,
        mtype=14,
        meid=b"asdf",
        sub_id=654321,
    )
    info = rmr.message_summary(sbuf)

    assert info["payload"] == payload
    assert info["payload length"] == 6

    # the generated id cannot be predicted; check only that it exists and has the expected length
    assert info["transaction id"] != b""
    assert len(info["transaction id"]) == 32

    assert info["message state"] == 0

    # the summary and the raw buffer fields must agree
    assert info["message type"] == sbuf.contents.mtype == 14
    assert rmr.rmr_get_meid(sbuf) == info["meid"] == b"asdf"
    assert sbuf.contents.sub_id == info["subscription id"] == 654321
def test_alloc_overlapping_flags():
    """test allocation when both a generated and a fixed transaction id are requested; the fixed one is expected"""
    fixed_id = b"6" * 32
    sbuf = rmr.rmr_alloc_msg(MRC_SEND, SIZE, gen_transaction_id=True, fixed_transaction_id=fixed_id)
    reported_id = rmr.message_summary(sbuf)["transaction id"]
    assert reported_id == b"66666666666666666666666666666666"
def test_rcv_timeout():
    """
    test torcv. This is a scary test: if the timeout is broken the test does not fail, it runs forever!
    Nothing has been sent, so the timed receive must give up rather than block.

    There is no unit test for rmr_rcv_msg; that is a blocking call that may never return.
    """
    timeout_ms = 50
    waiting = rmr.rmr_alloc_msg(MRC_RCV, SIZE)
    waiting = rmr.rmr_torcv_msg(MRC_RCV, waiting, timeout_ms)  # should give up after 50ms
    outcome = rmr.message_summary(waiting)
    assert outcome["message state"] == 12
    assert outcome["message status"] == "RMR_ERR_TIMEOUT"
def test_send_rcv():
    """
    test send and receive
    """
    # NOTE(review): the def line was absent from the source; the name is inferred
    # from the sibling test_send_rcv_subid_* tests - confirm
    pay = b"\x01\x00\x80"

    # send a message
    sbuf_send = rmr.rmr_alloc_msg(MRC_SEND, SIZE)
    _assert_new_sbuf(sbuf_send)
    rmr.set_payload_and_length(pay, sbuf_send)
    sbuf_send.contents.mtype = 0
    sbuf_send = rmr.rmr_send_msg(MRC_SEND, sbuf_send)
    send_summary = rmr.message_summary(sbuf_send)
    assert send_summary["message state"] == 0  # if send fails don't attempt receive
    assert send_summary["message status"] == "RMR_OK"

    # receive it in other context
    sbuf_rcv = rmr.rmr_alloc_msg(MRC_RCV, SIZE)
    sbuf_rcv = rmr.rmr_torcv_msg(MRC_RCV, sbuf_rcv, 2000)
    rcv_summary = rmr.message_summary(sbuf_rcv)
    assert rcv_summary["message state"] == 0
    assert rcv_summary["message status"] == "RMR_OK"
    assert rcv_summary["message type"] == 0
    assert rcv_summary["payload"] == pay

    # return an ack to the sender, with a new payload and message type
    ack_pay = b"message received"
    sbuf_rcv = rmr.rmr_rts_msg(MRC_RCV, sbuf_rcv, payload=ack_pay, mtype=6666)
    rcv_ack_summary = rmr.message_summary(sbuf_rcv)

    # have the sender receive it
    sbuf_send = rmr.rmr_torcv_msg(MRC_SEND, sbuf_send, 2000)
    send_ack_summary = rmr.message_summary(sbuf_send)

    assert send_ack_summary["message state"] == rcv_ack_summary["message state"] == 0
    assert send_ack_summary["message status"] == rcv_ack_summary["message status"] == "RMR_OK"
    assert send_ack_summary["payload"] == ack_pay
    assert send_ack_summary["message type"] == 6666
def test_send_rcv_subid_good():
    """
    test send and receive where subid is used for routing
    """
    pay = b"\x01\x00\x80"
    # NOTE(review): these two names were used but never bound in the source. The values are the
    # pair implied by the sibling tests (46656/778 fails on the subid, 46657/777 fails on the
    # mtype) - confirm against the route table used by the test run.
    test_mtype = 46656
    test_subid = 777

    # send a message; keep a summary from before the send for the comparisons below
    sbuf_send = rmr.rmr_alloc_msg(MRC_SEND, 3, pay, mtype=test_mtype, sub_id=test_subid)
    pre_send_summary = rmr.message_summary(sbuf_send)
    sbuf_send = rmr.rmr_send_msg(MRC_SEND, sbuf_send)
    send_summary = rmr.message_summary(sbuf_send)

    # receive it in other context
    sbuf_rcv = rmr.rmr_alloc_msg(MRC_RCV, 3)
    sbuf_rcv = rmr.rmr_torcv_msg(MRC_RCV, sbuf_rcv, 2000)
    rcv_summary = rmr.message_summary(sbuf_rcv)

    # both sides must report success, and the received message must match what was sent
    assert send_summary["message state"] == rcv_summary["message state"] == 0
    assert send_summary["message status"] == rcv_summary["message status"] == "RMR_OK"
    assert pre_send_summary["payload"] == rcv_summary["payload"] == pay
    assert pre_send_summary["message type"] == rcv_summary["message type"] == test_mtype
    assert pre_send_summary["subscription id"] == rcv_summary["subscription id"] == test_subid
def test_send_rcv_subid_bad_subid():
    """
    test a send routed by subid where nobody receives this subid; the send must report no endpoint
    """
    payload = b"\x01\x00\x80"
    outgoing = rmr.rmr_alloc_msg(MRC_SEND, 3, payload, mtype=46656, sub_id=778)
    outgoing = rmr.rmr_send_msg(MRC_SEND, outgoing)
    result = rmr.message_summary(outgoing)
    assert result["message state"] == 2
    assert result["message status"] == "RMR_ERR_NOENDPT"
def test_send_rcv_subid_bad_mtype():
    """
    test a send routed by subid where nobody receives this mtype; the send must report no endpoint
    """
    payload = b"\x01\x00\x80"
    outgoing = rmr.rmr_alloc_msg(MRC_SEND, 3, payload, mtype=46657, sub_id=777)
    outgoing = rmr.rmr_send_msg(MRC_SEND, outgoing)
    result = rmr.message_summary(outgoing)
    assert result["message state"] == 2
    assert result["message status"] == "RMR_ERR_NOENDPT"
def send_burst(mrc, fmt, mtype=1, num=13, counter=0):
    """
    Internal function to support test_rcv_all.
    Send a burst of num messages of type mtype through the context mrc. Each
    payload is fmt formatted with a counter that starts at counter and goes up
    by one per message, which is what lets the receiver check ordering.
    """
    # allocate from the context we actually send through, not the module-level sender
    sbuf_send = rmr.rmr_alloc_msg(mrc, SIZE)  # seed message buffer

    for _ in range(num):
        payload = bytes(fmt % counter, "UTF-8")
        counter += 1

        rmr.set_payload_and_length(payload, sbuf_send)
        sbuf_send.contents.mtype = mtype

        # NOTE(review): the retry budget and the pause between attempts were not
        # visible in the source; 5 tries and 0.75s are assumptions - confirm
        max_retries = 5
        while max_retries > 0:
            sbuf_send = rmr.rmr_send_msg(mrc, sbuf_send)
            ms = rmr.message_summary(sbuf_send)
            if ms["message state"] != 10:  # 10 is retry
                break
            max_retries -= 1
            time.sleep(0.75)

        assert ms["message state"] == 0
        assert max_retries > 0
def test_rcv_all():
    """
    test the ability to receive a batch of queued messages.
    """
    # NOTE(review): the def line was absent from the source; the name comes from
    # the send_burst docstring, which says it supports test_rcv_all - confirm
    pay_fmt = "send to ring msg: %d"  # dynamic message format with counter

    send_burst(MRC_SEND, pay_fmt)  # send a bunch of 13 messages that should queue
    time.sleep(1)  # ensure underlying transport gets cycles to send/receive

    bundle = helpers.rmr_rcvall_msgs(MRC_BUF_RCV)  # use the buffered receiver to read all with a single call
    assert len(bundle) == 13

    for i, ms in enumerate(bundle):  # validate each summary returned, and ordering preserved
        assert ms["message state"] == 0
        expected_pay = bytes(pay_fmt % i, "UTF-8")
        assert ms["payload"] == expected_pay

    send_burst(MRC_SEND, pay_fmt, mtype=1, num=10)  # send a second round with msg types 1 and 2 to test filter
    send_burst(MRC_SEND, pay_fmt, mtype=2, num=8)
    send_burst(MRC_SEND, pay_fmt, mtype=1, num=5)
    send_burst(MRC_SEND, pay_fmt, mtype=2, num=4, counter=8)  # total of 12 messages with type 2 should be queued
    time.sleep(1)  # ensure underlying transport gets cycles to send/receive

    bundle = helpers.rmr_rcvall_msgs_raw(MRC_BUF_RCV, [2])  # receive only message type 2 messages
    assert len(bundle) == 12  # we should only get the second batch of 12 messages

    for i, (ms, sbuf) in enumerate(bundle):  # test the raw version
        test_summary = rmr.message_summary(sbuf)
        assert test_summary == ms
        assert ms["message state"] == 0  # all should be OK
        assert ms["message type"] == 2  # only mtype 2 should have been received
        expected_pay = bytes(pay_fmt % i, "UTF-8")  # ordering should still jive with the counter
        assert ms["payload"] == expected_pay
        rmr.rmr_free_msg(sbuf)
def test_bad_buffer():
    """test that allocating a message with no context (None) raises BadBufferAllocation"""
    missing_context = None
    with pytest.raises(exceptions.BadBufferAllocation):
        rmr.rmr_alloc_msg(missing_context, 4096)
def test_resize_payload():
    """test the ability to insert a larger payload into an existing message"""
    # NOTE(review): these two names were used but never bound in the source. The exact values
    # are arbitrary; the test only checks that whatever is set survives the resize.
    mtype = 99
    subid = 100

    mbuf = rmr.rmr_alloc_msg(MRC_SEND, 25)  # allocate buffer with small payload
    mbuf.contents.mtype = mtype  # type and sub-id should not change
    mbuf.contents.sub_id = subid

    long_payload = b"This is a long payload that should force the message buffer to be reallocated"
    rmr.set_payload_and_length(long_payload, mbuf)
    summary = rmr.message_summary(mbuf)
    assert summary["payload max size"] >= len(long_payload)  # RMR may allocate a larger payload space
    assert summary["payload length"] == len(long_payload)  # however, the length must be exactly the same
    assert summary["message type"] == mtype  # both mtype and sub-id should be preserved in new
    assert summary["subscription id"] == subid