1 # ==================================================================================
2 # Copyright (c) 2019-2020 Nokia
3 # Copyright (c) 2018-2020 AT&T Intellectual Property.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 # ==================================================================================
18 Wraps all RMR functions, but does not have a reference to the shared library.
21 from ctypes import POINTER, Structure
22 from ctypes import c_int, c_char, c_char_p, c_void_p, memmove, cast, create_string_buffer
24 from ricxappframe.rmr.exceptions import BadBufferAllocation, MeidSizeOutOfRange, InitFailed
25 from ricxappframe.rmr.rmrclib.rmrclib import rmr_c_lib, get_constants, state_to_status
def _get_rmr_constant(key: str, default=None):
    """
    Looks up a named constant published by the RMR C library.

    Only values of a simple type (None, bool, bytes, float, int, str) are
    passed through; anything else yields None.  A non-simple value is seen
    during sphinx autodoc document generation, which mocks the rmrclib
    package to work without the RMR shared object file, so the lookup
    answers with something like this:
    <class 'ricxappframe.rmr.rmrclib.rmrclib.get_constants.get'>
    Workaround for https://github.com/sphinx-doc/sphinx/issues/7422
    """
    simple_types = (type(None), bool, bytes, float, int, str)
    found = get_constants().get(key, default)
    if isinstance(found, simple_types):
        return found
    return None
46 # argtypes and restype are important:
47 # https://stackoverflow.com/questions/24377845/ctype-why-specify-argtypes
def _wrap_rmr_function(funcname, restype, argtypes):
    """
    Simplify wrapping ctypes functions.

    Parameters
    ----------
    funcname: str
        Name of library method
    restype: class
        Name of ctypes class; e.g., c_char_p
    argtypes: list
        List of ctypes classes; e.g., [ c_char_p, int ]

    Returns
    -------
    _FuncPointer:
        Pointer to C library function
    """
    # Look the symbol up on the loaded library object, then declare its
    # C signature so ctypes converts arguments and the result correctly.
    func = rmr_c_lib.__getattr__(funcname)
    func.restype = restype
    func.argtypes = argtypes
    # Hand the configured function back; every module-level _rmr_* wrapper
    # is bound to this return value.
    return func
# Publish constants from RMR C-language header files for use by importers of this library.
# TODO: Are there others that will be useful?
#: Typical size message to receive; size is not limited
RMR_MAX_RCV_BYTES = _get_rmr_constant("RMR_MAX_RCV_BYTES")
#: Multi-threaded initialization flag
RMRFL_MTCALL = _get_rmr_constant("RMRFL_MTCALL", 2)  # initialization flags
#: Empty initialization flag set
RMRFL_NONE = _get_rmr_constant("RMRFL_NONE", 0)
#: State constant for OK
RMR_OK = _get_rmr_constant("RMR_OK", 0)
#: State constant for no endpoint based on msg type
RMR_ERR_NOENDPT = _get_rmr_constant("RMR_ERR_NOENDPT")
#: State constant for retry
RMR_ERR_RETRY = _get_rmr_constant("RMR_ERR_RETRY")
#: State constant for timeout
RMR_ERR_TIMEOUT = _get_rmr_constant("RMR_ERR_TIMEOUT")
# Publish keys used in the message summary dict as constants

