SIZE = 256
MRC_SEND = None
MRC_RCV = None
+MRC_BUF_RCV = None
def setup_module():
time.sleep(1)
global MRC_BUF_RCV
- MRC_BUF_RCV = rmr.rmr_init(b"3564", rmr.RMR_MAX_RCV_BYTES, 0x02)
+ MRC_BUF_RCV = rmr.rmr_init(b"3564", rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
while rmr.rmr_ready(MRC_BUF_RCV) == 0:
time.sleep(1)
"""
rmr.rmr_close(MRC_SEND)
rmr.rmr_close(MRC_RCV)
+ rmr.rmr_close(MRC_BUF_RCV)
def _assert_new_sbuf(sbuf):
There is no unit test for rmr_rcv_msg; too dangerous, that is a blocking call that may never return.
"""
sbuf_rcv = rmr.rmr_alloc_msg(MRC_RCV, SIZE)
- sbuf_rcv = rmr.rmr_torcv_msg(MRC_RCV, sbuf_rcv, 50) # should time out after 50ms
+ start_rcv_sec = time.time()
+ sbuf_rcv = rmr.rmr_torcv_msg(MRC_RCV, sbuf_rcv, 500) # should wait a bit before returning
summary = rmr.message_summary(sbuf_rcv)
assert summary["message state"] == 12
assert summary["message status"] == "RMR_ERR_TIMEOUT"
+ assert(time.time() - start_rcv_sec > 0.5) # test duration should be longer than the timeout
def test_send_rcv():
time.sleep(1) # ensure underlying transport gets cycles to send/receive
bundle = helpers.rmr_rcvall_msgs_raw(MRC_BUF_RCV, [2]) # receive only message type 2 messages
- assert len(bundle) == 12 # we should only get the second batch of 12 messages
+ assert len(bundle) == 12 # we should only get the type 2 batch of 12 messages
for i, (ms, sbuf) in enumerate(bundle): # test the raw version
test_summary = rmr.message_summary(sbuf)
assert ms["payload"] == expected_pay
rmr.rmr_free_msg(sbuf)
+ # check the timeout scenarios
+ start_rcv_sec = time.time()
+ bundle = helpers.rmr_rcvall_msgs(MRC_RCV, timeout=1001) # non-zero timeout means wait
+ assert len(bundle) == 0 # we should get none
+ assert(time.time() - start_rcv_sec > 1) # test duration should be longer than 1 second
+
+ start_rcv_sec = time.time()
+ bundle = helpers.rmr_rcvall_msgs_raw(MRC_RCV, timeout=1002) # non-zero timeout means wait
+ assert len(bundle) == 0 # we should get none
+ assert(time.time() - start_rcv_sec > 1) # test duration should be longer than 1 second
+
def test_bad_buffer():
"""test that we get a proper exception when the buffer has a null pointer"""