# vim: ts=4 sw=4 expandtab:
# ==================================================================================
# Copyright (c) 2019 Nokia
# Copyright (c) 2018-2019 AT&T Intellectual Property.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==================================================================================
import time

from rmr import rmr, helpers
# NOTE(review): SIZE is passed to every rmr_alloc_msg call in this module but
# its definition is not visible in this file. 256 is an assumed value (it only
# needs to exceed the largest test payload) -- confirm against the project.
SIZE = 256

# Each context is polled until rmr_ready() stops returning 0 so that no test
# runs against a context that is not usable yet.
# NOTE(review): the bodies of the wait loops were lost; a one-second sleep is
# assumed because that is the pause the rest of this module uses.

# context the tests send from
MRC_SEND = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, 0x00)
while rmr.rmr_ready(MRC_SEND) == 0:
    time.sleep(1)

# context the tests receive on
MRC_RCV = rmr.rmr_init(b"4563", rmr.RMR_MAX_RCV_BYTES, 0x00)
while rmr.rmr_ready(MRC_RCV) == 0:
    time.sleep(1)

# context used as the "buffered receiver" by the receive-all test (flags 0x02)
MRC_BUF_RCV = rmr.rmr_init(b"4564", rmr.RMR_MAX_RCV_BYTES, 0x02)
while rmr.rmr_ready(MRC_BUF_RCV) == 0:  # wait on this context, not on MRC_RCV
    time.sleep(1)
def teardown_module():
    """
    test rmr module teardown

    Closes every RMR context this module opened, including the buffered
    receiver, so that none of them is left open after the tests finish.
    """
    for mrc in (MRC_SEND, MRC_RCV, MRC_BUF_RCV):
        rmr.rmr_close(mrc)
def _assert_new_sbuf(sbuf):
    """
    Check that a freshly allocated message buffer is still pristine: empty
    payload and transaction id, empty meid, state 0 / "RMR_OK" and errno 0.
    """
    fresh = rmr.message_summary(sbuf)

    # (summary field, value expected on a brand new buffer), checked in order
    pristine = (
        ("payload", b""),
        ("payload length", 0),
        ("transaction id", b""),
        ("message state", 0),
        ("message status", "RMR_OK"),
        ("meid", ""),
        ("errno", 0),
    )
    for field, want in pristine:
        assert fresh[field] == want
def test_get_constants(expected_constants):
    """
    test getting constants. We don't care what values are returned as those
    should be meaningful only to RMR. We do care that all of the constants
    which are defined in expected_constants are returned. Further, we don't
    consider it to be an error if the returned list has more constants than
    we expect.

    To avoid frustration, this should list all missing keys, not fail on the
    first missing key.
    """
    rconst = rmr._get_constants()

    # gather every expected constant that rmr does not list
    missing = [key for key in expected_constants if key not in rconst]
    for key in missing:
        print("did not find required constant in list from RMR: %s" % key)

    # fail once, after everything missing has been reported
    assert not missing, "constants missing from RMR: %s" % missing
def test_get_mapping_dict(expected_states):
    """
    The state mapping reported by rmr must equal the expected states, and
    single state numbers must translate to the matching status string
    (with a fallback string for a number rmr does not know).
    """
    assert rmr._get_mapping_dict() == expected_states

    known_translations = (
        (0, "RMR_OK"),
        (12, "RMR_ERR_TIMEOUT"),
        (666, "UNKNOWN STATE"),
    )
    for state, status in known_translations:
        assert rmr._state_to_status(state) == status
