1 # vim: ts=4 sw=4 expandtab:
# ==================================================================================
3 # Copyright (c) 2019-2020 Nokia
4 # Copyright (c) 2018-2020 AT&T Intellectual Property.
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 # ==================================================================================
import time

from ricxappframe.rmr import rmr, helpers, exceptions
33 MRC_SEND = rmr.rmr_init(b"3562", rmr.RMR_MAX_RCV_BYTES, 0x00)
34 while rmr.rmr_ready(MRC_SEND) == 0:
38 MRC_RCV = rmr.rmr_init(b"3563", rmr.RMR_MAX_RCV_BYTES, 0x00)
39 while rmr.rmr_ready(MRC_RCV) == 0:
43 MRC_BUF_RCV = rmr.rmr_init(b"3564", rmr.RMR_MAX_RCV_BYTES, 0x02)
44 while rmr.rmr_ready(MRC_BUF_RCV) == 0:
def teardown_module():
    """
    test rmr module teardown

    Closes each of the three RMR contexts this module uses so that none of
    them is left open once the tests in this file have finished.
    """
    rmr.rmr_close(MRC_SEND)
    rmr.rmr_close(MRC_RCV)
    rmr.rmr_close(MRC_BUF_RCV)  # buffered receiver; previously never closed
def _assert_new_sbuf(sbuf):
    """
    check that a freshly allocated message buffer carries the expected defaults
    """
    # (summary field, value expected on a new buffer), checked in this order
    expected_defaults = (
        ("payload", b""),
        ("payload length", 0),
        ("subscription id", -1),
        ("transaction id", b""),
        ("message state", 0),
        ("message status", "RMR_OK"),
        ("meid", b""),
        ("errno", 0),
    )
    fresh = rmr.message_summary(sbuf)
    for field, default in expected_defaults:
        assert fresh[field] == default
def test_get_constants(expected_constants):
    """
    test getting constants. We don't care what values are returned as those
    should be meaningful only to RMR. We do care that every constant named in
    expected_constants is returned. Further, we don't consider it to be an
    error if RMR returns more constants than we expect.

    To avoid frustration, every missing key is printed before the test fails,
    rather than stopping at the first missing one.
    """
    rconst = rmr._get_constants()

    # collect all expected constants that rmr did not list
    missing = [key for key in expected_constants if key not in rconst]
    for key in missing:
        print("did not find required constant in list from RMR: %s" % key)

    # fail only after the complete list has been reported
    assert not missing
def test_get_mapping_dict(expected_states):
    """
    check the state mapping dict and the state-to-status lookup
    """
    assert rmr._get_mapping_dict() == expected_states

    # two known states, plus one value expected to yield "UNKNOWN STATE"
    for state, status in ((0, "RMR_OK"), (12, "RMR_ERR_TIMEOUT"), (666, "UNKNOWN STATE")):
        assert rmr._state_to_status(state) == status