# message payload, bytes
RMR_MS_PAYLOAD = "payload"
# payload length, integer
RMR_MS_PAYLOAD_LEN = "payload length"
# message type, integer
RMR_MS_MSG_TYPE = "message type"
# subscription ID, integer
RMR_MS_SUB_ID = "subscription id"
# transaction ID, bytes
RMR_MS_TRN_ID = "transaction id"
# state of message processing, integer; e.g., 0
RMR_MS_MSG_STATE = "message state"
# state of message processing converted to string; e.g., RMR_OK
RMR_MS_MSG_STATUS = "message status"
# number of bytes usable in the payload, integer
RMR_MS_PAYLOAD_MAX = "payload max size"
# managed entity ID, bytes
RMR_MS_MEID = "meid"
# message source, string; e.g., host:port
RMR_MS_MSG_SOURCE = "message source"
# transport state, integer
RMR_MS_ERRNO = "errno"
class rmr_mbuf_t(Structure):
    """
    Mirrors public members of type rmr_mbuf_t from RMR header file src/common/include/rmr.h

    | int state;               // state of processing
    | int mtype;               // message type
    | int len;                 // length of data in the payload (send or received)
    | unsigned char* payload;  // transported data
    | unsigned char* xaction;  // pointer to fixed length transaction id bytes
    | int sub_id;              // subscription id
    | int tp_state;            // transport state (a.k.a errno)
    |
    | these things are off limits to the user application
    |
    | void* tp_buf;            // underlying transport allocated pointer
    | void* header;            // internal message header (whole buffer: header+payload)
    | unsigned char* id;       // if we need an ID in the message separate from the xaction id
    | int flags;               // various MFL (private) flags as needed
    | int alloc_len;           // the length of the allocated space (hdr+payload)

    RE PAYLOADs type below, see the documentation for c_char_p:
       class ctypes.c_char_p
       Represents the C char * datatype when it points to a zero-terminated string.
       For a general character pointer that may also point to binary data, POINTER(c_char)
       must be used. The constructor accepts an integer address, or a bytes object.
    """
    # re payload, according to the following the python bytes are already unsigned:
    # https://bytes.com/topic/python/answers/695078-ctypes-unsigned-char
    #
    # Only the public members are declared, in the order given by the struct
    # listing above; the private members that follow them are never touched
    # from Python.
    _fields_ = [
        ("state", c_int),
        ("mtype", c_int),
        ("len", c_int),
        ("payload", POINTER(c_char)),
        ("xaction", POINTER(c_char)),
        ("sub_id", c_int),
        ("tp_state", c_int),
    ]
_rmr_init = _wrap_rmr_function('rmr_init', c_void_p, [c_char_p, c_int, c_int])


def rmr_init(uproto_port: c_char_p, max_msg_size: int, flags: int) -> c_void_p:
    """
    Prepares the environment for sending and receiving messages.
    Refer to RMR C documentation for method::

        extern void* rmr_init(char* uproto_port, int max_msg_size, int flags)

    This function raises an exception if the returned context is None.

    Parameters
    ----------
    uproto_port: c_char_p
        Pointer to bytes built from the port number as a string; e.g., b'4550'
    max_msg_size: integer
        Maximum message size to receive
    flags: integer
        RMR option flags; e.g., RMRFL_NONE or RMRFL_MTCALL

    Returns
    -------
    c_void_p:
        Pointer to RMR context

    Raises
    ------
    InitFailed
        If the C library returns no context.
    """
    mrc = _rmr_init(uproto_port, max_msg_size, flags)
    # A c_void_p result comes back as None when the C function returned NULL.
    if mrc is None:
        raise InitFailed()
    return mrc
_rmr_ready = _wrap_rmr_function("rmr_ready", c_int, [c_void_p])


def rmr_ready(vctx: c_void_p) -> int:
    """
    Reports whether a routing table has been received and installed.
    Thin wrapper over the RMR C method::

        extern int rmr_ready(void* vctx)

    Parameters
    ----------
    vctx: ctypes c_void_p
        Pointer to RMR context

    Returns
    -------
    int:
        The value returned by the C function, unchanged
    """
    ready_state = _rmr_ready(vctx)
    return ready_state
_rmr_close = _wrap_rmr_function('rmr_close', None, [c_void_p])


def rmr_close(vctx: c_void_p):
    """
    Closes the listen socket.
    Refer to RMR C documentation for method::

        extern void rmr_close(void* vctx)

    Parameters
    ----------
    vctx: ctypes c_void_p
        Pointer to RMR context

    Returns
    -------
    None
    """
    # The C function is declared void, so there is nothing to hand back.
    _rmr_close(vctx)
_rmr_set_stimeout = _wrap_rmr_function("rmr_set_stimeout", c_int, [c_void_p, c_int])


def rmr_set_stimeout(vctx: c_void_p, rloops: int) -> int:
    """
    Configures how RMR retries message send operations.
    Thin wrapper over the RMR C method::

        extern int rmr_set_stimeout(void* vctx, int rloops)

    Parameters
    ----------
    vctx: ctypes c_void_p
        Pointer to RMR context
    rloops: int
        Number of retry loops

    Returns
    -------
    int:
        0 on success, -1 on failure
    """
    status = _rmr_set_stimeout(vctx, rloops)
    return status
