Move rmr python here.
[ric-plt/xapp-frame-py.git] / tests / test_rmr.py
1 # vim: ts=4 sw=4 expandtab:
2 # =================================================================================2
3 #       Copyright (c) 2019-2020 Nokia
4 #       Copyright (c) 2018-2020 AT&T Intellectual Property.
5 #
6 #   Licensed under the Apache License, Version 2.0 (the "License");
7 #   you may not use this file except in compliance with the License.
8 #   You may obtain a copy of the License at
9 #
10 #          http://www.apache.org/licenses/LICENSE-2.0
11 #
12 #   Unless required by applicable law or agreed to in writing, software
13 #   distributed under the License is distributed on an "AS IS" BASIS,
14 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 #   See the License for the specific language governing permissions and
16 #   limitations under the License.
17 # ==================================================================================
18 import time
19 import pytest
20 from ricxappframe.rmr import rmr, helpers, exceptions
21
22
23 SIZE = 256
24 MRC_SEND = None
25 MRC_RCV = None
26
27
28 def setup_module():
29     """
30     test_rmr module setup
31     """
32     global MRC_SEND
33     MRC_SEND = rmr.rmr_init(b"3562", rmr.RMR_MAX_RCV_BYTES, 0x00)
34     while rmr.rmr_ready(MRC_SEND) == 0:
35         time.sleep(1)
36
37     global MRC_RCV
38     MRC_RCV = rmr.rmr_init(b"3563", rmr.RMR_MAX_RCV_BYTES, 0x00)
39     while rmr.rmr_ready(MRC_RCV) == 0:
40         time.sleep(1)
41
42     global MRC_BUF_RCV
43     MRC_BUF_RCV = rmr.rmr_init(b"3564", rmr.RMR_MAX_RCV_BYTES, 0x02)
44     while rmr.rmr_ready(MRC_BUF_RCV) == 0:
45         time.sleep(1)
46
47
48 def teardown_module():
49     """
50     test rmr module teardown
51     """
52     rmr.rmr_close(MRC_SEND)
53     rmr.rmr_close(MRC_RCV)
54
55
56 def _assert_new_sbuf(sbuf):
57     """
58     verify the initial state of an alloced message is what we expect
59     """
60     summary = rmr.message_summary(sbuf)
61     assert summary["payload"] == b""
62     assert summary["payload length"] == 0
63     assert summary["subscription id"] == -1
64     assert summary["transaction id"] == b""
65     assert summary["message state"] == 0
66     assert summary["message status"] == "RMR_OK"
67     assert summary["meid"] == b""
68     assert summary["errno"] == 0
69
70
71 def test_get_constants(expected_constants):
72     """
73     test getting constants. We don't care what values are returned as those
74     should be meaningful only to RMR. We do care that all of the constants
75     which are defined in expected_contents are returned.  Further, we don't
76     consider it to be an error if the returned list has more constants than
77     what are in our list.
78
79     To avoid frustration, this should list all missing keys, not fail on the
80     first missing key.
81     """
82     errors = 0
83     econst = expected_constants
84     rconst = rmr._get_constants()
85     for key in econst:  # test all expected constants
86         if key not in rconst:  # expected value not listed by rmr
87             errors += 1
88             print("did not find required constant in list from RMR: %s" % key)
89
90     assert errors == 0
91
92
93 def test_get_mapping_dict(expected_states):
94     """
95     test getting mapping string
96     """
97     assert rmr._get_mapping_dict() == expected_states
98     assert rmr._state_to_status(0) == "RMR_OK"
99     assert rmr._state_to_status(12) == "RMR_ERR_TIMEOUT"
100     assert rmr._state_to_status(666) == "UNKNOWN STATE"
101
102
103 def test_meid():
104     """
105     test meid stringification
106     """
107     sbuf = rmr.rmr_alloc_msg(MRC_SEND, SIZE)
108
109     rmr.rmr_set_meid(sbuf, b"\x01\x02")
110     assert rmr.rmr_get_meid(sbuf) == rmr.message_summary(sbuf)["meid"] == b"\x01\x02"
111     assert len(rmr.rmr_get_meid(sbuf)) == 2
112
113     rmr.rmr_set_meid(sbuf, b"\x00" * 31)
114     assert rmr.rmr_get_meid(sbuf) == rmr.message_summary(sbuf)["meid"] == b""  # NULL bytes get truncated
115
116     rmr.rmr_set_meid(sbuf, b"6" * 31)
117     assert rmr.rmr_get_meid(sbuf) == rmr.message_summary(sbuf)["meid"] == b"6" * 31  # string in string out
118
119     rmr.rmr_set_meid(sbuf, b"\x01\x02")
120     assert (
121         rmr.rmr_get_meid(sbuf) == rmr.message_summary(sbuf)["meid"] == b"\x01\x02"
122     )  # Ctypes will chop at first nil, so expect only 2 bytes back
123
124     assert len(rmr.rmr_get_meid(sbuf)) == 2
125
126     # test that an exception is raised for buffers which are too long
127     with pytest.raises(exceptions.MeidSizeOutOfRange):
128         rmr.rmr_set_meid(sbuf, b"8" * 32)
129
130
131 def test_rmr_set_get():
132     """
133     test set functions
134     """
135     sbuf = rmr.rmr_alloc_msg(MRC_SEND, SIZE)
136     _assert_new_sbuf(sbuf)
137
138     # test payload
139     pay = b"\x01\x00\x80"
140     rmr.set_payload_and_length(pay, sbuf)
141     summary = rmr.message_summary(sbuf)
142     assert summary["payload"] == pay
143     assert summary["payload length"] == 3
144
145     # test transid (note we cant test payload because it's randomly gen)
146     assert summary["transaction id"] == b""
147     assert len(summary["transaction id"]) == 0
148     rmr.generate_and_set_transaction_id(sbuf)
149     summary = rmr.message_summary(sbuf)
150     assert summary["transaction id"] != b""
151     assert len(summary["transaction id"]) == 32
152
153     # test meid
154     assert rmr.rmr_get_meid(sbuf) == summary["meid"] == b""
155     rmr.rmr_set_meid(sbuf, b"666\x01\x00\x01")
156     summary = rmr.message_summary(sbuf)
157     assert rmr.rmr_get_meid(sbuf) == summary["meid"] == b"666\x01"
158     assert (len(summary["meid"])) == 4
159
160
161 def test_alloc_fancy():
162     """test allocation with setting payload, trans, mtype, subid"""
163     pay = b"yoo\x01\x00\x80"
164     sbuf = rmr.rmr_alloc_msg(MRC_SEND, SIZE, payload=pay, gen_transaction_id=True, mtype=14, meid=b"asdf", sub_id=654321)
165     summary = rmr.message_summary(sbuf)
166     assert summary["payload"] == pay
167     assert summary["payload length"] == 6
168     assert summary["transaction id"] != b""  # hard to test what it will be, but make sure not empty
169     assert len(summary["transaction id"]) == 32
170     assert summary["message state"] == 0
171     assert summary["message type"] == sbuf.contents.mtype == 14
172     assert rmr.rmr_get_meid(sbuf) == summary["meid"] == b"asdf"
173     assert sbuf.contents.sub_id == summary["subscription id"] == 654321
174
175
176 def test_alloc_overlapping_flags():
177     """test allocation with setting the transaction id"""
178     sbuf = rmr.rmr_alloc_msg(MRC_SEND, SIZE, gen_transaction_id=True, fixed_transaction_id=b"6" * 32)
179     summary = rmr.message_summary(sbuf)
180     assert summary["transaction id"] == b"66666666666666666666666666666666"
181
182
183 def test_rcv_timeout():
184     """
185     test torcv; this is a scary test because if it fails... it doesn't fail, it will run forever!
186     We receive a message (though nothing has been sent) and make sure the function doesn't block forever.
187
188     There is no unit test for rmr_rcv_msg; too dangerous, that is a blocking call that may never return.
189     """
190     sbuf_rcv = rmr.rmr_alloc_msg(MRC_RCV, SIZE)
191     sbuf_rcv = rmr.rmr_torcv_msg(MRC_RCV, sbuf_rcv, 50)  # should time out after 50ms
192     summary = rmr.message_summary(sbuf_rcv)
193     assert summary["message state"] == 12
194     assert summary["message status"] == "RMR_ERR_TIMEOUT"
195
196
197 def test_send_rcv():
198     """
199     test send and receive
200     """
201     pay = b"\x01\x00\x80"
202
203     # send a message
204     sbuf_send = rmr.rmr_alloc_msg(MRC_SEND, SIZE)
205     _assert_new_sbuf(sbuf_send)
206     rmr.set_payload_and_length(pay, sbuf_send)
207     sbuf_send.contents.mtype = 0
208     sbuf_send = rmr.rmr_send_msg(MRC_SEND, sbuf_send)
209     send_summary = rmr.message_summary(sbuf_send)
210     assert send_summary["message state"] == 0  # if send fails don't attempt receive
211     assert send_summary["message status"] == "RMR_OK"
212     time.sleep(0.5)
213
214     # receive it in other context
215     sbuf_rcv = rmr.rmr_alloc_msg(MRC_RCV, SIZE)
216     sbuf_rcv = rmr.rmr_torcv_msg(MRC_RCV, sbuf_rcv, 2000)
217     rcv_summary = rmr.message_summary(sbuf_rcv)
218     assert rcv_summary["message state"] == 0
219     assert rcv_summary["message status"] == "RMR_OK"
220     assert rcv_summary["message type"] == 0
221     assert rcv_summary["payload"] == pay
222
223     # send an ACK back
224     ack_pay = b"message received"
225     sbuf_rcv = rmr.rmr_rts_msg(MRC_RCV, sbuf_rcv, payload=ack_pay, mtype=6666)
226     rcv_ack_summary = rmr.message_summary(sbuf_rcv)
227
228     # have the sender receive it
229     sbuf_send = rmr.rmr_torcv_msg(MRC_SEND, sbuf_send, 2000)
230     send_ack_summary = rmr.message_summary(sbuf_send)
231
232     assert send_ack_summary["message state"] == rcv_ack_summary["message state"] == 0
233     assert send_ack_summary["message status"] == rcv_ack_summary["message status"] == "RMR_OK"
234     assert send_ack_summary["payload"] == ack_pay
235     assert send_ack_summary["message type"] == 6666
236
237
238 def test_send_rcv_subid_good():
239     """
240     test send and receive where subid is used for routing
241     """
242     pay = b"\x01\x00\x80"
243     test_mtype = 46656
244     test_subid = 777
245
246     # send a message
247     sbuf_send = rmr.rmr_alloc_msg(MRC_SEND, 3, pay, mtype=test_mtype, sub_id=test_subid)
248     pre_send_summary = rmr.message_summary(sbuf_send)
249     sbuf_send = rmr.rmr_send_msg(MRC_SEND, sbuf_send)
250     send_summary = rmr.message_summary(sbuf_send)
251
252     # receive it in other context
253     time.sleep(0.5)
254     sbuf_rcv = rmr.rmr_alloc_msg(MRC_RCV, 3)
255     sbuf_rcv = rmr.rmr_torcv_msg(MRC_RCV, sbuf_rcv, 2000)
256     rcv_summary = rmr.message_summary(sbuf_rcv)
257
258     # asserts
259     assert send_summary["message state"] == rcv_summary["message state"] == 0
260     assert send_summary["message status"] == rcv_summary["message status"] == "RMR_OK"
261     assert pre_send_summary["payload"] == rcv_summary["payload"] == pay
262     assert pre_send_summary["message type"] == rcv_summary["message type"] == test_mtype
263     assert pre_send_summary["subscription id"] == rcv_summary["subscription id"] == test_subid
264
265
266 def test_send_rcv_subid_bad_subid():
267     """
268     test send and receive where subid is used for routing but nobody recieves this subid
269     """
270     sbuf_send = rmr.rmr_alloc_msg(MRC_SEND, 3, b"\x01\x00\x80", mtype=46656, sub_id=778)
271     sbuf_send = rmr.rmr_send_msg(MRC_SEND, sbuf_send)
272     assert rmr.message_summary(sbuf_send)["message state"] == 2
273     assert rmr.message_summary(sbuf_send)["message status"] == "RMR_ERR_NOENDPT"
274
275
276 def test_send_rcv_subid_bad_mtype():
277     """
278     test send and receive where subid is used for routing but nobody recieves this mtype
279     """
280     sbuf_send = rmr.rmr_alloc_msg(MRC_SEND, 3, b"\x01\x00\x80", mtype=46657, sub_id=777)
281     sbuf_send = rmr.rmr_send_msg(MRC_SEND, sbuf_send)
282     assert rmr.message_summary(sbuf_send)["message state"] == 2
283     assert rmr.message_summary(sbuf_send)["message status"] == "RMR_ERR_NOENDPT"
284
285
286 def send_burst(mrc, fmt, mtype=1, num=13, counter=0):
287     """
288         Internal function to support test_rcv_all.
289         Send a burst of messages optionally giving the type, payload
290         and number to send.
291     """
292     sbuf_send = rmr.rmr_alloc_msg(MRC_SEND, SIZE)  # seed message buffer
293
294     for i in range(num):
295         payload = bytes(fmt % counter, "UTF-8")
296         counter += 1
297
298         rmr.set_payload_and_length(payload, sbuf_send)
299         sbuf_send.contents.mtype = mtype
300
301         max_retries = 5
302         while max_retries > 0:
303             sbuf_send = rmr.rmr_send_msg(mrc, sbuf_send)
304             ms = rmr.message_summary(sbuf_send)
305             if ms["message state"] != 10:  # 10 is retry
306                 break
307             max_retries -= 1
308             time.sleep(0.75)
309
310         assert ms["message state"] == 0
311         assert max_retries > 0
312
313
314 def test_rcv_all():
315     """
316     test the ability to receive a batch of queued messages.
317     """
318     pay_fmt = "send to ring msg: %d"  # dynamic message format with counter
319
320     send_burst(MRC_SEND, pay_fmt)  # send a bunch of 13 messages that should queue
321     time.sleep(1)  # ensure underlying transport gets cycles to send/receive
322
323     bundle = helpers.rmr_rcvall_msgs(MRC_BUF_RCV)  # use the buffered receiver to read all with a single call
324     assert len(bundle) == 13
325
326     for i, ms in enumerate(bundle):
327         ms = bundle[i]  # validate each summary returned, and ordering preserved
328         assert ms["message state"] == 0
329         expected_pay = bytes(pay_fmt % i, "UTF-8")
330         assert ms["payload"] == expected_pay
331
332     send_burst(MRC_SEND, pay_fmt, mtype=1, num=10)  # send a second round with msg types 1 and 2 to test filter
333     send_burst(MRC_SEND, pay_fmt, mtype=2, num=8)
334     send_burst(MRC_SEND, pay_fmt, mtype=1, num=5)
335     send_burst(MRC_SEND, pay_fmt, mtype=2, num=4, counter=8)  # total of 12 messages with type 2 should be queued
336     time.sleep(1)  # ensure underlying transport gets cycles to send/receive
337
338     bundle = helpers.rmr_rcvall_msgs_raw(MRC_BUF_RCV, [2])  # receive only message type 2 messages
339     assert len(bundle) == 12  # we should only get the second batch of 12 messages
340
341     for i, (ms, sbuf) in enumerate(bundle):  # test the raw version
342         test_summary = rmr.message_summary(sbuf)
343         assert test_summary == ms
344         assert ms["message state"] == 0  # all should be OK
345         assert ms["message type"] == 2  # only mtype 2 should have been received
346         expected_pay = bytes(pay_fmt % i, "UTF-8")  # ordering should still jive with the counter
347         assert ms["payload"] == expected_pay
348         rmr.rmr_free_msg(sbuf)
349
350
351 def test_bad_buffer():
352     """test that we get a proper exception when the buffer has a null pointer"""
353     with pytest.raises(exceptions.BadBufferAllocation):
354         rmr.rmr_alloc_msg(None, 4096)
355
356
357 def test_resize_payload():
358     """test the ability to insert a larger payload into an existing message"""
359     mtype = 99
360     subid = 100
361
362     mbuf = rmr.rmr_alloc_msg(MRC_SEND, 25)  # allocate buffer with small payload
363     mbuf.contents.mtype = mtype  # type and sub-id should not change
364     mbuf.contents.sub_id = subid
365
366     long_payload = b"This is a long payload that should force the message buffer to be reallocated"
367     rmr.set_payload_and_length(long_payload, mbuf)
368     summary = rmr.message_summary(mbuf)
369     assert summary["payload max size"] >= len(long_payload)  # RMR may allocate a larger payload space
370     assert summary["payload length"] == len(long_payload)  # however, the length must be exactly the same
371     assert summary["message type"] == mtype  # both mtype and sub-id should be preserved in new
372     assert summary["subscription id"] == subid