105 test meid stringification
107 sbuf = rmr.rmr_alloc_msg(MRC_SEND, SIZE)
109 rmr.rmr_set_meid(sbuf, b"\x01\x02")
110 assert rmr.rmr_get_meid(sbuf) == rmr.message_summary(sbuf)["meid"] == b"\x01\x02"
111 assert len(rmr.rmr_get_meid(sbuf)) == 2
113 rmr.rmr_set_meid(sbuf, b"\x00" * 31)
114 assert rmr.rmr_get_meid(sbuf) == rmr.message_summary(sbuf)["meid"] == b"" # NULL bytes get truncated
116 rmr.rmr_set_meid(sbuf, b"6" * 31)
117 assert rmr.rmr_get_meid(sbuf) == rmr.message_summary(sbuf)["meid"] == b"6" * 31 # string in string out
119 rmr.rmr_set_meid(sbuf, b"\x01\x02")
121 rmr.rmr_get_meid(sbuf) == rmr.message_summary(sbuf)["meid"] == b"\x01\x02"
122 ) # Ctypes will chop at first nil, so expect only 2 bytes back
124 assert len(rmr.rmr_get_meid(sbuf)) == 2
126 # test that an exception is raised for buffers which are too long
127 with pytest.raises(exceptions.MeidSizeOutOfRange):
128 rmr.rmr_set_meid(sbuf, b"8" * 32)
def test_rmr_set_get():
    """
    exercise the payload, transaction id and meid setters and read each back
    """
    msg = rmr.rmr_alloc_msg(MRC_SEND, SIZE)
    _assert_new_sbuf(msg)

    # payload: both the bytes and the length must show up in the summary
    body = b"\x01\x00\x80"
    rmr.set_payload_and_length(body, msg)
    snapshot = rmr.message_summary(msg)
    assert snapshot["payload"] == body
    assert snapshot["payload length"] == 3

    # transaction id: empty until generated; the generated value is random,
    # so only its presence and length are checked
    transid = snapshot["transaction id"]
    assert transid == b""
    assert len(transid) == 0
    rmr.generate_and_set_transaction_id(msg)
    snapshot = rmr.message_summary(msg)
    transid = snapshot["transaction id"]
    assert transid != b""
    assert len(transid) == 32

    # meid: empty by default; only the bytes before the nil byte are expected back
    assert rmr.rmr_get_meid(msg) == snapshot["meid"] == b""
    rmr.rmr_set_meid(msg, b"666\x01\x00\x01")
    snapshot = rmr.message_summary(msg)
    meid = snapshot["meid"]
    assert rmr.rmr_get_meid(msg) == meid == b"666\x01"
    assert len(meid) == 4
def test_alloc_fancy():
    """allocate a message with payload, transaction id, mtype, meid and sub-id all set in one call"""
    body = b"yoo\x01\x00\x80"
    msg = rmr.rmr_alloc_msg(
        MRC_SEND,
        SIZE,
        payload=body,
        gen_transaction_id=True,
        mtype=14,
        meid=b"asdf",
        sub_id=654321,
    )
    info = rmr.message_summary(msg)

    assert info["payload"] == body
    assert info["payload length"] == 6

    # the generated transaction id is unpredictable; check it is present and 32 bytes long
    transid = info["transaction id"]
    assert transid != b""
    assert len(transid) == 32

    assert info["message state"] == 0
    assert info["message type"] == msg.contents.mtype == 14
    assert rmr.rmr_get_meid(msg) == info["meid"] == b"asdf"
    assert msg.contents.sub_id == info["subscription id"] == 654321
def test_alloc_overlapping_flags():
    """allocate with both a generated and a fixed transaction id requested; expect the fixed one"""
    fixed_id = b"6" * 32
    msg = rmr.rmr_alloc_msg(MRC_SEND, SIZE, gen_transaction_id=True, fixed_transaction_id=fixed_id)
    assert rmr.message_summary(msg)["transaction id"] == b"66666666666666666666666666666666"
def test_rcv_timeout():
    """
    test torcv; a failure here would not show up as a failed assertion but as a test that never ends.
    Nothing has been sent, so the timed receive must give up on its own instead of blocking forever.

    rmr_rcv_msg has no unit test: it is a blocking call that may never return.
    """
    timeout_ms = 50
    rcv_buf = rmr.rmr_alloc_msg(MRC_RCV, SIZE)
    rcv_buf = rmr.rmr_torcv_msg(MRC_RCV, rcv_buf, timeout_ms)  # nothing queued, so this should time out
    outcome = rmr.message_summary(rcv_buf)
    assert outcome["message state"] == 12
    assert outcome["message status"] == "RMR_ERR_TIMEOUT"