_rmr_alloc_msg = _wrap_rmr_function('rmr_alloc_msg', POINTER(rmr_mbuf_t), [c_void_p, c_int])


def rmr_alloc_msg(vctx: c_void_p, size: int,
                  payload=None, gen_transaction_id=False, mtype=None,
                  meid=None, sub_id=None, fixed_transaction_id=None):
    """
    Allocates and returns a buffer to write and send through the RMR library.
    Refer to RMR C documentation for method::

        extern rmr_mbuf_t* rmr_alloc_msg(void* vctx, int size)

    Optionally populates the message from the remaining arguments.

    TODO: on next API break, clean up transaction_id ugliness. Kept for now to preserve API.

    Parameters
    ----------
    vctx: ctypes c_void_p
        Pointer to RMR context
    size: int
        How much space to allocate
    payload: bytes
        if not None, attempts to set the payload
    gen_transaction_id: bool
        if True, generates and sets a transaction ID.
        Note, option fixed_transaction_id overrides this option
    mtype: bytes
        if not None, sets the sbuf's message type
    meid: bytes
        if not None, sets the sbuf's meid
    sub_id: bytes
        if not None, sets the sbuf's subscription id
    fixed_transaction_id: bytes
        if not None, used as the transaction ID.
        Note, this overrides the option gen_transaction_id

    Returns
    -------
    c_void_p:
        Pointer to rmr_mbuf structure

    Raises
    ------
    BadBufferAllocation
        If a ValueError is raised while checking or populating the buffer,
        e.g. because the allocation returned a NULL pointer.
    """
    sbuf = _rmr_alloc_msg(vctx, size)
    try:
        # make sure the alloc worked: dereferencing a NULL ctypes pointer
        # raises ValueError, which is translated below
        sbuf.contents

        # set specified fields
        if payload is not None:
            set_payload_and_length(payload, sbuf)

        # a caller-supplied transaction id wins over a generated one
        if fixed_transaction_id:
            set_transaction_id(sbuf, fixed_transaction_id)
        elif gen_transaction_id:
            generate_and_set_transaction_id(sbuf)

        if mtype is not None:
            sbuf.contents.mtype = mtype

        if meid is not None:
            rmr_set_meid(sbuf, meid)

        if sub_id is not None:
            sbuf.contents.sub_id = sub_id

        return sbuf

    except ValueError:
        raise BadBufferAllocation
_rmr_realloc_payload = _wrap_rmr_function(
    "rmr_realloc_payload",
    POINTER(rmr_mbuf_t),
    [POINTER(rmr_mbuf_t), c_int, c_int, c_int],
)


def rmr_realloc_payload(ptr_mbuf: c_void_p, new_len: int, copy=False, clone=False):
    """
    Obtains a message buffer large enough for the new length and returns it.
    Thin wrapper over the RMR C method::

        extern rmr_mbuf_t* rmr_realloc_payload(rmr_mbuf_t*, int, int, int)

    Parameters
    ----------
    ptr_mbuf: c_void_p
        Pointer to rmr_mbuf structure
    new_len: int
        Length the buffer must be able to hold
    copy: bool
        Whether to copy the original payload
    clone: bool
        Whether to clone the original buffer

    Returns
    -------
    c_void_p:
        Pointer to rmr_mbuf structure
    """
    resized = _rmr_realloc_payload(ptr_mbuf, new_len, copy, clone)
    return resized
_rmr_free_msg = _wrap_rmr_function("rmr_free_msg", None, [POINTER(rmr_mbuf_t)])


def rmr_free_msg(ptr_mbuf: c_void_p):
    """
    Hands the message buffer back to RMR; a None argument is ignored.
    Thin wrapper over the RMR C method::

        extern void rmr_free_msg(rmr_mbuf_t* mbuf )

    Parameters
    ----------
    ptr_mbuf: c_void_p
        Pointer to rmr_mbuf structure

    Returns
    -------
    None
    """
    if ptr_mbuf is None:
        return
    _rmr_free_msg(ptr_mbuf)
