From c5de5c7413901b8de1da928b084a3ca2e88d490a Mon Sep 17 00:00:00 2001 From: Tommy Carpenter Date: Wed, 4 Dec 2019 13:26:58 -0500 Subject: [PATCH] Add new func, update rts Signed-off-by: Tommy Carpenter Change-Id: I2e0830eefe425d268045af6c2e5d1a1231fe8d56 --- src/bindings/rmr-python/docs/release-notes.rst | 10 +++ src/bindings/rmr-python/rmr/helpers.py | 85 ++++++++++++++++------ src/bindings/rmr-python/rmr/rmr.py | 13 +++- src/bindings/rmr-python/rmr/rmr_mocks/rmr_mocks.py | 4 + src/bindings/rmr-python/setup.py | 2 +- src/bindings/rmr-python/tests/test_rmr.py | 14 ++-- 6 files changed, 96 insertions(+), 32 deletions(-) diff --git a/src/bindings/rmr-python/docs/release-notes.rst b/src/bindings/rmr-python/docs/release-notes.rst index 25ab2ea..b4ed2dc 100644 --- a/src/bindings/rmr-python/docs/release-notes.rst +++ b/src/bindings/rmr-python/docs/release-notes.rst @@ -7,6 +7,16 @@ The format is based on `Keep a Changelog `__ and this project adheres to `Semantic Versioning `__. +[2.1.0] - 12/4/2019 +-------------------- + +:: + + * Allow the setting of payload and message type in rts + * Add a new function, helpers.rmr_rcvall_msgs_raw, that also returns the sbuf alongside the summary + * Add a mock for free_msg + + [2.0.0] - 12/2/2019 -------------------- diff --git a/src/bindings/rmr-python/rmr/helpers.py b/src/bindings/rmr-python/rmr/helpers.py index 1f6ef28..ac4a46e 100644 --- a/src/bindings/rmr-python/rmr/helpers.py +++ b/src/bindings/rmr-python/rmr/helpers.py @@ -27,39 +27,76 @@ from rmr import rmr def rmr_rcvall_msgs(mrc, pass_filter=[]): """ - Assemble an array of all messages which can be received without - blocking. Effectively draining the message queue if RMR is started - in mt-call mode, or draining any waiting TCP buffers. If the - pass_filter parameter is supplied it is treated as one or more message - types to accept (pass through). Using the default, an empty list, results - in messages with any type being captured. - - Parameters - ---------- - mrc: ctypes c_void_p - Pointer to the RMR context - - pass_filter: list (optional) - The message type(s) to capture. - - Returns - ------- - list - List of message summaries, one for each message captured. + Assemble an array of all messages which can be received without + blocking. Effectively draining the message queue if RMR is started + in mt-call mode, or draining any waiting TCP buffers. If the + pass_filter parameter is supplied it is treated as one or more message + types to accept (pass through). Using the default, an empty list, results + in messages with any type being captured. + + Parameters + ---------- + mrc: ctypes c_void_p + Pointer to the RMR context + + pass_filter: list (optional) + The message type(s) to capture. + + Returns + ------- + list of dict + List of message summaries, one for each message captured. """ new_messages = [] - mbuf = rmr.rmr_alloc_msg(mrc, 4096) # allocate buffer to have something for a return status + mbuf = rmr.rmr_alloc_msg(mrc, 4096) # allocate buffer to have something for a return status while True: mbuf = rmr.rmr_torcv_msg(mrc, mbuf, 0) # set the timeout to 0 so this doesn't block!! summary = rmr.message_summary(mbuf) - if summary["message status"] != "RMR_OK": # ok indicates msg received, stop on all other states + if summary["message status"] != "RMR_OK": # ok indicates msg received, stop on all other states break + + if len(pass_filter) == 0 or summary["message type"] in pass_filter: # no filter, or passes; capture it + new_messages.append(summary) + + rmr.rmr_free_msg(mbuf) # must free message to avoid leak + return new_messages + + +def rmr_rcvall_msgs_raw(mrc, pass_filter=[]): + """ + Same as rmr_rcvall_msgs, but the raw sbuf is also returned. + Useful, for example, if rts is to be used. + + Parameters + ---------- + mrc: ctypes c_void_p + Pointer to the RMR context + + pass_filter: list (optional) + The message type(s) to capture. + + Returns + ------- + list of tuple:$ + List of tuples [(S, sbuf),...] where S is a message summary and sbuf is the raw message$ + the caller is responsible for calling rmr.rmr_free_msg(sbuf) for each sbuf afterwards to prevent memory leaks. + """ + + new_messages = [] + + while True: + mbuf = rmr.rmr_alloc_msg(mrc, 4096) # allocate buffer to have something for a return status + mbuf = rmr.rmr_torcv_msg(mrc, mbuf, 0) # set the timeout to 0 so this doesn't block!! + summary = rmr.message_summary(mbuf) + if summary["message status"] != "RMR_OK": + break + + if len(pass_filter) == 0 or mbuf.contents.mtype in pass_filter: # no filter, or passes; capture it + new_messages.append((summary, mbuf)) else: - if len(pass_filter) == 0 or summary["message type"] in pass_filter: # no filter, or passes; capture it - new_messages.append(summary) + rmr.rmr_free_msg(mbuf) - rmr.rmr_free_msg(mbuf) # must free message to avoid leak return new_messages diff --git a/src/bindings/rmr-python/rmr/rmr.py b/src/bindings/rmr-python/rmr/rmr.py index b264cd5..45515c8 100644 --- a/src/bindings/rmr-python/rmr/rmr.py +++ b/src/bindings/rmr-python/rmr/rmr.py @@ -343,11 +343,22 @@ _rmr_rts_msg.argtypes = [c_void_p, POINTER(rmr_mbuf_t)] _rmr_rts_msg.restype = POINTER(rmr_mbuf_t) -def rmr_rts_msg(vctx, ptr_mbuf): +def rmr_rts_msg(vctx, ptr_mbuf, payload=None, mtype=None): """ Refer to the rmr C documentation for rmr_rts_msg extern rmr_mbuf_t* rmr_rts_msg(void* vctx, rmr_mbuf_t* msg) + + additional features beyond c-rmr: + if payload is not None, attempts to set the payload + if mtype is not None, sets the sbuf's message type """ + + if payload: + set_payload_and_length(payload, ptr_mbuf) + + if mtype: + ptr_mbuf.contents.mtype = mtype + return _rmr_rts_msg(vctx, ptr_mbuf) diff --git a/src/bindings/rmr-python/rmr/rmr_mocks/rmr_mocks.py b/src/bindings/rmr-python/rmr/rmr_mocks/rmr_mocks.py index 315536f..32cc575 100644 --- a/src/bindings/rmr-python/rmr/rmr_mocks/rmr_mocks.py +++ b/src/bindings/rmr-python/rmr/rmr_mocks/rmr_mocks.py @@ -121,6 +121,10 @@ def patch_rmr(monkeypatch): def fake_rmr_payload_size(_sbuf): return 4096 + def fake_free(_sbuf): + pass + + monkeypatch.setattr("rmr.rmr.rmr_free_msg", fake_free) monkeypatch.setattr("rmr.rmr.rmr_alloc_msg", fake_alloc) monkeypatch.setattr("rmr.rmr.set_payload_and_length", fake_set_payload_and_length) monkeypatch.setattr("rmr.rmr.generate_and_set_transaction_id", fake_generate_and_set_transaction_id) diff --git a/src/bindings/rmr-python/setup.py b/src/bindings/rmr-python/setup.py index ff3b352..ef31241 100644 --- a/src/bindings/rmr-python/setup.py +++ b/src/bindings/rmr-python/setup.py @@ -32,7 +32,7 @@ def _long_descr(): setup( name="rmr", - version="2.0.0", + version="2.1.0", packages=find_packages(), author="Tommy Carpenter, E. Scott Daniels", description="Python wrapper for RIC RMR", diff --git a/src/bindings/rmr-python/tests/test_rmr.py b/src/bindings/rmr-python/tests/test_rmr.py index 50dffc2..c7e872a 100644 --- a/src/bindings/rmr-python/tests/test_rmr.py +++ b/src/bindings/rmr-python/tests/test_rmr.py @@ -210,8 +210,7 @@ def test_send_rcv(): # send an ACK back ack_pay = b"message received" - rmr.set_payload_and_length(ack_pay, sbuf_rcv) - sbuf_rcv = rmr.rmr_rts_msg(MRC_RCV, sbuf_rcv) + sbuf_rcv = rmr.rmr_rts_msg(MRC_RCV, sbuf_rcv, payload=ack_pay, mtype=6666) rcv_ack_summary = rmr.message_summary(sbuf_rcv) # have the sender receive it @@ -221,6 +220,7 @@ def test_send_rcv(): assert send_ack_summary["message state"] == rcv_ack_summary["message state"] == 0 assert send_ack_summary["message status"] == rcv_ack_summary["message status"] == "RMR_OK" assert send_ack_summary["payload"] == ack_pay + assert send_ack_summary["message type"] == 6666 def test_send_rcv_subid_good(): @@ -311,7 +311,7 @@ def test_rcv_all(): bundle = helpers.rmr_rcvall_msgs(MRC_BUF_RCV) # use the buffered receiver to read all with a single call assert len(bundle) == 13 - for i in range(len(bundle)): + for i, ms in enumerate(bundle): ms = bundle[i] # validate each summary returned, and ordering preserved assert ms["message state"] == 0 expected_pay = bytes(pay_fmt % i, "UTF-8") @@ -323,15 +323,17 @@ def test_rcv_all(): send_burst(MRC_SEND, pay_fmt, mtype=2, num=4, counter=8) # total of 12 messages with type 2 should be queued time.sleep(1) # ensure underlying transport gets cycles to send/receive - bundle = helpers.rmr_rcvall_msgs(MRC_BUF_RCV, [2]) # receive only message type 2 messages + bundle = helpers.rmr_rcvall_msgs_raw(MRC_BUF_RCV, [2]) # receive only message type 2 messages assert len(bundle) == 12 # we should only get the second batch of 12 messages - for i in range(len(bundle)): - ms = bundle[i] # validate each summary + for i, (ms, sbuf) in enumerate(bundle): # test the raw version + test_summary = rmr.message_summary(sbuf) + assert test_summary == ms assert ms["message state"] == 0 # all should be OK assert ms["message type"] == 2 # only mtype 2 should have been received expected_pay = bytes(pay_fmt % i, "UTF-8") # ordering should still jive with the counter assert ms["payload"] == expected_pay + rmr.rmr_free_msg(sbuf) def test_bad_buffer(): -- 2.16.6