# NOTE(review): the def line of this test was lost; the name test_meid is
# inferred from its docstring -- confirm against the project.
def test_meid():
    """
    test meid stringification

    The meid is set from bytes and read back as a string, both directly and
    through the message summary; the two must always agree.
    """
    sbuf = rmr.rmr_alloc_msg(MRC_SEND, SIZE)

    rmr.rmr_set_meid(sbuf, b"\x01\x02", 2)
    assert rmr.rmr_get_meid(sbuf) == rmr.message_summary(sbuf)["meid"] == "\x01\x02"
    assert len(rmr.rmr_get_meid(sbuf)) == 2

    rmr.rmr_set_meid(sbuf, b"\x00" * 32, 32)
    assert rmr.rmr_get_meid(sbuf) == rmr.message_summary(sbuf)["meid"] == ""  # NULL bytes get truncated

    rmr.rmr_set_meid(sbuf, b"6" * 32, 32)
    assert rmr.rmr_get_meid(sbuf) == rmr.message_summary(sbuf)["meid"] == "6" * 32  # string in string out

    # overwrite only the first two bytes; the rest of the previous meid stays
    rmr.rmr_set_meid(sbuf, b"\x01\x02", 2)
    assert (
        rmr.rmr_get_meid(sbuf) == rmr.message_summary(sbuf)["meid"] == "\x01\x02" + "6" * 30
    )  # bytes in string out, 6s left over
    assert len(rmr.rmr_get_meid(sbuf)) == 32
def test_rmr_set_get():
    """
    Exercise the setters on a fresh message buffer (payload, transaction id
    and meid) and confirm each change is visible in the message summary.
    """
    msg = rmr.rmr_alloc_msg(MRC_SEND, SIZE)
    _assert_new_sbuf(msg)

    # payload: arbitrary bytes come back unchanged and the length follows
    raw = b"\x01\x00\x80"
    rmr.set_payload_and_length(raw, msg)
    view = rmr.message_summary(msg)
    assert view["payload"] == raw
    assert view["payload length"] == 3

    # transaction id: empty until generated; the generated value is random,
    # so only its presence and length can be checked
    assert view["transaction id"] == b""
    assert len(view["transaction id"]) == 0
    rmr.generate_and_set_transaction_id(msg)
    view = rmr.message_summary(msg)
    assert view["transaction id"] != b""
    assert len(view["transaction id"]) == 32

    # meid: empty to begin with; after setting bytes that contain a NUL only
    # the part in front of the NUL is read back
    assert rmr.rmr_get_meid(msg) == view["meid"] == ""
    rmr.rmr_set_meid(msg, b"666\x01\x00\x01", 6)
    view = rmr.message_summary(msg)
    assert rmr.rmr_get_meid(msg) == view["meid"] == "666\x01"
    assert len(view["meid"]) == 4
def test_rcv_timeout():
    """
    Timed receive with nothing sent: the call has to come back with a
    timeout instead of blocking. If this ever regresses the test does not
    fail, it hangs forever.

    rmr_rcv_msg has no unit test on purpose; it is a blocking call that may
    never return.
    """
    wait_ms = 50  # should time out after 50ms
    msg = rmr.rmr_alloc_msg(MRC_RCV, SIZE)
    msg = rmr.rmr_torcv_msg(MRC_RCV, msg, wait_ms)

    outcome = rmr.message_summary(msg)
    assert outcome["message state"] == 12
    assert outcome["message status"] == "RMR_ERR_TIMEOUT"
# NOTE(review): the def line of this test was lost; the name test_send_rcv is
# inferred from its docstring -- confirm against the project.
def test_send_rcv():
    """
    test send and receive

    Sends a message from MRC_SEND, receives it on MRC_RCV, returns an ack to
    the sender and checks that the sender receives that ack.
    """
    pay = b"\x01\x00\x80"

    # send a message
    sbuf_send = rmr.rmr_alloc_msg(MRC_SEND, SIZE)
    _assert_new_sbuf(sbuf_send)
    rmr.set_payload_and_length(pay, sbuf_send)
    sbuf_send.contents.mtype = 0
    sbuf_send = rmr.rmr_send_msg(MRC_SEND, sbuf_send)
    send_summary = rmr.message_summary(sbuf_send)
    assert send_summary["message state"] == 0  # if send fails don't attempt receive
    assert send_summary["message status"] == "RMR_OK"

    # receive it in other context
    sbuf_rcv = rmr.rmr_alloc_msg(MRC_RCV, SIZE)
    sbuf_rcv = rmr.rmr_torcv_msg(MRC_RCV, sbuf_rcv, 2000)
    rcv_summary = rmr.message_summary(sbuf_rcv)
    assert rcv_summary["message state"] == 0
    assert rcv_summary["message status"] == "RMR_OK"
    assert rcv_summary["message type"] == 0
    assert rcv_summary["payload"] == pay

    # send an ack back by reusing the received buffer
    ack_pay = b"message received"
    rmr.set_payload_and_length(ack_pay, sbuf_rcv)
    sbuf_rcv = rmr.rmr_rts_msg(MRC_RCV, sbuf_rcv)
    rcv_ack_summary = rmr.message_summary(sbuf_rcv)

    # have the sender receive it
    sbuf_send = rmr.rmr_torcv_msg(MRC_SEND, sbuf_send, 2000)
    send_ack_summary = rmr.message_summary(sbuf_send)

    # both the return-to-sender and the sender's receive must have succeeded
    assert send_ack_summary["message state"] == rcv_ack_summary["message state"] == 0
    assert send_ack_summary["message status"] == rcv_ack_summary["message status"] == "RMR_OK"
    assert send_ack_summary["payload"] == ack_pay