_rmr_payload_size = _wrap_rmr_function("rmr_payload_size", c_int, [POINTER(rmr_mbuf_t)])


def rmr_payload_size(ptr_mbuf: c_void_p) -> int:
    """
    Reports how many bytes are available in the payload.
    Thin wrapper over the RMR C method::

        extern int rmr_payload_size(rmr_mbuf_t* mbuf)

    Parameters
    ----------
    ptr_mbuf: c_void_p
        Pointer to rmr_mbuf structure

    Returns
    -------
    int:
        Number of bytes available
    """
    available = _rmr_payload_size(ptr_mbuf)
    return available
_rmr_send_msg = _wrap_rmr_function(
    "rmr_send_msg", POINTER(rmr_mbuf_t), [c_void_p, POINTER(rmr_mbuf_t)]
)


def rmr_send_msg(vctx: c_void_p, ptr_mbuf: POINTER(rmr_mbuf_t)) -> POINTER(rmr_mbuf_t):
    """
    Sends the message according to the routing table and returns an empty buffer.
    Thin wrapper over the RMR C method::

        extern rmr_mbuf_t* rmr_send_msg(void* vctx, rmr_mbuf_t* mbuf)

    Parameters
    ----------
    vctx: ctypes c_void_p
        Pointer to RMR context
    ptr_mbuf: ctypes POINTER(rmr_mbuf_t)
        Pointer to rmr_mbuf structure

    Returns
    -------
    ctypes POINTER(rmr_mbuf_t):
        Pointer to rmr_mbuf structure
    """
    returned_buf = _rmr_send_msg(vctx, ptr_mbuf)
    return returned_buf
431 # TODO: the old message (Send param) is actually optional, but I don't know how to specify that in Ctypes.
_rmr_rcv_msg = _wrap_rmr_function(
    "rmr_rcv_msg", POINTER(rmr_mbuf_t), [c_void_p, POINTER(rmr_mbuf_t)]
)


def rmr_rcv_msg(vctx: c_void_p, ptr_mbuf: POINTER(rmr_mbuf_t)) -> POINTER(rmr_mbuf_t):
    """
    Blocks until a message arrives, then returns it.
    Thin wrapper over the RMR C method::

        extern rmr_mbuf_t* rmr_rcv_msg(void* vctx, rmr_mbuf_t* old_mbuf)

    Parameters
    ----------
    vctx: ctypes c_void_p
        Pointer to RMR context
    ptr_mbuf: ctypes POINTER(rmr_mbuf_t)
        Pointer to rmr_mbuf structure

    Returns
    -------
    ctypes POINTER(rmr_mbuf_t):
        Pointer to rmr_mbuf structure
    """
    received = _rmr_rcv_msg(vctx, ptr_mbuf)
    return received
_rmr_torcv_msg = _wrap_rmr_function(
    "rmr_torcv_msg", POINTER(rmr_mbuf_t), [c_void_p, POINTER(rmr_mbuf_t), c_int]
)


def rmr_torcv_msg(vctx: c_void_p, ptr_mbuf: POINTER(rmr_mbuf_t), ms_to: int) -> POINTER(rmr_mbuf_t):
    """
    Waits up to the timeout value for a message to arrive, then returns it.
    Thin wrapper over the RMR C method::

        extern rmr_mbuf_t* rmr_torcv_msg(void* vctx, rmr_mbuf_t* old_mbuf, int ms_to)

    Parameters
    ----------
    vctx: ctypes c_void_p
        Pointer to RMR context
    ptr_mbuf: ctypes POINTER(rmr_mbuf_t)
        Pointer to rmr_mbuf structure
    ms_to: int
        Time out value in milliseconds

    Returns
    -------
    ctypes POINTER(rmr_mbuf_t):
        Pointer to rmr_mbuf structure
    """
    received = _rmr_torcv_msg(vctx, ptr_mbuf, ms_to)
    return received
_rmr_rts_msg = _wrap_rmr_function('rmr_rts_msg', POINTER(rmr_mbuf_t), [c_void_p, POINTER(rmr_mbuf_t)])