199 test send and receive
201 pay = b"\x01\x00\x80"
204 sbuf_send = rmr.rmr_alloc_msg(MRC_SEND, SIZE)
205 _assert_new_sbuf(sbuf_send)
206 rmr.set_payload_and_length(pay, sbuf_send)
207 sbuf_send.contents.mtype = 0
208 sbuf_send = rmr.rmr_send_msg(MRC_SEND, sbuf_send)
209 send_summary = rmr.message_summary(sbuf_send)
210 assert send_summary["message state"] == 0 # if send fails don't attempt receive
211 assert send_summary["message status"] == "RMR_OK"
214 # receive it in other context
215 sbuf_rcv = rmr.rmr_alloc_msg(MRC_RCV, SIZE)
216 sbuf_rcv = rmr.rmr_torcv_msg(MRC_RCV, sbuf_rcv, 2000)
217 rcv_summary = rmr.message_summary(sbuf_rcv)
218 assert rcv_summary["message state"] == 0
219 assert rcv_summary["message status"] == "RMR_OK"
220 assert rcv_summary["message type"] == 0
221 assert rcv_summary["payload"] == pay
224 ack_pay = b"message received"
225 sbuf_rcv = rmr.rmr_rts_msg(MRC_RCV, sbuf_rcv, payload=ack_pay, mtype=6666)
226 rcv_ack_summary = rmr.message_summary(sbuf_rcv)
228 # have the sender receive it
229 sbuf_send = rmr.rmr_torcv_msg(MRC_SEND, sbuf_send, 2000)
230 send_ack_summary = rmr.message_summary(sbuf_send)
232 assert send_ack_summary["message state"] == rcv_ack_summary["message state"] == 0
233 assert send_ack_summary["message status"] == rcv_ack_summary["message status"] == "RMR_OK"
234 assert send_ack_summary["payload"] == ack_pay
235 assert send_ack_summary["message type"] == 6666
def test_send_rcv_subid_good():
    """
    test send and receive where subid is used for routing
    """
    pay = b"\x01\x00\x80"
    # Routable (mtype, sub-id) pair. Inferred from the negative tests in this
    # file, which expect RMR_ERR_NOENDPT for (46656, 778) and (46657, 777).
    # NOTE(review): confirm against the route table used by the test run.
    test_mtype = 46656
    test_subid = 777

    # send a message
    sbuf_send = rmr.rmr_alloc_msg(MRC_SEND, len(pay), pay, mtype=test_mtype, sub_id=test_subid)
    pre_send_summary = rmr.message_summary(sbuf_send)  # captured before the send call replaces the buffer
    sbuf_send = rmr.rmr_send_msg(MRC_SEND, sbuf_send)
    send_summary = rmr.message_summary(sbuf_send)

    # receive it in other context
    sbuf_rcv = rmr.rmr_alloc_msg(MRC_RCV, len(pay))
    sbuf_rcv = rmr.rmr_torcv_msg(MRC_RCV, sbuf_rcv, 2000)
    rcv_summary = rmr.message_summary(sbuf_rcv)

    # both sides must report success, and the received message must match what was built
    assert send_summary["message state"] == rcv_summary["message state"] == 0
    assert send_summary["message status"] == rcv_summary["message status"] == "RMR_OK"
    assert pre_send_summary["payload"] == rcv_summary["payload"] == pay
    assert pre_send_summary["message type"] == rcv_summary["message type"] == test_mtype
    assert pre_send_summary["subscription id"] == rcv_summary["subscription id"] == test_subid
def test_send_rcv_subid_bad_subid():
    """
    test send where subid is used for routing but nobody receives this subid
    """
    msg = rmr.rmr_alloc_msg(MRC_SEND, 3, b"\x01\x00\x80", mtype=46656, sub_id=778)
    msg = rmr.rmr_send_msg(MRC_SEND, msg)

    # the send is expected to report that no endpoint exists
    state = rmr.message_summary(msg)["message state"]
    assert state == 2
    status = rmr.message_summary(msg)["message status"]
    assert status == "RMR_ERR_NOENDPT"
def test_send_rcv_subid_bad_mtype():
    """
    test send where subid is used for routing but nobody receives this mtype
    """
    msg = rmr.rmr_alloc_msg(MRC_SEND, 3, b"\x01\x00\x80", mtype=46657, sub_id=777)
    msg = rmr.rmr_send_msg(MRC_SEND, msg)

    # the send is expected to report that no endpoint exists
    state = rmr.message_summary(msg)["message state"]
    assert state == 2
    status = rmr.message_summary(msg)["message status"]
    assert status == "RMR_ERR_NOENDPT"