def send_burst(mrc, fmt, mtype=1, num=13, counter=0):
    """
    Internal function to support test_rcv_all.
    Send a burst of messages optionally giving the type, payload
    format and number to send.

    :param mrc: RMR context the messages are allocated from and sent on
    :param fmt: %-style format taking one integer; formatted with the running
        counter and UTF-8 encoded to build each payload
    :param mtype: message type set on every message of the burst
    :param num: number of messages to send
    :param counter: counter value used for the first message; incremented by
        one for each following message
    """
    # Seed message buffer. It is taken from the context we send on rather than
    # from the global MRC_SEND; every caller in this module passes MRC_SEND.
    sbuf_send = rmr.rmr_alloc_msg(mrc, SIZE)

    for _ in range(num):
        payload = bytes(fmt % counter, "UTF-8")
        counter += 1

        rmr.set_payload_and_length(payload, sbuf_send)
        sbuf_send.contents.mtype = mtype

        # NOTE(review): the retry budget and the pause between attempts were
        # lost from this file; 5 attempts and 0.75s are assumed -- confirm.
        max_retries = 5
        while max_retries > 0:
            sbuf_send = rmr.rmr_send_msg(mrc, sbuf_send)
            ms = rmr.message_summary(sbuf_send)
            if ms["message state"] != 10:  # 10 is retry
                break
            max_retries -= 1
            time.sleep(0.75)

        # the message must have gone out OK before the retries ran out
        assert ms["message state"] == 0
        assert max_retries > 0
def test_rcv_all():
    """
    test the ability to receive a batch of queued messages.

    First drains an unfiltered burst and checks count and ordering, then
    sends interleaved bursts of message types 1 and 2 and checks that a
    receive filtered on type 2 returns only those, still in order.
    """
    pay_fmt = "send to ring msg: %d"  # dynamic message format with counter

    send_burst(MRC_SEND, pay_fmt)  # send a bunch of 13 messages that should queue
    time.sleep(1)  # ensure underlying transport gets cycles to send/receive

    bundle = helpers.rmr_rcvall_msgs(MRC_BUF_RCV)  # use the buffered receiver to read all with a single call
    assert len(bundle) == 13

    for i, ms in enumerate(bundle):  # validate each summary returned, and ordering preserved
        assert ms["message state"] == 0
        expected_pay = bytes(pay_fmt % i, "UTF-8")
        assert ms["payload"] == expected_pay

    send_burst(MRC_SEND, pay_fmt, mtype=1, num=10)  # send a second round with msg types 1 and 2 to test filter
    send_burst(MRC_SEND, pay_fmt, mtype=2, num=8)
    send_burst(MRC_SEND, pay_fmt, mtype=1, num=5)
    send_burst(MRC_SEND, pay_fmt, mtype=2, num=4, counter=8)  # total of 12 messages with type 2 should be queued
    time.sleep(1)  # ensure underlying transport gets cycles to send/receive

    bundle = helpers.rmr_rcvall_msgs(MRC_BUF_RCV, [2])  # receive only message type 2 messages
    assert len(bundle) == 12  # we should only get the second batch of 12 messages

    for i, ms in enumerate(bundle):  # validate each summary
        assert ms["message state"] == 0  # all should be OK
        assert ms["message type"] == 2  # only mtype 2 should have been received
        expected_pay = bytes(pay_fmt % i, "UTF-8")  # ordering should still jive with the counter
        assert ms["payload"] == expected_pay