def rmr_rts_msg(vctx: c_void_p, ptr_mbuf: POINTER(rmr_mbuf_t), payload=None, mtype=None) -> POINTER(rmr_mbuf_t):
    """
    Sends a message to the originating endpoint and returns an empty buffer.
    Refer to RMR C documentation for method::

        extern rmr_mbuf_t* rmr_rts_msg(void* vctx, rmr_mbuf_t* mbuf)

    additional features beyond c-rmr:
        if payload is not None, attempts to set the payload
        if mtype is not None, sets the sbuf's message type

    Parameters
    ----------
    vctx: ctypes c_void_p
        Pointer to an RMR context
    ptr_mbuf: ctypes c_void_p
        Pointer to an RMR message buffer
    payload: bytes
        New payload to set before sending, or None to keep the current one
    mtype: int
        New message type to set before sending, or None to keep the current one

    Returns
    -------
    ctypes POINTER(rmr_mbuf_t):
        Pointer to rmr_mbuf structure
    """
    # Apply the optional overrides only when the caller supplied them, so a
    # plain return-to-sender leaves the received buffer untouched.
    if payload is not None:
        set_payload_and_length(payload, ptr_mbuf)

    if mtype is not None:
        ptr_mbuf.contents.mtype = mtype

    return _rmr_rts_msg(vctx, ptr_mbuf)
_rmr_call = _wrap_rmr_function(
    "rmr_call", POINTER(rmr_mbuf_t), [c_void_p, POINTER(rmr_mbuf_t)]
)


def rmr_call(vctx: c_void_p, ptr_mbuf: POINTER(rmr_mbuf_t)) -> POINTER(rmr_mbuf_t):
    """
    Sends a message, waits for the response and returns it.
    Thin wrapper over the RMR C method::

        extern rmr_mbuf_t* rmr_call(void* vctx, rmr_mbuf_t* mbuf)

    Parameters
    ----------
    vctx: ctypes c_void_p
        Pointer to an RMR context
    ptr_mbuf: ctypes c_void_p
        Pointer to an RMR message buffer

    Returns
    -------
    ctypes POINTER(rmr_mbuf_t):
        Pointer to rmr_mbuf structure
    """
    response = _rmr_call(vctx, ptr_mbuf)
    return response
_rmr_bytes2meid = _wrap_rmr_function(
    "rmr_bytes2meid", c_int, [POINTER(rmr_mbuf_t), c_char_p, c_int]
)


def rmr_set_meid(ptr_mbuf: POINTER(rmr_mbuf_t), byte_str: bytes) -> int:
    """
    Writes the managed entity field of the message and returns the number of bytes copied.
    Thin wrapper over the RMR C method::

        extern int rmr_bytes2meid(rmr_mbuf_t* mbuf, unsigned char const* src, int len);

    Caution: the meid length supported in an RMR message is 32 bytes, but C applications
    expect this to be a nil terminated string and thus only 31 bytes are actually available.

    Raises: exceptions.MeidSizeOutOfRange

    Parameters
    ----------
    ptr_mbuf: ctypes c_void_p
        Pointer to rmr_mbuf structure
    byte_str: bytes
        Managed entity ID value

    Returns
    -------
    int:
        number of bytes copied
    """
    meid_len = len(byte_str)
    limit = _get_rmr_constant("RMR_MAX_MEID", 32)
    # one byte is reserved for the terminator, so the limit itself is too long
    if not meid_len < limit:
        raise MeidSizeOutOfRange

    return _rmr_bytes2meid(ptr_mbuf, byte_str, len(byte_str))
# CAUTION: Some of the C functions expect a mutable buffer to copy the bytes into;
# if there is a get_* function below, use it to set up and return the
# buffer properly.

# extern unsigned char* rmr_get_meid(rmr_mbuf_t* mbuf, unsigned char* dest);
# we don't provide direct access to this function (unless it is asked for) because it is not really useful to provide your own buffer.
# Rather, rmr_get_meid does this for you, and just returns the string.
_rmr_get_meid = _wrap_rmr_function('rmr_get_meid', c_char_p, [POINTER(rmr_mbuf_t), c_char_p])


