SIZE = 256
MRC_SEND = None
MRC_RCV = None
+MRC_BUF_RCV = None
def setup_module():
time.sleep(1)
global MRC_BUF_RCV
- MRC_BUF_RCV = rmr.rmr_init(b"3564", rmr.RMR_MAX_RCV_BYTES, 0x02)
+ MRC_BUF_RCV = rmr.rmr_init(b"3564", rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
while rmr.rmr_ready(MRC_BUF_RCV) == 0:
time.sleep(1)
"""
rmr.rmr_close(MRC_SEND)
rmr.rmr_close(MRC_RCV)
+ rmr.rmr_close(MRC_BUF_RCV)
def _assert_new_sbuf(sbuf):
assert summary["errno"] == 0
-def test_get_constants(expected_constants):
- """
- test getting constants. We don't care what values are returned as those
- should be meaningful only to RMR. We do care that all of the constants
- which are defined in expected_contents are returned. Further, we don't
- consider it to be an error if the returned list has more constants than
- what are in our list.
-
- To avoid frustration, this should list all missing keys, not fail on the
- first missing key.
- """
- errors = 0
- econst = expected_constants
- rconst = rmr._get_constants()
- for key in econst: # test all expected constants
- if key not in rconst: # expected value not listed by rmr
- errors += 1
- print("did not find required constant in list from RMR: %s" % key)
-
- assert errors == 0
-
-
-def test_get_mapping_dict(expected_states):
- """
- test getting mapping string
- """
- assert rmr._get_mapping_dict() == expected_states
- assert rmr._state_to_status(0) == "RMR_OK"
- assert rmr._state_to_status(12) == "RMR_ERR_TIMEOUT"
- assert rmr._state_to_status(666) == "UNKNOWN STATE"
-
-
def test_meid():
"""
test meid stringification
There is no unit test for rmr_rcv_msg; too dangerous, that is a blocking call that may never return.
"""
sbuf_rcv = rmr.rmr_alloc_msg(MRC_RCV, SIZE)
- sbuf_rcv = rmr.rmr_torcv_msg(MRC_RCV, sbuf_rcv, 50) # should time out after 50ms
+ start_rcv_sec = time.time()
+ sbuf_rcv = rmr.rmr_torcv_msg(MRC_RCV, sbuf_rcv, 500) # should wait a bit before returning
summary = rmr.message_summary(sbuf_rcv)
assert summary["message state"] == 12
assert summary["message status"] == "RMR_ERR_TIMEOUT"
+ assert(time.time() - start_rcv_sec > 0.5) # test duration should be longer than the timeout
def test_send_rcv():
time.sleep(1) # ensure underlying transport gets cycles to send/receive
bundle = helpers.rmr_rcvall_msgs_raw(MRC_BUF_RCV, [2]) # receive only message type 2 messages
- assert len(bundle) == 12 # we should only get the second batch of 12 messages
+ assert len(bundle) == 12 # we should only get the type 2 batch of 12 messages
for i, (ms, sbuf) in enumerate(bundle): # test the raw version
test_summary = rmr.message_summary(sbuf)
assert ms["payload"] == expected_pay
rmr.rmr_free_msg(sbuf)
+ # check the timeout scenarios
+ start_rcv_sec = time.time()
+ bundle = helpers.rmr_rcvall_msgs(MRC_RCV, timeout=1001) # non-zero timeout means wait
+ assert len(bundle) == 0 # we should get none
+ assert(time.time() - start_rcv_sec > 1) # test duration should be longer than 1 second
+
+ start_rcv_sec = time.time()
+ bundle = helpers.rmr_rcvall_msgs_raw(MRC_RCV, timeout=1002) # non-zero timeout means wait
+ assert len(bundle) == 0 # we should get none
+ assert(time.time() - start_rcv_sec > 1) # test duration should be longer than 1 second
+
def test_bad_buffer():
"""test that we get a proper exception when the buffer has a null pointer"""