Complete rmr test coverage, restructure get_meid to be correct.
[ric-plt/lib/rmr.git] / src / bindings / rmr-python / tests / test_rmr.py
1 # ==================================================================================
2 #       Copyright (c) 2019 Nokia
3 #       Copyright (c) 2018-2019 AT&T Intellectual Property.
4 #
5 #   Licensed under the Apache License, Version 2.0 (the "License");
6 #   you may not use this file except in compliance with the License.
7 #   You may obtain a copy of the License at
8 #
9 #          http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #   Unless required by applicable law or agreed to in writing, software
12 #   distributed under the License is distributed on an "AS IS" BASIS,
13 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #   See the License for the specific language governing permissions and
15 #   limitations under the License.
16 # ==================================================================================
17 import time
18 import pytest
19 from rmr import rmr
20
21
22 SIZE = 256
23 MRC_SEND = None
24 MRC_RCV = None
25
26
27 def setup_module():
28     """
29     test_rmr module setup
30     """
31     global MRC_SEND
32     MRC_SEND = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, 0x00)
33     while rmr.rmr_ready(MRC_SEND) == 0:
34         time.sleep(1)
35
36     global MRC_RCV
37     MRC_RCV = rmr.rmr_init(b"4563", rmr.RMR_MAX_RCV_BYTES, 0x00)
38     while rmr.rmr_ready(MRC_RCV) == 0:
39         time.sleep(1)
40
41
42 def teardown_module():
43     """
44     test rmr module teardown
45     """
46     rmr.rmr_close(MRC_SEND)
47     rmr.rmr_close(MRC_RCV)
48
49
50 def _assert_new_sbuf(sbuf):
51     """
52     verify the initial state of an alloced message is what we expect
53     """
54     summary = rmr.message_summary(sbuf)
55     assert summary["payload"] == b""
56     assert summary["payload length"] == 0
57     assert summary["transaction id"] == b""
58     assert summary["message state"] == 0
59     assert summary["message status"] == "RMR_OK"
60     assert summary["meid"] == ""
61     assert summary["errno"] == 0
62
63
64 def test_get_constants(expected_constants):
65     """
66     test getting constants
67     """
68     assert rmr._get_constants() == expected_constants
69
70
71 def test_get_mapping_dict(expected_states):
72     """
73     test getting mapping string
74     """
75     assert rmr._get_mapping_dict() == expected_states
76     assert rmr._state_to_status(0) == "RMR_OK"
77     assert rmr._state_to_status(12) == "RMR_ERR_TIMEOUT"
78     assert rmr._state_to_status(666) == "UNKNOWN STATE"
79
80
81 def test_meid():
82     """
83     test meid stringification
84     """
85     sbuf = rmr.rmr_alloc_msg(MRC_SEND, SIZE)
86
87     rmr.rmr_set_meid(sbuf, b"\x01\x02", 2)
88     assert rmr.rmr_get_meid(sbuf) == rmr.message_summary(sbuf)["meid"] == "\x01\x02"
89     assert len(rmr.rmr_get_meid(sbuf)) == 2
90
91     rmr.rmr_set_meid(sbuf, b"\x00" * 32, 32)
92     assert rmr.rmr_get_meid(sbuf) == rmr.message_summary(sbuf)["meid"] == ""  # NULL bytes get truncated
93
94     rmr.rmr_set_meid(sbuf, b"6" * 32, 32)
95     assert rmr.rmr_get_meid(sbuf) == rmr.message_summary(sbuf)["meid"] == "6" * 32  # string in string out
96
97     rmr.rmr_set_meid(sbuf, b"\x01\x02", 2)
98     assert (
99         rmr.rmr_get_meid(sbuf) == rmr.message_summary(sbuf)["meid"] == "\x01\x02" + "6" * 30
100     )  # bytes in string out, 6s left over
101     assert len(rmr.rmr_get_meid(sbuf)) == 32
102
103
104 def test_rmr_set_get():
105     """
106     test set functions
107     """
108     sbuf = rmr.rmr_alloc_msg(MRC_SEND, SIZE)
109     _assert_new_sbuf(sbuf)
110
111     # test payload
112     pay = b"\x01\x00\x80"
113     rmr.set_payload_and_length(pay, sbuf)
114     summary = rmr.message_summary(sbuf)
115     assert summary["payload"] == pay
116     assert summary["payload length"] == 3
117
118     # test transid (note we cant test payload because it's randomly gen)
119     assert summary["transaction id"] == b""
120     assert len(summary["transaction id"]) == 0
121     rmr.generate_and_set_transaction_id(sbuf)
122     summary = rmr.message_summary(sbuf)
123     assert summary["transaction id"] != b""
124     assert len(summary["transaction id"]) == 32
125
126     # test meid
127     assert rmr.rmr_get_meid(sbuf) == summary["meid"] == ""
128     rmr.rmr_set_meid(sbuf, b"666\x01\x00\x01", 6)
129     summary = rmr.message_summary(sbuf)
130     assert rmr.rmr_get_meid(sbuf) == summary["meid"] == "666\x01"
131     assert (len(summary["meid"])) == 4
132
133
134 def test_rcv_timeout():
135     """
136     test torcv; this is a scary test because if it fails... it doesn't fail, it will run forever!
137     We recieve a message (though nothing has been sent) and make sure the function doesn't block forever.
138
139     There is no unit test for rmr_rcv_msg; too dangerous, that is a blocking call that may never return.
140     """
141     sbuf_rcv = rmr.rmr_alloc_msg(MRC_RCV, SIZE)
142     sbuf_rcv = rmr.rmr_torcv_msg(MRC_RCV, sbuf_rcv, 50)  # should time out after 50ms
143     summary = rmr.message_summary(sbuf_rcv)
144     assert summary["message state"] == 12
145     assert summary["message status"] == "RMR_ERR_TIMEOUT"
146
147
148 def test_send_rcv():
149     """
150     test send and receive
151     """
152     pay = b"\x01\x00\x80"
153
154     # send a message
155     sbuf_send = rmr.rmr_alloc_msg(MRC_SEND, SIZE)
156     _assert_new_sbuf(sbuf_send)
157     rmr.set_payload_and_length(pay, sbuf_send)
158     sbuf_send.contents.mtype = 0
159     sbuf_send = rmr.rmr_send_msg(MRC_SEND, sbuf_send)
160     send_summary = rmr.message_summary(sbuf_send)
161
162     # receive it in other context
163     sbuf_rcv = rmr.rmr_alloc_msg(MRC_RCV, SIZE)
164     sbuf_rcv = rmr.rmr_torcv_msg(MRC_RCV, sbuf_rcv, 2000)
165     rcv_summary = rmr.message_summary(sbuf_rcv)
166     assert rcv_summary["payload"] == pay
167     assert rcv_summary["message type"] == 0
168     assert send_summary["message state"] == rcv_summary["message state"] == 0
169     assert send_summary["message status"] == rcv_summary["message status"] == "RMR_OK"
170
171     # send an ACK back
172     ack_pay = b"message recieved"
173     rmr.set_payload_and_length(ack_pay, sbuf_rcv)
174     sbuf_rcv = rmr.rmr_rts_msg(MRC_RCV, sbuf_rcv)
175     rcv_ack_summary = rmr.message_summary(sbuf_rcv)
176
177     # have the sender recieve it
178     sbuf_send = rmr.rmr_torcv_msg(MRC_SEND, sbuf_send, 2000)
179     send_ack_summary = rmr.message_summary(sbuf_send)
180
181     assert send_ack_summary["payload"] == ack_pay
182     assert send_ack_summary["message state"] == rcv_ack_summary["message state"] == 0
183     assert send_ack_summary["message status"] == rcv_ack_summary["message status"] == "RMR_OK"