def rmr_get_meid(ptr_mbuf: POINTER(rmr_mbuf_t)) -> bytes:
    """
    Gets the managed entity ID (meid) from the message header.
    This is a python-friendly version of RMR C method::

        extern unsigned char* rmr_get_meid(rmr_mbuf_t* mbuf, unsigned char* dest);

    Parameters
    ----------
    ptr_mbuf: ctypes c_void_p
        Pointer to rmr_mbuf structure

    Returns
    -------
    bytes:
        Managed entity ID
    """
    max_meid = _get_rmr_constant("RMR_MAX_MEID", 32)  # size for buffer to fill
    # The C function copies into a caller-supplied destination, so give it a
    # mutable buffer and read the result back out of that buffer.
    buf = create_string_buffer(max_meid)
    _rmr_get_meid(ptr_mbuf, buf)
    return buf.value
_rmr_get_src = _wrap_rmr_function(
    "rmr_get_src", c_char_p, [POINTER(rmr_mbuf_t), c_char_p]
)


def rmr_get_src(ptr_mbuf: POINTER(rmr_mbuf_t), dest: c_char_p) -> c_char_p:
    """
    Copies the message-source information into the supplied buffer.
    Thin wrapper over the RMR C method::

        extern unsigned char* rmr_get_src(rmr_mbuf_t* mbuf, unsigned char* dest);

    Parameters
    ----------
    ptr_mbuf: ctypes POINTER(rmr_mbuf_t)
        Pointer to rmr_mbuf structure
    dest: ctypes c_char_p
        Pointer to a buffer to receive the message source

    Returns
    -------
    string:
        message-source information
    """
    source = _rmr_get_src(ptr_mbuf, dest)
    return source
_rmr_set_vlevel = _wrap_rmr_function("rmr_set_vlevel", None, [c_int])


def rmr_set_vlevel(new_level: c_int):
    """
    Sets the verbosity level which determines the messages RMR writes to standard error.
    Thin wrapper over the RMR C method::

        void rmr_set_vlevel( int new_level )

    Parameters
    ----------
    new_level: int
        New logging verbosity level, an integer in the range 0 (none) to 5 (debug).
    """
    requested_level = new_level
    _rmr_set_vlevel(requested_level)
_rmr_wh_call = _wrap_rmr_function(
    "rmr_wh_call",
    POINTER(rmr_mbuf_t),
    [c_void_p, c_int, POINTER(rmr_mbuf_t), c_int, c_int],
)


def rmr_wh_call(vctx: c_void_p, whid: c_int, ptr_mbuf: POINTER(rmr_mbuf_t), call_id: c_int, max_wait: c_int) -> POINTER(rmr_mbuf_t):
    """
    Sends a message buffer (msg) using a wormhole ID (whid) and waits for a response.
    Thin wrapper over the RMR C method::

        rmr_mbuf_t* rmr_wh_call( void* vctx, rmr_whid_t whid, rmr_mbuf_t* mbuf, int call_id, int max_wait )

    Parameters
    ----------
    vctx: ctypes c_void_p
        Pointer to RMR context
    whid: c_int
        Wormhole ID returned by rmr_wh_open
    ptr_mbuf: ctypes POINTER(rmr_mbuf_t)
        Pointer to rmr_mbuf structure
    call_id: c_int
        number in the range of 2..255 to identify the caller
    max_wait: c_int
        number of milliseconds to wait for a reply

    Returns
    -------
    ctypes POINTER(rmr_mbuf_t):
        Pointer to rmr_mbuf structure
    """
    reply = _rmr_wh_call(vctx, whid, ptr_mbuf, call_id, max_wait)
    return reply
# Bind to the wormhole-specific symbol: rmr_close takes only the context and
# would shut down the whole RMR context rather than a single wormhole.
_rmr_wh_close = _wrap_rmr_function('rmr_wh_close', None, [c_void_p, c_int])


def rmr_wh_close(vctx: c_void_p, whid: c_int):
    """
    Closes the wormhole associated with the wormhole id.
    Refer to RMR C documentation for method::

        void rmr_wh_close( void* vctx, rmr_whid_t whid )

    Parameters
    ----------
    vctx: ctypes c_void_p
        Pointer to RMR context
    whid: c_int
        Wormhole ID returned by rmr_wh_open
    """
    _rmr_wh_close(vctx, whid)
