and this project adheres to `Semantic
Versioning <http://semver.org/>`__.
+[2.1.0] - 12/4/2019
+--------------------
+
+::
+
+ * Allow the setting of payload and message type in rts
+ * Add a new function, helpers.rmr_rcvall_msgs_raw, that also returns the sbuf alongside the summary
+ * Add a mock for free_msg
+
+
[2.0.0] - 12/2/2019
--------------------
def rmr_rcvall_msgs(mrc, pass_filter=[]):
"""
- Assemble an array of all messages which can be received without
- blocking. Effectively draining the message queue if RMR is started
- in mt-call mode, or draining any waiting TCP buffers. If the
- pass_filter parameter is supplied it is treated as one or more message
- types to accept (pass through). Using the default, an empty list, results
- in messages with any type being captured.
-
- Parameters
- ----------
- mrc: ctypes c_void_p
- Pointer to the RMR context
-
- pass_filter: list (optional)
- The message type(s) to capture.
-
- Returns
- -------
- list
- List of message summaries, one for each message captured.
+ Assemble an array of all messages which can be received without
+ blocking. Effectively draining the message queue if RMR is started
+ in mt-call mode, or draining any waiting TCP buffers. If the
+ pass_filter parameter is supplied it is treated as one or more message
+ types to accept (pass through). Using the default, an empty list, results
+ in messages with any type being captured.
+
+ Parameters
+ ----------
+ mrc: ctypes c_void_p
+ Pointer to the RMR context
+
+ pass_filter: list (optional)
+ The message type(s) to capture.
+
+ Returns
+ -------
+ list of dict
+ List of message summaries, one for each message captured.
"""
new_messages = []
- mbuf = rmr.rmr_alloc_msg(mrc, 4096) # allocate buffer to have something for a return status
+ mbuf = rmr.rmr_alloc_msg(mrc, 4096) # allocate buffer to have something for a return status
while True:
mbuf = rmr.rmr_torcv_msg(mrc, mbuf, 0) # set the timeout to 0 so this doesn't block!!
summary = rmr.message_summary(mbuf)
- if summary["message status"] != "RMR_OK": # ok indicates msg received, stop on all other states
+ if summary["message status"] != "RMR_OK": # ok indicates msg received, stop on all other states
break
+
+ if len(pass_filter) == 0 or summary["message type"] in pass_filter: # no filter, or passes; capture it
+ new_messages.append(summary)
+
+ rmr.rmr_free_msg(mbuf) # must free message to avoid leak
+ return new_messages
+
+
+def rmr_rcvall_msgs_raw(mrc, pass_filter=[]):
+ """
+ Same as rmr_rcvall_msgs, but the raw sbuf is also returned.
+ Useful, for example, if rts is to be used.
+
+ Parameters
+ ----------
+ mrc: ctypes c_void_p
+ Pointer to the RMR context
+
+ pass_filter: list (optional)
+ The message type(s) to capture.
+
+ Returns
+ -------
+ list of tuple:$
+ List of tuples [(S, sbuf),...] where S is a message summary and sbuf is the raw message$
+ the caller is responsible for calling rmr.rmr_free_msg(sbuf) for each sbuf afterwards to prevent memory leaks.
+ """
+
+ new_messages = []
+
+ while True:
+ mbuf = rmr.rmr_alloc_msg(mrc, 4096) # allocate buffer to have something for a return status
+ mbuf = rmr.rmr_torcv_msg(mrc, mbuf, 0) # set the timeout to 0 so this doesn't block!!
+ summary = rmr.message_summary(mbuf)
+ if summary["message status"] != "RMR_OK":
+ break
+
+ if len(pass_filter) == 0 or mbuf.contents.mtype in pass_filter: # no filter, or passes; capture it
+ new_messages.append((summary, mbuf))
else:
- if len(pass_filter) == 0 or summary["message type"] in pass_filter: # no filter, or passes; capture it
- new_messages.append(summary)
+ rmr.rmr_free_msg(mbuf)
- rmr.rmr_free_msg(mbuf) # must free message to avoid leak
return new_messages
_rmr_rts_msg.restype = POINTER(rmr_mbuf_t)
-def rmr_rts_msg(vctx, ptr_mbuf):
+def rmr_rts_msg(vctx, ptr_mbuf, payload=None, mtype=None):
"""
Refer to the rmr C documentation for rmr_rts_msg
extern rmr_mbuf_t* rmr_rts_msg(void* vctx, rmr_mbuf_t* msg)
+
+ additional features beyond c-rmr:
+ if payload is not None, attempts to set the payload
+ if mtype is not None, sets the sbuf's message type
"""
+
+ if payload:
+ set_payload_and_length(payload, ptr_mbuf)
+
+ if mtype:
+ ptr_mbuf.contents.mtype = mtype
+
return _rmr_rts_msg(vctx, ptr_mbuf)
def fake_rmr_payload_size(_sbuf):
return 4096
+ def fake_free(_sbuf):
+ pass
+
+ monkeypatch.setattr("rmr.rmr.rmr_free_msg", fake_free)
monkeypatch.setattr("rmr.rmr.rmr_alloc_msg", fake_alloc)
monkeypatch.setattr("rmr.rmr.set_payload_and_length", fake_set_payload_and_length)
monkeypatch.setattr("rmr.rmr.generate_and_set_transaction_id", fake_generate_and_set_transaction_id)
setup(
name="rmr",
- version="2.0.0",
+ version="2.1.0",
packages=find_packages(),
author="Tommy Carpenter, E. Scott Daniels",
description="Python wrapper for RIC RMR",
# send an ACK back
ack_pay = b"message received"
- rmr.set_payload_and_length(ack_pay, sbuf_rcv)
- sbuf_rcv = rmr.rmr_rts_msg(MRC_RCV, sbuf_rcv)
+ sbuf_rcv = rmr.rmr_rts_msg(MRC_RCV, sbuf_rcv, payload=ack_pay, mtype=6666)
rcv_ack_summary = rmr.message_summary(sbuf_rcv)
# have the sender receive it
assert send_ack_summary["message state"] == rcv_ack_summary["message state"] == 0
assert send_ack_summary["message status"] == rcv_ack_summary["message status"] == "RMR_OK"
assert send_ack_summary["payload"] == ack_pay
+ assert send_ack_summary["message type"] == 6666
def test_send_rcv_subid_good():
bundle = helpers.rmr_rcvall_msgs(MRC_BUF_RCV) # use the buffered receiver to read all with a single call
assert len(bundle) == 13
- for i in range(len(bundle)):
+ for i, ms in enumerate(bundle):
ms = bundle[i] # validate each summary returned, and ordering preserved
assert ms["message state"] == 0
expected_pay = bytes(pay_fmt % i, "UTF-8")
send_burst(MRC_SEND, pay_fmt, mtype=2, num=4, counter=8) # total of 12 messages with type 2 should be queued
time.sleep(1) # ensure underlying transport gets cycles to send/receive
- bundle = helpers.rmr_rcvall_msgs(MRC_BUF_RCV, [2]) # receive only message type 2 messages
+ bundle = helpers.rmr_rcvall_msgs_raw(MRC_BUF_RCV, [2]) # receive only message type 2 messages
assert len(bundle) == 12 # we should only get the second batch of 12 messages
- for i in range(len(bundle)):
- ms = bundle[i] # validate each summary
+ for i, (ms, sbuf) in enumerate(bundle): # test the raw version
+ test_summary = rmr.message_summary(sbuf)
+ assert test_summary == ms
assert ms["message state"] == 0 # all should be OK
assert ms["message type"] == 2 # only mtype 2 should have been received
expected_pay = bytes(pay_fmt % i, "UTF-8") # ordering should still jive with the counter
assert ms["payload"] == expected_pay
+ rmr.rmr_free_msg(sbuf)
def test_bad_buffer():