Add ability to set xaction deterministically, bug fixes
[ric-plt/lib/rmr.git] / src / bindings / rmr-python / tests / test_rmr.py
1 # vim: ts=4 sw=4 expandtab:
2 # =================================================================================2
3 #       Copyright (c) 2019 Nokia
4 #       Copyright (c) 2018-2019 AT&T Intellectual Property.
5 #
6 #   Licensed under the Apache License, Version 2.0 (the "License");
7 #   you may not use this file except in compliance with the License.
8 #   You may obtain a copy of the License at
9 #
10 #          http://www.apache.org/licenses/LICENSE-2.0
11 #
12 #   Unless required by applicable law or agreed to in writing, software
13 #   distributed under the License is distributed on an "AS IS" BASIS,
14 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 #   See the License for the specific language governing permissions and
16 #   limitations under the License.
17 # ==================================================================================
18 import time
19 import pytest
20 from rmr import rmr, helpers, exceptions
21
22
23 SIZE = 256
24 MRC_SEND = None
25 MRC_RCV = None
26
27
28 def setup_module():
29     """
30     test_rmr module setup
31     """
32     global MRC_SEND
33     MRC_SEND = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, 0x00)
34     while rmr.rmr_ready(MRC_SEND) == 0:
35         time.sleep(1)
36
37     global MRC_RCV
38     MRC_RCV = rmr.rmr_init(b"4563", rmr.RMR_MAX_RCV_BYTES, 0x00)
39     while rmr.rmr_ready(MRC_RCV) == 0:
40         time.sleep(1)
41
42     global MRC_BUF_RCV
43     MRC_BUF_RCV = rmr.rmr_init(b"4564", rmr.RMR_MAX_RCV_BYTES, 0x02)
44     while rmr.rmr_ready(MRC_BUF_RCV) == 0:
45         time.sleep(1)
46
47
48 def teardown_module():
49     """
50     test rmr module teardown
51     """
52     rmr.rmr_close(MRC_SEND)
53     rmr.rmr_close(MRC_RCV)
54
55
56 def _assert_new_sbuf(sbuf):
57     """
58     verify the initial state of an alloced message is what we expect
59     """
60     summary = rmr.message_summary(sbuf)
61     assert summary["payload"] == b""
62     assert summary["payload length"] == 0
63     assert summary["subscription id"] == -1
64     assert summary["transaction id"] == b""
65     assert summary["message state"] == 0
66     assert summary["message status"] == "RMR_OK"
67     assert summary["meid"] == b""
68     assert summary["errno"] == 0
69
70
71 def test_get_constants(expected_constants):
72     """
73     test getting constants. We don't care what values are returned as those
74     should be meaningful only to RMR. We do care that all of the constants
75     which are defined in expected_contents are returned.  Further, we don't
76     consider it to be an error if the returned list has more constants than
77     what are in our list.
78
79     To avoid frustration, this should list all missing keys, not fail on the
80     first missing key.
81     """
82     errors = 0
83     econst = expected_constants
84     rconst = rmr._get_constants()
85     for key in econst:  # test all expected constants
86         if key not in rconst:  # expected value not listed by rmr
87             errors += 1
88             print("did not find required constant in list from RMR: %s" % key)
89
90     assert errors == 0
91
92
93 def test_get_mapping_dict(expected_states):
94     """
95     test getting mapping string
96     """
97     assert rmr._get_mapping_dict() == expected_states
98     assert rmr._state_to_status(0) == "RMR_OK"
99     assert rmr._state_to_status(12) == "RMR_ERR_TIMEOUT"
100     assert rmr._state_to_status(666) == "UNKNOWN STATE"
101
102
103 def test_meid():
104     """
105     test meid stringification
106     """
107     sbuf = rmr.rmr_alloc_msg(MRC_SEND, SIZE)
108
109     rmr.rmr_set_meid(sbuf, b"\x01\x02")
110     assert rmr.rmr_get_meid(sbuf) == rmr.message_summary(sbuf)["meid"] == b"\x01\x02"
111     assert len(rmr.rmr_get_meid(sbuf)) == 2
112
113     rmr.rmr_set_meid(sbuf, b"\x00" * 32)
114     assert rmr.rmr_get_meid(sbuf) == rmr.message_summary(sbuf)["meid"] == b""  # NULL bytes get truncated
115
116     rmr.rmr_set_meid(sbuf, b"6" * 32)
117     assert rmr.rmr_get_meid(sbuf) == rmr.message_summary(sbuf)["meid"] == b"6" * 32  # string in string out
118
119     rmr.rmr_set_meid(sbuf, b"\x01\x02")
120     assert (
121         rmr.rmr_get_meid(sbuf) == rmr.message_summary(sbuf)["meid"] == b"\x01\x02" + b"6" * 30
122     )  # bytes in string out, 6s left over
123     assert len(rmr.rmr_get_meid(sbuf)) == 32
124
125
126 def test_rmr_set_get():
127     """
128     test set functions
129     """
130     sbuf = rmr.rmr_alloc_msg(MRC_SEND, SIZE)
131     _assert_new_sbuf(sbuf)
132
133     # test payload
134     pay = b"\x01\x00\x80"
135     rmr.set_payload_and_length(pay, sbuf)
136     summary = rmr.message_summary(sbuf)
137     assert summary["payload"] == pay
138     assert summary["payload length"] == 3
139
140     # test transid (note we cant test payload because it's randomly gen)
141     assert summary["transaction id"] == b""
142     assert len(summary["transaction id"]) == 0
143     rmr.generate_and_set_transaction_id(sbuf)
144     summary = rmr.message_summary(sbuf)
145     assert summary["transaction id"] != b""
146     assert len(summary["transaction id"]) == 32
147
148     # test meid
149     assert rmr.rmr_get_meid(sbuf) == summary["meid"] == b""
150     rmr.rmr_set_meid(sbuf, b"666\x01\x00\x01")
151     summary = rmr.message_summary(sbuf)
152     assert rmr.rmr_get_meid(sbuf) == summary["meid"] == b"666\x01"
153     assert (len(summary["meid"])) == 4
154
155
156 def test_alloc_fancy():
157     """test allocation with setting payload, trans, mtype, subid"""
158     pay = b"yoo\x01\x00\x80"
159     sbuf = rmr.rmr_alloc_msg(MRC_SEND, SIZE, payload=pay, gen_transaction_id=True, mtype=14, meid=b"asdf", sub_id=654321)
160     summary = rmr.message_summary(sbuf)
161     assert summary["payload"] == pay
162     assert summary["payload length"] == 6
163     assert summary["transaction id"] != b""  # hard to test what it will be, but make sure not empty
164     assert len(summary["transaction id"]) == 32
165     assert summary["message state"] == 0
166     assert summary["message type"] == sbuf.contents.mtype == 14
167     assert rmr.rmr_get_meid(sbuf) == summary["meid"] == b"asdf"
168     assert sbuf.contents.sub_id == summary["subscription id"] == 654321
169
170
171 def test_alloc_overlapping_flags():
172     """test allocation with setting the transaction id"""
173     sbuf = rmr.rmr_alloc_msg(MRC_SEND, SIZE, gen_transaction_id=True, fixed_transaction_id=b"6" * 32)
174     summary = rmr.message_summary(sbuf)
175     assert summary["transaction id"] == b"66666666666666666666666666666666"
176
177
178 def test_rcv_timeout():
179     """
180     test torcv; this is a scary test because if it fails... it doesn't fail, it will run forever!
181     We receive a message (though nothing has been sent) and make sure the function doesn't block forever.
182
183     There is no unit test for rmr_rcv_msg; too dangerous, that is a blocking call that may never return.
184     """
185     sbuf_rcv = rmr.rmr_alloc_msg(MRC_RCV, SIZE)
186     sbuf_rcv = rmr.rmr_torcv_msg(MRC_RCV, sbuf_rcv, 50)  # should time out after 50ms
187     summary = rmr.message_summary(sbuf_rcv)
188     assert summary["message state"] == 12
189     assert summary["message status"] == "RMR_ERR_TIMEOUT"
190
191
192 def test_send_rcv():
193     """
194     test send and receive
195     """
196     pay = b"\x01\x00\x80"
197
198     # send a message
199     sbuf_send = rmr.rmr_alloc_msg(MRC_SEND, SIZE)
200     _assert_new_sbuf(sbuf_send)
201     rmr.set_payload_and_length(pay, sbuf_send)
202     sbuf_send.contents.mtype = 0
203     sbuf_send = rmr.rmr_send_msg(MRC_SEND, sbuf_send)
204     send_summary = rmr.message_summary(sbuf_send)
205     assert send_summary["message state"] == 0  # if send fails don't attempt receive
206     assert send_summary["message status"] == "RMR_OK"
207     time.sleep(0.5)
208
209     # receive it in other context
210     sbuf_rcv = rmr.rmr_alloc_msg(MRC_RCV, SIZE)
211     sbuf_rcv = rmr.rmr_torcv_msg(MRC_RCV, sbuf_rcv, 2000)
212     rcv_summary = rmr.message_summary(sbuf_rcv)
213     assert rcv_summary["message state"] == 0
214     assert rcv_summary["message status"] == "RMR_OK"
215     assert rcv_summary["message type"] == 0
216     assert rcv_summary["payload"] == pay
217
218     # send an ACK back
219     ack_pay = b"message received"
220     sbuf_rcv = rmr.rmr_rts_msg(MRC_RCV, sbuf_rcv, payload=ack_pay, mtype=6666)
221     rcv_ack_summary = rmr.message_summary(sbuf_rcv)
222
223     # have the sender receive it
224     sbuf_send = rmr.rmr_torcv_msg(MRC_SEND, sbuf_send, 2000)
225     send_ack_summary = rmr.message_summary(sbuf_send)
226
227     assert send_ack_summary["message state"] == rcv_ack_summary["message state"] == 0
228     assert send_ack_summary["message status"] == rcv_ack_summary["message status"] == "RMR_OK"
229     assert send_ack_summary["payload"] == ack_pay
230     assert send_ack_summary["message type"] == 6666
231
232
233 def test_send_rcv_subid_good():
234     """
235     test send and receive where subid is used for routing
236     """
237     pay = b"\x01\x00\x80"
238     test_mtype = 46656
239     test_subid = 777
240
241     # send a message
242     sbuf_send = rmr.rmr_alloc_msg(MRC_SEND, 3, pay, mtype=test_mtype, sub_id=test_subid)
243     pre_send_summary = rmr.message_summary(sbuf_send)
244     sbuf_send = rmr.rmr_send_msg(MRC_SEND, sbuf_send)
245     send_summary = rmr.message_summary(sbuf_send)
246
247     # receive it in other context
248     time.sleep(0.5)
249     sbuf_rcv = rmr.rmr_alloc_msg(MRC_RCV, 3)
250     sbuf_rcv = rmr.rmr_torcv_msg(MRC_RCV, sbuf_rcv, 2000)
251     rcv_summary = rmr.message_summary(sbuf_rcv)
252
253     # asserts
254     assert send_summary["message state"] == rcv_summary["message state"] == 0
255     assert send_summary["message status"] == rcv_summary["message status"] == "RMR_OK"
256     assert pre_send_summary["payload"] == rcv_summary["payload"] == pay
257     assert pre_send_summary["message type"] == rcv_summary["message type"] == test_mtype
258     assert pre_send_summary["subscription id"] == rcv_summary["subscription id"] == test_subid
259
260
261 def test_send_rcv_subid_bad_subid():
262     """
263     test send and receive where subid is used for routing but nobody recieves this subid
264     """
265     sbuf_send = rmr.rmr_alloc_msg(MRC_SEND, 3, b"\x01\x00\x80", mtype=46656, sub_id=778)
266     sbuf_send = rmr.rmr_send_msg(MRC_SEND, sbuf_send)
267     assert rmr.message_summary(sbuf_send)["message state"] == 2
268     assert rmr.message_summary(sbuf_send)["message status"] == "RMR_ERR_NOENDPT"
269
270
271 def test_send_rcv_subid_bad_mtype():
272     """
273     test send and receive where subid is used for routing but nobody recieves this mtype
274     """
275     sbuf_send = rmr.rmr_alloc_msg(MRC_SEND, 3, b"\x01\x00\x80", mtype=46657, sub_id=777)
276     sbuf_send = rmr.rmr_send_msg(MRC_SEND, sbuf_send)
277     assert rmr.message_summary(sbuf_send)["message state"] == 2
278     assert rmr.message_summary(sbuf_send)["message status"] == "RMR_ERR_NOENDPT"
279
280
281 def send_burst(mrc, fmt, mtype=1, num=13, counter=0):
282     """
283         Internal function to support test_rcv_all.
284         Send a burst of messages optionally giving the type, payload
285         and number to send.
286     """
287     sbuf_send = rmr.rmr_alloc_msg(MRC_SEND, SIZE)  # seed message buffer
288
289     for i in range(num):
290         payload = bytes(fmt % counter, "UTF-8")
291         counter += 1
292
293         rmr.set_payload_and_length(payload, sbuf_send)
294         sbuf_send.contents.mtype = mtype
295
296         max_retries = 5
297         while max_retries > 0:
298             sbuf_send = rmr.rmr_send_msg(mrc, sbuf_send)
299             ms = rmr.message_summary(sbuf_send)
300             if ms["message state"] != 10:  # 10 is retry
301                 break
302             max_retries -= 1
303             time.sleep(0.75)
304
305         assert ms["message state"] == 0
306         assert max_retries > 0
307
308
309 def test_rcv_all():
310     """
311     test the ability to receive a batch of queued messages.
312     """
313     pay_fmt = "send to ring msg: %d"  # dynamic message format with counter
314
315     send_burst(MRC_SEND, pay_fmt)  # send a bunch of 13 messages that should queue
316     time.sleep(1)  # ensure underlying transport gets cycles to send/receive
317
318     bundle = helpers.rmr_rcvall_msgs(MRC_BUF_RCV)  # use the buffered receiver to read all with a single call
319     assert len(bundle) == 13
320
321     for i, ms in enumerate(bundle):
322         ms = bundle[i]  # validate each summary returned, and ordering preserved
323         assert ms["message state"] == 0
324         expected_pay = bytes(pay_fmt % i, "UTF-8")
325         assert ms["payload"] == expected_pay
326
327     send_burst(MRC_SEND, pay_fmt, mtype=1, num=10)  # send a second round with msg types 1 and 2 to test filter
328     send_burst(MRC_SEND, pay_fmt, mtype=2, num=8)
329     send_burst(MRC_SEND, pay_fmt, mtype=1, num=5)
330     send_burst(MRC_SEND, pay_fmt, mtype=2, num=4, counter=8)  # total of 12 messages with type 2 should be queued
331     time.sleep(1)  # ensure underlying transport gets cycles to send/receive
332
333     bundle = helpers.rmr_rcvall_msgs_raw(MRC_BUF_RCV, [2])  # receive only message type 2 messages
334     assert len(bundle) == 12  # we should only get the second batch of 12 messages
335
336     for i, (ms, sbuf) in enumerate(bundle):  # test the raw version
337         test_summary = rmr.message_summary(sbuf)
338         assert test_summary == ms
339         assert ms["message state"] == 0  # all should be OK
340         assert ms["message type"] == 2  # only mtype 2 should have been received
341         expected_pay = bytes(pay_fmt % i, "UTF-8")  # ordering should still jive with the counter
342         assert ms["payload"] == expected_pay
343         rmr.rmr_free_msg(sbuf)
344
345
346 def test_bad_buffer():
347     """test that we get a proper exception when the buffer has a null pointer"""
348     with pytest.raises(exceptions.BadBufferAllocation):
349         rmr.rmr_alloc_msg(None, 4096)
350
351
352 def test_resize_payload():
353     """test the ability to insert a larger payload into an existing message"""
354     mtype = 99
355     subid = 100
356
357     mbuf = rmr.rmr_alloc_msg(MRC_SEND, 25)  # allocate buffer with small payload
358     mbuf.contents.mtype = mtype  # type and sub-id should not change
359     mbuf.contents.sub_id = subid
360
361     long_payload = b"This is a long payload that should force the message buffer to be reallocated"
362     rmr.set_payload_and_length(long_payload, mbuf)
363     summary = rmr.message_summary(mbuf)
364     assert summary["payload max size"] >= len(long_payload)  # RMR may allocate a larger payload space
365     assert summary["payload length"] == len(long_payload)  # however, the length must be exactly the same
366     assert summary["message type"] == mtype  # both mtype and sub-id should be preserved in new
367     assert summary["subscription id"] == subid