_rmr_wh_open = _wrap_rmr_function("rmr_wh_open", c_int, [c_void_p, c_char_p])


def rmr_wh_open(vctx: c_void_p, target: c_char_p) -> c_int:
    """
    Creates a direct link for sending to another RMR based process.
    Thin wrapper over the RMR C method::

        rmr_whid_t rmr_wh_open( void* vctx, char* target )

    Parameters
    ----------
    vctx: ctypes c_void_p
        Pointer to RMR context
    target: c_char_p
        name/IP and port combination of the target process; e.g., "localhost:6123"

    Returns
    -------
    c_int:
        The value returned by the C function, unchanged
    """
    wormhole_id = _rmr_wh_open(vctx, target)
    return wormhole_id
_rmr_wh_send_msg = _wrap_rmr_function(
    "rmr_wh_send_msg", POINTER(rmr_mbuf_t), [c_void_p, c_int, POINTER(rmr_mbuf_t)]
)


def rmr_wh_send_msg(vctx: c_void_p, whid: c_int, ptr_mbuf: POINTER(rmr_mbuf_t)) -> POINTER(rmr_mbuf_t):
    """
    Sends a message buffer (msg) using a wormhole ID (whid).
    Thin wrapper over the RMR C method::

        rmr_mbuf_t* rmr_wh_send_msg( void* vctx, rmr_whid_t id, rmr_mbuf_t* msg )

    Parameters
    ----------
    vctx: ctypes c_void_p
        Pointer to RMR context
    whid: c_int
        Wormhole ID returned by rmr_wh_open
    ptr_mbuf: ctypes POINTER(rmr_mbuf_t)
        Pointer to rmr_mbuf structure

    Returns
    -------
    ctypes POINTER(rmr_mbuf_t):
        Pointer to rmr_mbuf structure
    """
    returned_buf = _rmr_wh_send_msg(vctx, whid, ptr_mbuf)
    return returned_buf
_rmr_wh_state = _wrap_rmr_function('rmr_wh_state', c_int, [c_void_p, c_int])


def rmr_wh_state(vctx: c_void_p, whid: c_int) -> c_int:
    """
    Gets the state of the connection associated with the given wormhole (whid).
    Refer to RMR C documentation for method::

        int rmr_wh_state( void* vctx, rmr_whid_t whid )

    Parameters
    ----------
    vctx: ctypes c_void_p
        Pointer to RMR context
    whid: c_int
        Wormhole ID returned by rmr_wh_open

    Returns
    -------
    c_int:
        State of the connection
    """
    # Exactly the two arguments of the C prototype: context and wormhole id.
    return _rmr_wh_state(vctx, whid)
785 ########################################################################################
786 # Methods that exist ONLY in rmr-python, and are not wrapped methods.
787 # These should have been in a separate module, but leaving here to prevent api breakage.
788 ########################################################################################
def get_payload(ptr_mbuf: c_void_p) -> bytes:
    """
    Gets the binary payload from the rmr_buf_t*.

    Parameters
    ----------
    ptr_mbuf: ctypes c_void_p
        Pointer to an rmr message buffer

    Returns
    -------
    bytes:
        the message payload, exactly ``len`` bytes long (embedded NUL bytes
        are preserved)
    """
    # Logic came from the answer here:
    # https://stackoverflow.com/questions/55103298/python-ctypes-read-pointerc-char-in-python
    mbuf = ptr_mbuf.contents
    # Slicing a POINTER(c_char) already produces a bytes object of the
    # requested length, so no intermediate c_char array is needed.
    return mbuf.payload[:mbuf.len]
def get_xaction(ptr_mbuf: c_void_p) -> bytes:
    """
    Gets the transaction ID from the rmr_buf_t*.

    Parameters
    ----------
    ptr_mbuf: ctypes c_void_p
        Pointer to an rmr message buffer

    Returns
    -------
    bytes:
        the transaction id, at most RMR_MAX_XID bytes long
    """
    # Reading through c_char_p stops at the first NUL byte.
    val = cast(ptr_mbuf.contents.xaction, c_char_p).value
    max_xid = _get_rmr_constant("RMR_MAX_XID", 0)
    # The field is fixed length and need not be NUL terminated, so clip the
    # value to the field size.
    return val[:max_xid]