def send_burst(mrc, fmt, mtype=1, num=13, counter=0):
    """
    Internal function to support test_rcv_all.
    Send a burst of messages optionally giving the type, payload
    format, number of messages and starting counter value.

    :param mrc: RMR context used both to allocate the buffer and to send
    :param fmt: %-style format string; each payload is ``fmt % counter`` encoded as UTF-8
    :param mtype: message type set on every message in the burst
    :param num: number of messages to send
    :param counter: value formatted into the first payload; incremented once per message
    """
    # seed message buffer; allocated from the context that will send it
    sbuf_send = rmr.rmr_alloc_msg(mrc, SIZE)

    for _ in range(num):
        payload = bytes(fmt % counter, "UTF-8")
        counter += 1

        rmr.set_payload_and_length(payload, sbuf_send)
        sbuf_send.contents.mtype = mtype

        # NOTE(review): retry budget and pause length are assumed values -- adjust if the transport needs more
        max_retries = 5
        while max_retries > 0:
            sbuf_send = rmr.rmr_send_msg(mrc, sbuf_send)
            ms = rmr.message_summary(sbuf_send)
            if ms["message state"] != 10:  # 10 is retry
                break
            max_retries -= 1
            time.sleep(0.75)  # pause before trying the same message again

        assert ms["message state"] == 0
        assert max_retries > 0  # retries were not exhausted
316 test the ability to receive a batch of queued messages.
318 pay_fmt = "send to ring msg: %d" # dynamic message format with counter
320 send_burst(MRC_SEND, pay_fmt) # send a bunch of 13 messages that should queue
321 time.sleep(1) # ensure underlying transport gets cycles to send/receive
323 bundle = helpers.rmr_rcvall_msgs(MRC_BUF_RCV) # use the buffered receiver to read all with a single call
324 assert len(bundle) == 13
326 for i, ms in enumerate(bundle):
327 ms = bundle[i] # validate each summary returned, and ordering preserved
328 assert ms["message state"] == 0
329 expected_pay = bytes(pay_fmt % i, "UTF-8")
330 assert ms["payload"] == expected_pay
332 send_burst(MRC_SEND, pay_fmt, mtype=1, num=10) # send a second round with msg types 1 and 2 to test filter
333 send_burst(MRC_SEND, pay_fmt, mtype=2, num=8)
334 send_burst(MRC_SEND, pay_fmt, mtype=1, num=5)
335 send_burst(MRC_SEND, pay_fmt, mtype=2, num=4, counter=8) # total of 12 messages with type 2 should be queued
336 time.sleep(1) # ensure underlying transport gets cycles to send/receive
338 bundle = helpers.rmr_rcvall_msgs_raw(MRC_BUF_RCV, [2]) # receive only message type 2 messages
339 assert len(bundle) == 12 # we should only get the second batch of 12 messages
341 for i, (ms, sbuf) in enumerate(bundle): # test the raw version
342 test_summary = rmr.message_summary(sbuf)
343 assert test_summary == ms
344 assert ms["message state"] == 0 # all should be OK
345 assert ms["message type"] == 2 # only mtype 2 should have been received
346 expected_pay = bytes(pay_fmt % i, "UTF-8") # ordering should still jive with the counter
347 assert ms["payload"] == expected_pay
348 rmr.rmr_free_msg(sbuf)
def test_bad_buffer():
    """allocating a message with None in place of the context must raise BadBufferAllocation"""
    null_context = None
    with pytest.raises(exceptions.BadBufferAllocation):
        rmr.rmr_alloc_msg(null_context, 4096)
def test_resize_payload():
    """test the ability to insert a larger payload into an existing message"""
    # Arbitrary values; the test only checks that they survive the resize.
    mtype = 99
    subid = 100

    mbuf = rmr.rmr_alloc_msg(MRC_SEND, 25)  # allocate buffer with small payload
    mbuf.contents.mtype = mtype  # type and sub-id should not change
    mbuf.contents.sub_id = subid

    # longer than the 25 bytes requested at allocation
    long_payload = b"This is a long payload that should force the message buffer to be reallocated"
    rmr.set_payload_and_length(long_payload, mbuf)
    summary = rmr.message_summary(mbuf)
    assert summary["payload max size"] >= len(long_payload)  # RMR may allocate a larger payload space
    assert summary["payload length"] == len(long_payload)  # however, the length must be exactly the same
    assert summary["message type"] == mtype  # both mtype and sub-id should be preserved in new
    assert summary["subscription id"] == subid