Release 1.0.0:
[ric-plt/lib/rmr.git] / src / bindings / rmr-python / tests / test_rmr.py
1 # vim: ts=4 sw=4 expandtab:
2 # =================================================================================2
3 #       Copyright (c) 2019 Nokia
4 #       Copyright (c) 2018-2019 AT&T Intellectual Property.
5 #
6 #   Licensed under the Apache License, Version 2.0 (the "License");
7 #   you may not use this file except in compliance with the License.
8 #   You may obtain a copy of the License at
9 #
10 #          http://www.apache.org/licenses/LICENSE-2.0
11 #
12 #   Unless required by applicable law or agreed to in writing, software
13 #   distributed under the License is distributed on an "AS IS" BASIS,
14 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 #   See the License for the specific language governing permissions and
16 #   limitations under the License.
17 # ==================================================================================
18 import time
19 import pytest
20 from rmr import rmr, helpers, exceptions
21
22
23 SIZE = 256
24 MRC_SEND = None
25 MRC_RCV = None
26
27
28 def setup_module():
29     """
30     test_rmr module setup
31     """
32     global MRC_SEND
33     MRC_SEND = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, 0x00)
34     while rmr.rmr_ready(MRC_SEND) == 0:
35         time.sleep(1)
36
37     global MRC_RCV
38     MRC_RCV = rmr.rmr_init(b"4563", rmr.RMR_MAX_RCV_BYTES, 0x00)
39     while rmr.rmr_ready(MRC_RCV) == 0:
40         time.sleep(1)
41
42     global MRC_BUF_RCV
43     MRC_BUF_RCV = rmr.rmr_init(b"4564", rmr.RMR_MAX_RCV_BYTES, 0x02)
44     while rmr.rmr_ready(MRC_RCV) == 0:
45         time.sleep(1)
46
47
48 def teardown_module():
49     """
50     test rmr module teardown
51     """
52     rmr.rmr_close(MRC_SEND)
53     rmr.rmr_close(MRC_RCV)
54
55
56 def _assert_new_sbuf(sbuf):
57     """
58     verify the initial state of an alloced message is what we expect
59     """
60     summary = rmr.message_summary(sbuf)
61     assert summary["payload"] == b""
62     assert summary["payload length"] == 0
63     assert summary["transaction id"] == b""
64     assert summary["message state"] == 0
65     assert summary["message status"] == "RMR_OK"
66     assert summary["meid"] == b""
67     assert summary["errno"] == 0
68
69
70 def test_get_constants(expected_constants):
71     """
72     test getting constants. We don't care what values are returned as those
73     should be meaningful only to RMR. We do care that all of the constants
74     which are defined in expected_contents are returned.  Further, we don't
75     consider it to be an error if the returned list has more constants than
76     what are in our list.
77
78     To avoid frustration, this should list all missing keys, not fail on the
79     first missing key.
80     """
81     errors = 0
82     econst = expected_constants
83     rconst = rmr._get_constants()
84     for key in econst:  # test all expected constants
85         if key not in rconst:  # expected value not listed by rmr
86             errors += 1
87             print("did not find required constant in list from RMR: %s" % key)
88
89     assert errors == 0
90
91
92 def test_get_mapping_dict(expected_states):
93     """
94     test getting mapping string
95     """
96     assert rmr._get_mapping_dict() == expected_states
97     assert rmr._state_to_status(0) == "RMR_OK"
98     assert rmr._state_to_status(12) == "RMR_ERR_TIMEOUT"
99     assert rmr._state_to_status(666) == "UNKNOWN STATE"
100
101
102 def test_meid():
103     """
104     test meid stringification
105     """
106     sbuf = rmr.rmr_alloc_msg(MRC_SEND, SIZE)
107
108     rmr.rmr_set_meid(sbuf, b"\x01\x02")
109     assert rmr.rmr_get_meid(sbuf) == rmr.message_summary(sbuf)["meid"] == b"\x01\x02"
110     assert len(rmr.rmr_get_meid(sbuf)) == 2
111
112     rmr.rmr_set_meid(sbuf, b"\x00" * 32)
113     assert rmr.rmr_get_meid(sbuf) == rmr.message_summary(sbuf)["meid"] == b""  # NULL bytes get truncated
114
115     rmr.rmr_set_meid(sbuf, b"6" * 32)
116     assert rmr.rmr_get_meid(sbuf) == rmr.message_summary(sbuf)["meid"] == b"6" * 32  # string in string out
117
118     rmr.rmr_set_meid(sbuf, b"\x01\x02")
119     assert (
120         rmr.rmr_get_meid(sbuf) == rmr.message_summary(sbuf)["meid"] == b"\x01\x02" + b"6" * 30
121     )  # bytes in string out, 6s left over
122     assert len(rmr.rmr_get_meid(sbuf)) == 32
123
124
125 def test_rmr_set_get():
126     """
127     test set functions
128     """
129     sbuf = rmr.rmr_alloc_msg(MRC_SEND, SIZE)
130     _assert_new_sbuf(sbuf)
131
132     # test payload
133     pay = b"\x01\x00\x80"
134     rmr.set_payload_and_length(pay, sbuf)
135     summary = rmr.message_summary(sbuf)
136     assert summary["payload"] == pay
137     assert summary["payload length"] == 3
138
139     # test transid (note we cant test payload because it's randomly gen)
140     assert summary["transaction id"] == b""
141     assert len(summary["transaction id"]) == 0
142     rmr.generate_and_set_transaction_id(sbuf)
143     summary = rmr.message_summary(sbuf)
144     assert summary["transaction id"] != b""
145     assert len(summary["transaction id"]) == 32
146
147     # test meid
148     assert rmr.rmr_get_meid(sbuf) == summary["meid"] == b""
149     rmr.rmr_set_meid(sbuf, b"666\x01\x00\x01")
150     summary = rmr.message_summary(sbuf)
151     assert rmr.rmr_get_meid(sbuf) == summary["meid"] == b"666\x01"
152     assert (len(summary["meid"])) == 4
153
154
155 def test_rcv_timeout():
156     """
157     test torcv; this is a scary test because if it fails... it doesn't fail, it will run forever!
158     We receive a message (though nothing has been sent) and make sure the function doesn't block forever.
159
160     There is no unit test for rmr_rcv_msg; too dangerous, that is a blocking call that may never return.
161     """
162     sbuf_rcv = rmr.rmr_alloc_msg(MRC_RCV, SIZE)
163     sbuf_rcv = rmr.rmr_torcv_msg(MRC_RCV, sbuf_rcv, 50)  # should time out after 50ms
164     summary = rmr.message_summary(sbuf_rcv)
165     assert summary["message state"] == 12
166     assert summary["message status"] == "RMR_ERR_TIMEOUT"
167
168
169 def test_send_rcv():
170     """
171     test send and receive
172     """
173     pay = b"\x01\x00\x80"
174
175     # send a message
176     sbuf_send = rmr.rmr_alloc_msg(MRC_SEND, SIZE)
177     _assert_new_sbuf(sbuf_send)
178     rmr.set_payload_and_length(pay, sbuf_send)
179     sbuf_send.contents.mtype = 0
180     sbuf_send = rmr.rmr_send_msg(MRC_SEND, sbuf_send)
181     send_summary = rmr.message_summary(sbuf_send)
182     assert send_summary["message state"] == 0  # if send fails don't attempt receive
183     assert send_summary["message status"] == "RMR_OK"
184     time.sleep(0.5)
185
186     # receive it in other context
187     sbuf_rcv = rmr.rmr_alloc_msg(MRC_RCV, SIZE)
188     sbuf_rcv = rmr.rmr_torcv_msg(MRC_RCV, sbuf_rcv, 2000)
189     rcv_summary = rmr.message_summary(sbuf_rcv)
190     assert rcv_summary["message state"] == 0
191     assert rcv_summary["message status"] == "RMR_OK"
192     assert rcv_summary["message type"] == 0
193     assert rcv_summary["payload"] == pay
194
195     # send an ACK back
196     ack_pay = b"message received"
197     rmr.set_payload_and_length(ack_pay, sbuf_rcv)
198     sbuf_rcv = rmr.rmr_rts_msg(MRC_RCV, sbuf_rcv)
199     rcv_ack_summary = rmr.message_summary(sbuf_rcv)
200
201     # have the sender receive it
202     sbuf_send = rmr.rmr_torcv_msg(MRC_SEND, sbuf_send, 2000)
203     send_ack_summary = rmr.message_summary(sbuf_send)
204
205     assert send_ack_summary["message state"] == rcv_ack_summary["message state"] == 0
206     assert send_ack_summary["message status"] == rcv_ack_summary["message status"] == "RMR_OK"
207     assert send_ack_summary["payload"] == ack_pay
208
209
210 def send_burst(mrc, fmt, mtype=1, num=13, counter=0):
211     """
212         Internal function to support test_rcv_all.
213         Send a burst of messages optionally giving the type, payload
214         and number to send.
215     """
216     sbuf_send = rmr.rmr_alloc_msg(MRC_SEND, SIZE)  # seed message buffer
217
218     for i in range(num):
219         payload = bytes(fmt % counter, "UTF-8")
220         counter += 1
221
222         rmr.set_payload_and_length(payload, sbuf_send)
223         sbuf_send.contents.mtype = mtype
224
225         max_retries = 5
226         while max_retries > 0:
227             sbuf_send = rmr.rmr_send_msg(mrc, sbuf_send)
228             ms = rmr.message_summary(sbuf_send)
229             if ms["message state"] != 10:  # 10 is retry
230                 break
231             max_retries -= 1
232             time.sleep(0.75)
233
234         assert ms["message state"] == 0
235         assert max_retries > 0
236
237
238 def test_rcv_all():
239     """
240     test the ability to receive a batch of queued messages.
241     """
242     pay_fmt = "send to ring msg: %d"  # dynamic message format with counter
243
244     send_burst(MRC_SEND, pay_fmt)  # send a bunch of 13 messages that should queue
245     time.sleep(1)  # ensure underlying transport gets cycles to send/receive
246
247     bundle = helpers.rmr_rcvall_msgs(MRC_BUF_RCV)  # use the buffered receiver to read all with a single call
248     assert len(bundle) == 13
249
250     for i in range(len(bundle)):
251         ms = bundle[i]  # validate each summary returned, and ordering preserved
252         assert ms["message state"] == 0
253         expected_pay = bytes(pay_fmt % i, "UTF-8")
254         assert ms["payload"] == expected_pay
255
256     send_burst(MRC_SEND, pay_fmt, mtype=1, num=10)  # send a second round with msg types 1 and 2 to test filter
257     send_burst(MRC_SEND, pay_fmt, mtype=2, num=8)
258     send_burst(MRC_SEND, pay_fmt, mtype=1, num=5)
259     send_burst(MRC_SEND, pay_fmt, mtype=2, num=4, counter=8)  # total of 12 messages with type 2 should be queued
260     time.sleep(1)  # ensure underlying transport gets cycles to send/receive
261
262     bundle = helpers.rmr_rcvall_msgs(MRC_BUF_RCV, [2])  # receive only message type 2 messages
263     assert len(bundle) == 12  # we should only get the second batch of 12 messages
264
265     for i in range(len(bundle)):
266         ms = bundle[i]  # validate each summary
267         assert ms["message state"] == 0  # all should be OK
268         assert ms["message type"] == 2  # only mtype 2 should have been received
269         expected_pay = bytes(pay_fmt % i, "UTF-8")  # ordering should still jive with the counter
270         assert ms["payload"] == expected_pay
271
272
273 def test_bad_buffer():
274     """test that we get a proper exception when the buffer has a null pointer"""
275     with pytest.raises(exceptions.BadBufferAllocation):
276         rmr.rmr_alloc_msg(None, 4096)
277
278
279 def test_alloc_fancy():
280     """test allocation with setting payload, trans, mtype"""
281     pay = b"yoo\x01\x00\x80"
282     sbuf = rmr.rmr_alloc_msg(MRC_SEND, SIZE, payload=pay, gen_transaction_id=True, mtype=14, meid=b"asdf")
283     summary = rmr.message_summary(sbuf)
284     assert summary["payload"] == pay
285     assert summary["payload length"] == 6
286     assert summary["transaction id"] != b""  # hard to test what it will be, but make sure not empty
287     assert summary["message state"] == 0
288     assert summary["message type"] == 14
289     assert summary["meid"] == b"asdf"