def message_summary(ptr_mbuf: c_void_p) -> dict:
    """
    Builds a dict with the contents of an RMR message.

    Parameters
    ----------
    ptr_mbuf: ctypes c_void_p
        Pointer to an RMR message buffer

    Returns
    -------
    dict:
        Message content as key-value pairs; keys are defined as RMR_MS_* constants.
    """
    return {
        # the payload is only read when the message state is RMR_OK
        RMR_MS_PAYLOAD: get_payload(ptr_mbuf) if ptr_mbuf.contents.state == RMR_OK else None,
        RMR_MS_PAYLOAD_LEN: ptr_mbuf.contents.len,
        RMR_MS_MSG_TYPE: ptr_mbuf.contents.mtype,
        RMR_MS_SUB_ID: ptr_mbuf.contents.sub_id,
        RMR_MS_TRN_ID: get_xaction(ptr_mbuf),
        RMR_MS_MSG_STATE: ptr_mbuf.contents.state,
        RMR_MS_MSG_STATUS: state_to_status(ptr_mbuf.contents.state),
        RMR_MS_PAYLOAD_MAX: rmr_payload_size(ptr_mbuf),
        RMR_MS_MEID: rmr_get_meid(ptr_mbuf),
        RMR_MS_MSG_SOURCE: get_src(ptr_mbuf),
        RMR_MS_ERRNO: ptr_mbuf.contents.tp_state,
    }
def set_payload_and_length(byte_str: bytes, ptr_mbuf: c_void_p):
    """
    Copies the given bytes into the message payload and records their length.

    Parameters
    ----------
    byte_str: bytes
        the bytes to set the payload to
    ptr_mbuf: ctypes c_void_p
        Pointer to an rmr message buffer
    """
    target = ptr_mbuf
    if len(byte_str) > rmr_payload_size(target):  # existing message payload too small
        target = rmr_realloc_payload(target, len(byte_str), True)

    memmove(target.contents.payload, byte_str, len(byte_str))
    target.contents.len = len(byte_str)
def generate_and_set_transaction_id(ptr_mbuf: c_void_p):
    """
    Generates a UUID and sets the RMR transaction id to it

    Parameters
    ----------
    ptr_mbuf: ctypes c_void_p
        Pointer to an rmr message buffer
    """
    # Imported here so this function does not depend on a module-level
    # import of uuid being present.
    import uuid

    # The hex form of a UUID is 32 ASCII characters.
    set_transaction_id(ptr_mbuf, uuid.uuid1().hex.encode("utf-8"))
def set_transaction_id(ptr_mbuf: c_void_p, tid_bytes: bytes):
    """
    Sets an RMR transaction id
    TODO: on next API break, merge these two functions. Not done now to preserve API.

    Exactly RMR_MAX_XID bytes are written: a longer id is truncated and a
    shorter one is padded with NUL bytes.

    Parameters
    ----------
    ptr_mbuf: ctypes c_void_p
        Pointer to an rmr message buffer
    tid_bytes: bytes
        bytes of the desired transaction id
    """
    max_xid = _get_rmr_constant("RMR_MAX_XID", 0)
    # Build a source of exactly max_xid bytes; copying max_xid bytes straight
    # out of a shorter tid_bytes would read past the end of that object.
    padded = tid_bytes[:max_xid].ljust(max_xid, b"\x00")
    memmove(ptr_mbuf.contents.xaction, padded, max_xid)
def get_src(ptr_mbuf: c_void_p) -> str:
    """
    Reads the message source (likely host:port) out of the message.

    Parameters
    ----------
    ptr_mbuf: ctypes c_void_p
        Pointer to an rmr message buffer

    Returns
    -------
    string:
        message source
    """
    capacity = _get_rmr_constant("RMR_MAX_SRC", 64)  # size to fill
    dest = create_string_buffer(capacity)
    rmr_get_src(ptr_mbuf, dest)
    source_bytes = dest.value
    return source_bytes.decode()