X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;ds=sidebyside;f=src%2Fbindings%2Frmr-python%2Ftests%2Ftest_rmr.py;h=d5023b306695c11161ff0dc57efdf408a284663a;hb=refs%2Fchanges%2F09%2F1209%2F1;hp=172cc677ab03f769d8214d5ca34bc0abafe97041;hpb=ab4019f2cdf80f10323e70199d38817d19b7482b;p=ric-plt%2Flib%2Frmr.git diff --git a/src/bindings/rmr-python/tests/test_rmr.py b/src/bindings/rmr-python/tests/test_rmr.py index 172cc67..d5023b3 100644 --- a/src/bindings/rmr-python/tests/test_rmr.py +++ b/src/bindings/rmr-python/tests/test_rmr.py @@ -17,7 +17,7 @@ # ================================================================================== import time import pytest -from rmr import rmr, helpers +from rmr import rmr, helpers, exceptions SIZE = 256 @@ -44,6 +44,7 @@ def setup_module(): while rmr.rmr_ready(MRC_RCV) == 0: time.sleep(1) + def teardown_module(): """ test rmr module teardown @@ -80,10 +81,10 @@ def test_get_constants(expected_constants): errors = 0 econst = expected_constants rconst = rmr._get_constants() - for key in econst: # test all expected constants - if key not in rconst: # expected value not listed by rmr + for key in econst: # test all expected constants + if key not in rconst: # expected value not listed by rmr errors += 1 - print( "did not find required constant in list from RMR: %s" % key ) + print("did not find required constant in list from RMR: %s" % key) assert errors == 0 @@ -178,9 +179,9 @@ def test_send_rcv(): sbuf_send.contents.mtype = 0 sbuf_send = rmr.rmr_send_msg(MRC_SEND, sbuf_send) send_summary = rmr.message_summary(sbuf_send) - assert send_summary["message state"] == 0 # if send fails don't attempt receive + assert send_summary["message state"] == 0 # if send fails don't attempt receive assert send_summary["message status"] == "RMR_OK" - time.sleep(.5) + time.sleep(0.5) # receive it in other context sbuf_rcv = rmr.rmr_alloc_msg(MRC_RCV, SIZE) @@ -212,7 +213,7 @@ def send_burst(mrc, fmt, mtype=1, num=13, counter=0): Send a burst of messages optionally giving the type, payload and number to send. """ - sbuf_send = rmr.rmr_alloc_msg(MRC_SEND, SIZE) # seed message buffer + sbuf_send = rmr.rmr_alloc_msg(MRC_SEND, SIZE) # seed message buffer for i in range(num): payload = bytes(fmt % counter, "UTF-8") @@ -225,10 +226,10 @@ def send_burst(mrc, fmt, mtype=1, num=13, counter=0): while max_retries > 0: sbuf_send = rmr.rmr_send_msg(mrc, sbuf_send) ms = rmr.message_summary(sbuf_send) - if ms["message state"] != 10: # 10 is retry + if ms["message state"] != 10: # 10 is retry break max_retries -= 1 - time.sleep(.75) + time.sleep(0.75) assert ms["message state"] == 0 assert max_retries > 0 @@ -236,34 +237,40 @@ def send_burst(mrc, fmt, mtype=1, num=13, counter=0): def test_rcv_all(): """ - test the ability to receive a batch of queued messages. + test the ability to receive a batch of queued messages. """ - pay_fmt = "send to ring msg: %d" # dynamic message format with counter + pay_fmt = "send to ring msg: %d" # dynamic message format with counter - send_burst(MRC_SEND, pay_fmt) # send a bunch of 13 messages that should queue - time.sleep(1) # ensure underlying transport gets cycles to send/receive + send_burst(MRC_SEND, pay_fmt) # send a bunch of 13 messages that should queue + time.sleep(1) # ensure underlying transport gets cycles to send/receive - bundle = helpers.rmr_rcvall_msgs(MRC_BUF_RCV) # use the buffered receiver to read all with a single call + bundle = helpers.rmr_rcvall_msgs(MRC_BUF_RCV) # use the buffered receiver to read all with a single call assert len(bundle) == 13 for i in range(len(bundle)): - ms = bundle[i] # validate each summary returned, and ordering preserved + ms = bundle[i] # validate each summary returned, and ordering preserved assert ms["message state"] == 0 expected_pay = bytes(pay_fmt % i, "UTF-8") assert ms["payload"] == expected_pay - send_burst(MRC_SEND, pay_fmt, mtype=1, num=10) # send a second round with msg types 1 and 2 to test filter + send_burst(MRC_SEND, pay_fmt, mtype=1, num=10) # send a second round with msg types 1 and 2 to test filter send_burst(MRC_SEND, pay_fmt, mtype=2, num=8) send_burst(MRC_SEND, pay_fmt, mtype=1, num=5) - send_burst(MRC_SEND, pay_fmt, mtype=2, num=4, counter=8) # total of 12 messages with type 2 should be queued - time.sleep(1) # ensure underlying transport gets cycles to send/receive + send_burst(MRC_SEND, pay_fmt, mtype=2, num=4, counter=8) # total of 12 messages with type 2 should be queued + time.sleep(1) # ensure underlying transport gets cycles to send/receive - bundle = helpers.rmr_rcvall_msgs(MRC_BUF_RCV, [2]) # receive only message type 2 messages - assert len(bundle) == 12 # we should only get the second batch of 12 messages + bundle = helpers.rmr_rcvall_msgs(MRC_BUF_RCV, [2]) # receive only message type 2 messages + assert len(bundle) == 12 # we should only get the second batch of 12 messages for i in range(len(bundle)): - ms = bundle[i] # validate each summary - assert ms["message state"] == 0 # all should be OK - assert ms["message type"] == 2 # only mtype 2 should have been received - expected_pay = bytes(pay_fmt % i, "UTF-8") # ordering should still jive with the counter + ms = bundle[i] # validate each summary + assert ms["message state"] == 0 # all should be OK + assert ms["message type"] == 2 # only mtype 2 should have been received + expected_pay = bytes(pay_fmt % i, "UTF-8") # ordering should still jive with the counter assert ms["payload"] == expected_pay + + +def test_bad_buffer(): + """test that we get a proper exception when the buffer has a null pointer""" + with pytest.raises(exceptions.BadBufferAllocation): + rmr.rmr_alloc_msg(None, 4096)