c7e872a1d76343dd3eee6bf00dff693a202c64cd
[ric-plt/lib/rmr.git] / src / bindings / rmr-python / tests / test_rmr.py
1 # vim: ts=4 sw=4 expandtab:
2 # =================================================================================2
3 #       Copyright (c) 2019 Nokia
4 #       Copyright (c) 2018-2019 AT&T Intellectual Property.
5 #
6 #   Licensed under the Apache License, Version 2.0 (the "License");
7 #   you may not use this file except in compliance with the License.
8 #   You may obtain a copy of the License at
9 #
10 #          http://www.apache.org/licenses/LICENSE-2.0
11 #
12 #   Unless required by applicable law or agreed to in writing, software
13 #   distributed under the License is distributed on an "AS IS" BASIS,
14 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 #   See the License for the specific language governing permissions and
16 #   limitations under the License.
17 # ==================================================================================
18 import time
19 import pytest
20 from rmr import rmr, helpers, exceptions
21
22
23 SIZE = 256
24 MRC_SEND = None
25 MRC_RCV = None
26
27
28 def setup_module():
29     """
30     test_rmr module setup
31     """
32     global MRC_SEND
33     MRC_SEND = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, 0x00)
34     while rmr.rmr_ready(MRC_SEND) == 0:
35         time.sleep(1)
36
37     global MRC_RCV
38     MRC_RCV = rmr.rmr_init(b"4563", rmr.RMR_MAX_RCV_BYTES, 0x00)
39     while rmr.rmr_ready(MRC_RCV) == 0:
40         time.sleep(1)
41
42     global MRC_BUF_RCV
43     MRC_BUF_RCV = rmr.rmr_init(b"4564", rmr.RMR_MAX_RCV_BYTES, 0x02)
44     while rmr.rmr_ready(MRC_BUF_RCV) == 0:
45         time.sleep(1)
46
47
48 def teardown_module():
49     """
50     test rmr module teardown
51     """
52     rmr.rmr_close(MRC_SEND)
53     rmr.rmr_close(MRC_RCV)
54
55
56 def _assert_new_sbuf(sbuf):
57     """
58     verify the initial state of an alloced message is what we expect
59     """
60     summary = rmr.message_summary(sbuf)
61     assert summary["payload"] == b""
62     assert summary["payload length"] == 0
63     assert summary["subscription id"] == -1
64     assert summary["transaction id"] == b""
65     assert summary["message state"] == 0
66     assert summary["message status"] == "RMR_OK"
67     assert summary["meid"] == b""
68     assert summary["errno"] == 0
69
70
71 def test_get_constants(expected_constants):
72     """
73     test getting constants. We don't care what values are returned as those
74     should be meaningful only to RMR. We do care that all of the constants
75     which are defined in expected_contents are returned.  Further, we don't
76     consider it to be an error if the returned list has more constants than
77     what are in our list.
78
79     To avoid frustration, this should list all missing keys, not fail on the
80     first missing key.
81     """
82     errors = 0
83     econst = expected_constants
84     rconst = rmr._get_constants()
85     for key in econst:  # test all expected constants
86         if key not in rconst:  # expected value not listed by rmr
87             errors += 1
88             print("did not find required constant in list from RMR: %s" % key)
89
90     assert errors == 0
91
92
93 def test_get_mapping_dict(expected_states):
94     """
95     test getting mapping string
96     """
97     assert rmr._get_mapping_dict() == expected_states
98     assert rmr._state_to_status(0) == "RMR_OK"
99     assert rmr._state_to_status(12) == "RMR_ERR_TIMEOUT"
100     assert rmr._state_to_status(666) == "UNKNOWN STATE"
101
102
103 def test_meid():
104     """
105     test meid stringification
106     """
107     sbuf = rmr.rmr_alloc_msg(MRC_SEND, SIZE)
108
109     rmr.rmr_set_meid(sbuf, b"\x01\x02")
110     assert rmr.rmr_get_meid(sbuf) == rmr.message_summary(sbuf)["meid"] == b"\x01\x02"
111     assert len(rmr.rmr_get_meid(sbuf)) == 2
112
113     rmr.rmr_set_meid(sbuf, b"\x00" * 32)
114     assert rmr.rmr_get_meid(sbuf) == rmr.message_summary(sbuf)["meid"] == b""  # NULL bytes get truncated
115
116     rmr.rmr_set_meid(sbuf, b"6" * 32)
117     assert rmr.rmr_get_meid(sbuf) == rmr.message_summary(sbuf)["meid"] == b"6" * 32  # string in string out
118
119     rmr.rmr_set_meid(sbuf, b"\x01\x02")
120     assert (
121         rmr.rmr_get_meid(sbuf) == rmr.message_summary(sbuf)["meid"] == b"\x01\x02" + b"6" * 30
122     )  # bytes in string out, 6s left over
123     assert len(rmr.rmr_get_meid(sbuf)) == 32
124
125
126 def test_rmr_set_get():
127     """
128     test set functions
129     """
130     sbuf = rmr.rmr_alloc_msg(MRC_SEND, SIZE)
131     _assert_new_sbuf(sbuf)
132
133     # test payload
134     pay = b"\x01\x00\x80"
135     rmr.set_payload_and_length(pay, sbuf)
136     summary = rmr.message_summary(sbuf)
137     assert summary["payload"] == pay
138     assert summary["payload length"] == 3
139
140     # test transid (note we cant test payload because it's randomly gen)
141     assert summary["transaction id"] == b""
142     assert len(summary["transaction id"]) == 0
143     rmr.generate_and_set_transaction_id(sbuf)
144     summary = rmr.message_summary(sbuf)
145     assert summary["transaction id"] != b""
146     assert len(summary["transaction id"]) == 32
147
148     # test meid
149     assert rmr.rmr_get_meid(sbuf) == summary["meid"] == b""
150     rmr.rmr_set_meid(sbuf, b"666\x01\x00\x01")
151     summary = rmr.message_summary(sbuf)
152     assert rmr.rmr_get_meid(sbuf) == summary["meid"] == b"666\x01"
153     assert (len(summary["meid"])) == 4
154
155
156 def test_alloc_fancy():
157     """test allocation with setting payload, trans, mtype, subid"""
158     pay = b"yoo\x01\x00\x80"
159     sbuf = rmr.rmr_alloc_msg(MRC_SEND, SIZE, payload=pay, gen_transaction_id=True, mtype=14, meid=b"asdf", sub_id=654321)
160     summary = rmr.message_summary(sbuf)
161     assert summary["payload"] == pay
162     assert summary["payload length"] == 6
163     assert summary["transaction id"] != b""  # hard to test what it will be, but make sure not empty
164     assert len(summary["transaction id"]) == 32
165     assert summary["message state"] == 0
166     assert summary["message type"] == sbuf.contents.mtype == 14
167     assert rmr.rmr_get_meid(sbuf) == summary["meid"] == b"asdf"
168     assert sbuf.contents.sub_id == summary["subscription id"] == 654321
169
170
171 def test_rcv_timeout():
172     """
173     test torcv; this is a scary test because if it fails... it doesn't fail, it will run forever!
174     We receive a message (though nothing has been sent) and make sure the function doesn't block forever.
175
176     There is no unit test for rmr_rcv_msg; too dangerous, that is a blocking call that may never return.
177     """
178     sbuf_rcv = rmr.rmr_alloc_msg(MRC_RCV, SIZE)
179     sbuf_rcv = rmr.rmr_torcv_msg(MRC_RCV, sbuf_rcv, 50)  # should time out after 50ms
180     summary = rmr.message_summary(sbuf_rcv)
181     assert summary["message state"] == 12
182     assert summary["message status"] == "RMR_ERR_TIMEOUT"
183
184
185 def test_send_rcv():
186     """
187     test send and receive
188     """
189     pay = b"\x01\x00\x80"
190
191     # send a message
192     sbuf_send = rmr.rmr_alloc_msg(MRC_SEND, SIZE)
193     _assert_new_sbuf(sbuf_send)
194     rmr.set_payload_and_length(pay, sbuf_send)
195     sbuf_send.contents.mtype = 0
196     sbuf_send = rmr.rmr_send_msg(MRC_SEND, sbuf_send)
197     send_summary = rmr.message_summary(sbuf_send)
198     assert send_summary["message state"] == 0  # if send fails don't attempt receive
199     assert send_summary["message status"] == "RMR_OK"
200     time.sleep(0.5)
201
202     # receive it in other context
203     sbuf_rcv = rmr.rmr_alloc_msg(MRC_RCV, SIZE)
204     sbuf_rcv = rmr.rmr_torcv_msg(MRC_RCV, sbuf_rcv, 2000)
205     rcv_summary = rmr.message_summary(sbuf_rcv)
206     assert rcv_summary["message state"] == 0
207     assert rcv_summary["message status"] == "RMR_OK"
208     assert rcv_summary["message type"] == 0
209     assert rcv_summary["payload"] == pay
210
211     # send an ACK back
212     ack_pay = b"message received"
213     sbuf_rcv = rmr.rmr_rts_msg(MRC_RCV, sbuf_rcv, payload=ack_pay, mtype=6666)
214     rcv_ack_summary = rmr.message_summary(sbuf_rcv)
215
216     # have the sender receive it
217     sbuf_send = rmr.rmr_torcv_msg(MRC_SEND, sbuf_send, 2000)
218     send_ack_summary = rmr.message_summary(sbuf_send)
219
220     assert send_ack_summary["message state"] == rcv_ack_summary["message state"] == 0
221     assert send_ack_summary["message status"] == rcv_ack_summary["message status"] == "RMR_OK"
222     assert send_ack_summary["payload"] == ack_pay
223     assert send_ack_summary["message type"] == 6666
224
225
226 def test_send_rcv_subid_good():
227     """
228     test send and receive where subid is used for routing
229     """
230     pay = b"\x01\x00\x80"
231     test_mtype = 46656
232     test_subid = 777
233
234     # send a message
235     sbuf_send = rmr.rmr_alloc_msg(MRC_SEND, 3, pay, mtype=test_mtype, sub_id=test_subid)
236     pre_send_summary = rmr.message_summary(sbuf_send)
237     sbuf_send = rmr.rmr_send_msg(MRC_SEND, sbuf_send)
238     send_summary = rmr.message_summary(sbuf_send)
239
240     # receive it in other context
241     time.sleep(0.5)
242     sbuf_rcv = rmr.rmr_alloc_msg(MRC_RCV, 3)
243     sbuf_rcv = rmr.rmr_torcv_msg(MRC_RCV, sbuf_rcv, 2000)
244     rcv_summary = rmr.message_summary(sbuf_rcv)
245
246     # asserts
247     assert send_summary["message state"] == rcv_summary["message state"] == 0
248     assert send_summary["message status"] == rcv_summary["message status"] == "RMR_OK"
249     assert pre_send_summary["payload"] == rcv_summary["payload"] == pay
250     assert pre_send_summary["message type"] == rcv_summary["message type"] == test_mtype
251     assert pre_send_summary["subscription id"] == rcv_summary["subscription id"] == test_subid
252
253
254 def test_send_rcv_subid_bad_subid():
255     """
256     test send and receive where subid is used for routing but nobody recieves this subid
257     """
258     sbuf_send = rmr.rmr_alloc_msg(MRC_SEND, 3, b"\x01\x00\x80", mtype=46656, sub_id=778)
259     sbuf_send = rmr.rmr_send_msg(MRC_SEND, sbuf_send)
260     assert rmr.message_summary(sbuf_send)["message state"] == 2
261     assert rmr.message_summary(sbuf_send)["message status"] == "RMR_ERR_NOENDPT"
262
263
264 def test_send_rcv_subid_bad_mtype():
265     """
266     test send and receive where subid is used for routing but nobody recieves this mtype
267     """
268     sbuf_send = rmr.rmr_alloc_msg(MRC_SEND, 3, b"\x01\x00\x80", mtype=46657, sub_id=777)
269     sbuf_send = rmr.rmr_send_msg(MRC_SEND, sbuf_send)
270     assert rmr.message_summary(sbuf_send)["message state"] == 2
271     assert rmr.message_summary(sbuf_send)["message status"] == "RMR_ERR_NOENDPT"
272
273
274 def send_burst(mrc, fmt, mtype=1, num=13, counter=0):
275     """
276         Internal function to support test_rcv_all.
277         Send a burst of messages optionally giving the type, payload
278         and number to send.
279     """
280     sbuf_send = rmr.rmr_alloc_msg(MRC_SEND, SIZE)  # seed message buffer
281
282     for i in range(num):
283         payload = bytes(fmt % counter, "UTF-8")
284         counter += 1
285
286         rmr.set_payload_and_length(payload, sbuf_send)
287         sbuf_send.contents.mtype = mtype
288
289         max_retries = 5
290         while max_retries > 0:
291             sbuf_send = rmr.rmr_send_msg(mrc, sbuf_send)
292             ms = rmr.message_summary(sbuf_send)
293             if ms["message state"] != 10:  # 10 is retry
294                 break
295             max_retries -= 1
296             time.sleep(0.75)
297
298         assert ms["message state"] == 0
299         assert max_retries > 0
300
301
302 def test_rcv_all():
303     """
304     test the ability to receive a batch of queued messages.
305     """
306     pay_fmt = "send to ring msg: %d"  # dynamic message format with counter
307
308     send_burst(MRC_SEND, pay_fmt)  # send a bunch of 13 messages that should queue
309     time.sleep(1)  # ensure underlying transport gets cycles to send/receive
310
311     bundle = helpers.rmr_rcvall_msgs(MRC_BUF_RCV)  # use the buffered receiver to read all with a single call
312     assert len(bundle) == 13
313
314     for i, ms in enumerate(bundle):
315         ms = bundle[i]  # validate each summary returned, and ordering preserved
316         assert ms["message state"] == 0
317         expected_pay = bytes(pay_fmt % i, "UTF-8")
318         assert ms["payload"] == expected_pay
319
320     send_burst(MRC_SEND, pay_fmt, mtype=1, num=10)  # send a second round with msg types 1 and 2 to test filter
321     send_burst(MRC_SEND, pay_fmt, mtype=2, num=8)
322     send_burst(MRC_SEND, pay_fmt, mtype=1, num=5)
323     send_burst(MRC_SEND, pay_fmt, mtype=2, num=4, counter=8)  # total of 12 messages with type 2 should be queued
324     time.sleep(1)  # ensure underlying transport gets cycles to send/receive
325
326     bundle = helpers.rmr_rcvall_msgs_raw(MRC_BUF_RCV, [2])  # receive only message type 2 messages
327     assert len(bundle) == 12  # we should only get the second batch of 12 messages
328
329     for i, (ms, sbuf) in enumerate(bundle):  # test the raw version
330         test_summary = rmr.message_summary(sbuf)
331         assert test_summary == ms
332         assert ms["message state"] == 0  # all should be OK
333         assert ms["message type"] == 2  # only mtype 2 should have been received
334         expected_pay = bytes(pay_fmt % i, "UTF-8")  # ordering should still jive with the counter
335         assert ms["payload"] == expected_pay
336         rmr.rmr_free_msg(sbuf)
337
338
339 def test_bad_buffer():
340     """test that we get a proper exception when the buffer has a null pointer"""
341     with pytest.raises(exceptions.BadBufferAllocation):
342         rmr.rmr_alloc_msg(None, 4096)
343
344
345 def test_resize_payload():
346     """test the ability to insert a larger payload into an existing message"""
347     mtype = 99
348     subid = 100
349
350     mbuf = rmr.rmr_alloc_msg(MRC_SEND, 25)  # allocate buffer with small payload
351     mbuf.contents.mtype = mtype  # type and sub-id should not change
352     mbuf.contents.sub_id = subid
353
354     long_payload = b"This is a long payload that should force the message buffer to be reallocated"
355     rmr.set_payload_and_length(long_payload, mbuf)
356     summary = rmr.message_summary(mbuf)
357     assert summary["payload max size"] >= len(long_payload)  # RMR may allocate a larger payload space
358     assert summary["payload length"] == len(long_payload)  # however, the length must be exactly the same
359     assert summary["message type"] == mtype  # both mtype and sub-id should be preserved in new
360     assert summary["subscription id"] == subid