1 # ==================================================================================
2 # Copyright (c) 2019-2020 Nokia
3 # Copyright (c) 2018-2020 AT&T Intellectual Property.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 # ==================================================================================
19 from ctypes import CDLL, POINTER, RTLD_GLOBAL, Structure
20 from ctypes import c_char, c_char_p, c_int, c_void_p, cast, create_string_buffer, memmove
22 from ricxappframe.rmr.exceptions import BadBufferAllocation, MeidSizeOutOfRange, InitFailed
24 # https://docs.python.org/3.7/library/ctypes.html
25 # https://stackoverflow.com/questions/2327344/ctypes-loading-a-c-shared-library-that-has-dependencies/30845750#30845750
26 # make sure you do a set -x LD_LIBRARY_PATH /usr/local/lib/;
27 rmr_c_lib = CDLL("librmr_si.so", mode=RTLD_GLOBAL)
30 # Internal Helpers (not a part of public api)
33 _rmr_const = rmr_c_lib.rmr_get_consts
34 _rmr_const.argtypes = []
35 _rmr_const.restype = c_char_p
38 def _get_constants(cache={}) -> dict:
40 Gets constants published by RMR and caches for subsequent calls.
41 TODO: are there constants that end user applications need?
46 js = _rmr_const() # read json string
47 cache = json.loads(str(js.decode())) # create constants value object as a hash
51 def _get_mapping_dict(cache={}) -> dict:
53 Builds a state mapping dict from constants and caches for subsequent calls.
54 Relevant constants at this writing include:
56 RMR_OK 0 state is good
57 RMR_ERR_BADARG 1 argument passd to function was unusable
58 RMR_ERR_NOENDPT 2 send/call could not find an endpoint based on msg type
59 RMR_ERR_EMPTY 3 msg received had no payload; attempt to send an empty message
60 RMR_ERR_NOHDR 4 message didn't contain a valid header
61 RMR_ERR_SENDFAILED 5 send failed; errno has nano reason
62 RMR_ERR_CALLFAILED 6 unable to send call() message
63 RMR_ERR_NOWHOPEN 7 no wormholes are open
64 RMR_ERR_WHID 8 wormhole id was invalid
65 RMR_ERR_OVERFLOW 9 operation would have busted through a buffer/field size
66 RMR_ERR_RETRY 10 request (send/call/rts) failed, but caller should retry (EAGAIN for wrappers)
67 RMR_ERR_RCVFAILED 11 receive failed (hard error)
68 RMR_ERR_TIMEOUT 12 message processing call timed out
69 RMR_ERR_UNSET 13 the message hasn't been populated with a transport buffer
70 RMR_ERR_TRUNC 14 received message likely truncated
71 RMR_ERR_INITFAILED 15 initialization of something (probably message) failed
77 rmr_consts = _get_constants()
78 for key in rmr_consts: # build the state mapping dict
79 if key[:7] in ["RMR_ERR", "RMR_OK"]:
80 en = int(rmr_consts[key])
86 def _state_to_status(stateno: int) -> str:
88 Converts a msg state integer to a status string.
89 Returns "UNKNOWN STATE" if the int value is not known.
92 sdict = _get_mapping_dict()
93 return sdict.get(stateno, "UNKNOWN STATE")
96 _RCONST = _get_constants()
104 # These constants are directly usable by importers of this library
105 # TODO: Are there others that will be useful?
107 #: Maximum size message to receive
108 RMR_MAX_RCV_BYTES = _RCONST["RMR_MAX_RCV_BYTES"]
109 #: Multi-threaded initialization flag
110 RMRFL_MTCALL = _RCONST.get("RMRFL_MTCALL", 0x02) # initialization flags
112 RMRFL_NONE = _RCONST.get("RMRFL_NONE", 0x0)
113 #: State constant for OK
114 RMR_OK = _RCONST["RMR_OK"]
115 #: State constant for timeout
116 RMR_ERR_TIMEOUT = _RCONST["RMR_ERR_TIMEOUT"]
117 #: State constant for retry
118 RMR_ERR_RETRY = _RCONST["RMR_ERR_RETRY"]
121 class rmr_mbuf_t(Structure):
123 Mirrors public members of type rmr_mbuf_t from RMR header file src/common/include/rmr.h
126 | int state; // state of processing
127 | int mtype; // message type
128 | int len; // length of data in the payload (send or received)
129 | unsigned char* payload; // transported data
130 | unsigned char* xaction; // pointer to fixed length transaction id bytes
131 | int sub_id; // subscription id
132 | int tp_state; // transport state (a.k.a errno)
134 | these things are off limits to the user application
136 | void* tp_buf; // underlying transport allocated pointer (e.g. nng message)
137 | void* header; // internal message header (whole buffer: header+payload)
138 | unsigned char* id; // if we need an ID in the message separate from the xaction id
139 | int flags; // various MFL (private) flags as needed
140 | int alloc_len; // the length of the allocated space (hdr+payload)
143 RE PAYLOADs type below, see the documentation for c_char_p:
144 class ctypes.c_char_p
145 Represents the C char * datatype when it points to a zero-terminated string.
146 For a general character pointer that may also point to binary data, POINTER(c_char)
147 must be used. The constructor accepts an integer address, or a bytes object.
154 ("payload", POINTER(c_char)), # according to the following the python bytes are already unsigned
155 # https://bytes.com/topic/python/answers/695078-ctypes-unsigned-char
156 ("xaction", POINTER(c_char)),
162 # argtypes and restype are important: https://stackoverflow.com/questions/24377845/ctype-why-specify-argtypes
165 _rmr_init = rmr_c_lib.rmr_init
166 _rmr_init.argtypes = [c_char_p, c_int, c_int]
167 _rmr_init.restype = c_void_p
170 def rmr_init(uproto_port: c_char_p, max_msg_size: int, flags: int) -> c_void_p:
172 Prepares the environment for sending and receiving messages.
173 Refer to RMR C documentation for method::
175 extern void* rmr_init(char* uproto_port, int max_msg_size, int flags)
177 This function raises an exception if the returned context is None.
181 uproto_port: c_char_p
182 Pointer to bytes built from the port number as a string; e.g., b'4550'
183 max_msg_size: integer
184 Maximum message size to receive
191 Pointer to RMR context
193 mrc = _rmr_init(uproto_port, max_msg_size, flags)
199 _rmr_ready = rmr_c_lib.rmr_ready
200 _rmr_ready.argtypes = [c_void_p]
201 _rmr_ready.restype = c_int
204 def rmr_ready(vctx: c_void_p) -> int:
206 Checks if a routing table has been received and installed.
207 Refer to RMR C documentation for method::
209 extern int rmr_ready(void* vctx)
213 vctx: ctypes c_void_p
214 Pointer to RMR context
220 return _rmr_ready(vctx)
223 _rmr_close = rmr_c_lib.rmr_close
224 _rmr_close.argtypes = [c_void_p]
227 def rmr_close(vctx: c_void_p):
229 Closes the listen socket.
230 Refer to RMR C documentation for method::
232 extern void rmr_close(void* vctx)
236 vctx: ctypes c_void_p
237 Pointer to RMR context
246 _rmr_set_stimeout = rmr_c_lib.rmr_set_stimeout
247 _rmr_set_stimeout.argtypes = [c_void_p, c_int]
248 _rmr_set_stimeout.restype = c_int
251 def rmr_set_stimeout(vctx: c_void_p, rloops: int) -> int:
253 Sets the configuration for how RMR will retry message send operations.
254 Refer to RMR C documentation for method::
256 extern int rmr_set_stimeout(void* vctx, int rloops)
260 vctx: ctypes c_void_p
261 Pointer to RMR context
263 Number of retry loops
267 0 on success, -1 on failure
269 return _rmr_set_stimeout(vctx, rloops)
272 _rmr_alloc_msg = rmr_c_lib.rmr_alloc_msg
273 _rmr_alloc_msg.argtypes = [c_void_p, c_int]
274 _rmr_alloc_msg.restype = POINTER(rmr_mbuf_t)
277 def rmr_alloc_msg(vctx: c_void_p, size: int,
278 payload=None, gen_transaction_id=False, mtype=None,
279 meid=None, sub_id=None, fixed_transaction_id=None):
281 Allocates and returns a buffer to write and send through the RMR library.
282 Refer to RMR C documentation for method::
284 extern rmr_mbuf_t* rmr_alloc_msg(void* vctx, int size)
286 Optionally populates the message from the remaining arguments.
288 TODO: on next API break, clean up transaction_id ugliness. Kept for now to preserve API.
292 vctx: ctypes c_void_p
293 Pointer to RMR context
295 How much space to allocate
297 if not None, attempts to set the payload
298 gen_transaction_id: bool
299 if True, generates and sets a transaction ID.
300 Note, option fixed_transaction_id overrides this option
302 if not None, sets the sbuf's message type
304 if not None, sets the sbuf's meid
306 if not None, sets the sbuf's subscription id
307 fixed_transaction_id: bytes
308 if not None, used as the transaction ID.
309 Note, this overrides the option gen_transaction_id
314 Pointer to rmr_mbuf structure
316 sbuf = _rmr_alloc_msg(vctx, size)
318 # make sure the alloc worked
321 # set specified fields
323 set_payload_and_length(payload, sbuf)
325 if fixed_transaction_id:
326 set_transaction_id(sbuf, fixed_transaction_id)
327 elif gen_transaction_id:
328 generate_and_set_transaction_id(sbuf)
331 sbuf.contents.mtype = mtype
334 rmr_set_meid(sbuf, meid)
337 sbuf.contents.sub_id = sub_id
342 raise BadBufferAllocation
345 _rmr_realloc_payload = rmr_c_lib.rmr_realloc_payload
346 _rmr_realloc_payload.argtypes = [POINTER(rmr_mbuf_t), c_int, c_int, c_int] # new_len, copy, clone
347 _rmr_realloc_payload.restype = POINTER(rmr_mbuf_t)
350 def rmr_realloc_payload(ptr_mbuf: c_void_p, new_len: int, copy=False, clone=False):
352 Allocates and returns a message buffer large enough for the new length.
353 Refer to RMR C documentation for method::
355 extern rmr_mbuf_t* rmr_realloc_payload(rmr_mbuf_t*, int, int, int)
360 Pointer to rmr_mbuf structure
364 Whether to copy the original paylod
366 Whether to clone the original buffer
371 Pointer to rmr_mbuf structure
373 return _rmr_realloc_payload(ptr_mbuf, new_len, copy, clone)
376 _rmr_free_msg = rmr_c_lib.rmr_free_msg
377 _rmr_free_msg.argtypes = [POINTER(rmr_mbuf_t)]
378 _rmr_free_msg.restype = None
381 def rmr_free_msg(ptr_mbuf: c_void_p):
383 Releases the message buffer.
384 Refer to RMR C documentation for method::
386 extern void rmr_free_msg(rmr_mbuf_t* mbuf )
391 Pointer to rmr_mbuf structure
397 if ptr_mbuf is not None:
398 _rmr_free_msg(ptr_mbuf)
401 _rmr_payload_size = rmr_c_lib.rmr_payload_size
402 _rmr_payload_size.argtypes = [POINTER(rmr_mbuf_t)]
403 _rmr_payload_size.restype = c_int
406 def rmr_payload_size(ptr_mbuf: c_void_p) -> int:
408 Gets the number of bytes available in the payload.
409 Refer to RMR C documentation for method::
411 extern int rmr_payload_size(rmr_mbuf_t* msg)
416 Pointer to rmr_mbuf structure
421 Number of bytes available
423 return _rmr_payload_size(ptr_mbuf)
427 The following functions all seem to have the same interface
430 _rmr_send_msg = rmr_c_lib.rmr_send_msg
431 _rmr_send_msg.argtypes = [c_void_p, POINTER(rmr_mbuf_t)]
432 _rmr_send_msg.restype = POINTER(rmr_mbuf_t)
435 def rmr_send_msg(vctx: c_void_p, ptr_mbuf: POINTER(rmr_mbuf_t)) -> POINTER(rmr_mbuf_t):
437 Sends the message according to the routing table and returns an empty buffer.
438 Refer to RMR C documentation for method::
440 extern rmr_mbuf_t* rmr_send_msg(void* vctx, rmr_mbuf_t* msg)
444 vctx: ctypes c_void_p
445 Pointer to RMR context
447 Pointer to rmr_mbuf structure
452 Pointer to rmr_mbuf structure
454 return _rmr_send_msg(vctx, ptr_mbuf)
457 # TODO: the old message (Send param) is actually optional, but I don't know how to specify that in Ctypes.
458 _rmr_rcv_msg = rmr_c_lib.rmr_rcv_msg
459 _rmr_rcv_msg.argtypes = [c_void_p, POINTER(rmr_mbuf_t)]
460 _rmr_rcv_msg.restype = POINTER(rmr_mbuf_t)
463 def rmr_rcv_msg(vctx: c_void_p, ptr_mbuf: POINTER(rmr_mbuf_t)) -> POINTER(rmr_mbuf_t):
465 Waits for a message to arrive, and returns it.
466 Refer to RMR C documentation for method::
468 extern rmr_mbuf_t* rmr_rcv_msg(void* vctx, rmr_mbuf_t* old_msg)
472 vctx: ctypes c_void_p
473 Pointer to RMR context
475 Pointer to rmr_mbuf structure
480 Pointer to rmr_mbuf structure
482 return _rmr_rcv_msg(vctx, ptr_mbuf)
485 _rmr_torcv_msg = rmr_c_lib.rmr_torcv_msg
486 _rmr_torcv_msg.argtypes = [c_void_p, POINTER(rmr_mbuf_t), c_int]
487 _rmr_torcv_msg.restype = POINTER(rmr_mbuf_t)
490 def rmr_torcv_msg(vctx: c_void_p, ptr_mbuf: POINTER(rmr_mbuf_t), ms_to: int) -> POINTER(rmr_mbuf_t):
492 Waits up to the timeout value for a message to arrive, and returns it.
493 Refer to RMR C documentation for method::
495 extern rmr_mbuf_t* rmr_torcv_msg(void* vctx, rmr_mbuf_t* old_msg, int ms_to)
499 vctx: ctypes c_void_p
500 Pointer to RMR context
502 Pointer to rmr_mbuf structure
504 Time out value in milliseconds
509 Pointer to rmr_mbuf structure
511 return _rmr_torcv_msg(vctx, ptr_mbuf, ms_to)
514 _rmr_rts_msg = rmr_c_lib.rmr_rts_msg
515 _rmr_rts_msg.argtypes = [c_void_p, POINTER(rmr_mbuf_t)]
516 _rmr_rts_msg.restype = POINTER(rmr_mbuf_t)
519 def rmr_rts_msg(vctx: c_void_p, ptr_mbuf: POINTER(rmr_mbuf_t), payload=None, mtype=None) -> POINTER(rmr_mbuf_t):
521 Sends a message to the originating endpoint and returns an empty buffer.
522 Refer to RMR C documentation for method::
524 extern rmr_mbuf_t* rmr_rts_msg(void* vctx, rmr_mbuf_t* msg)
526 additional features beyond c-rmr:
527 if payload is not None, attempts to set the payload
528 if mtype is not None, sets the sbuf's message type
532 vctx: ctypes c_void_p
533 Pointer to an RMR context
534 ptr_mbuf: ctypes c_void_p
535 Pointer to an RMR message buffer
544 Pointer to rmr_mbuf structure
548 set_payload_and_length(payload, ptr_mbuf)
551 ptr_mbuf.contents.mtype = mtype
553 return _rmr_rts_msg(vctx, ptr_mbuf)
556 _rmr_call = rmr_c_lib.rmr_call
557 _rmr_call.argtypes = [c_void_p, POINTER(rmr_mbuf_t)]
558 _rmr_call.restype = POINTER(rmr_mbuf_t)
561 def rmr_call(vctx: c_void_p, ptr_mbuf: POINTER(rmr_mbuf_t)) -> POINTER(rmr_mbuf_t):
563 Sends a message, waits for a response and returns it.
564 Refer to RMR C documentation for method::
566 extern rmr_mbuf_t* rmr_call(void* vctx, rmr_mbuf_t* msg)
570 ptr_mbuf: ctypes c_void_p
571 Pointer to an RMR message buffer
576 Pointer to rmr_mbuf structure
578 return _rmr_call(vctx, ptr_mbuf)
581 _rmr_bytes2meid = rmr_c_lib.rmr_bytes2meid
582 _rmr_bytes2meid.argtypes = [POINTER(rmr_mbuf_t), c_char_p, c_int]
583 _rmr_bytes2meid.restype = c_int
586 def rmr_set_meid(ptr_mbuf: POINTER(rmr_mbuf_t), byte_str: bytes) -> int:
588 Sets the managed entity field in the message and returns the number of bytes copied.
589 Refer to RMR C documentation for method::
591 extern int rmr_bytes2meid(rmr_mbuf_t* mbuf, unsigned char const* src, int len);
593 Caution: the meid length supported in an RMR message is 32 bytes, but C applications
594 expect this to be a nil terminated string and thus only 31 bytes are actually available.
596 Raises: exceptions.MeidSizeOutOfRang
600 ptr_mbuf: ctypes c_void_p
601 Pointer to an RMR message buffer
603 Managed entity ID value
608 number of bytes copied
610 max = _get_constants().get("RMR_MAX_MEID", 32)
611 if len(byte_str) >= max:
612 raise MeidSizeOutOfRange
614 return _rmr_bytes2meid(ptr_mbuf, byte_str, len(byte_str))
617 # CAUTION: Some of the C functions expect a mutable buffer to copy the bytes into;
618 # if there is a get_* function below, use it to set up and return the
621 # extern unsigned char* rmr_get_meid(rmr_mbuf_t* mbuf, unsigned char* dest);
622 # we don't provide direct access to this function (unless it is asked for) because it is not really useful to provide your own buffer.
623 # Rather, rmr_get_meid does this for you, and just returns the string.
624 _rmr_get_meid = rmr_c_lib.rmr_get_meid
625 _rmr_get_meid.argtypes = [POINTER(rmr_mbuf_t), c_char_p]
626 _rmr_get_meid.restype = c_char_p
629 def rmr_get_meid(ptr_mbuf: POINTER(rmr_mbuf_t)) -> bytes:
631 Gets the managed entity ID (meid) from the message header.
632 This is a python-friendly version of RMR C method::
634 extern unsigned char* rmr_get_meid(rmr_mbuf_t* mbuf, unsigned char* dest);
638 ptr_mbuf: ctypes c_void_p
639 Pointer to an RMR message buffer
646 sz = _get_constants().get("RMR_MAX_MEID", 32) # size for buffer to fill
647 buf = create_string_buffer(sz)
648 _rmr_get_meid(ptr_mbuf, buf)
652 _rmr_get_src = rmr_c_lib.rmr_get_src
653 _rmr_get_src.argtypes = [POINTER(rmr_mbuf_t), c_char_p]
654 _rmr_get_src.restype = c_char_p
657 def rmr_get_src(ptr_mbuf: POINTER(rmr_mbuf_t), dest: c_char_p) -> c_char_p:
659 Copies the message-source information to the buffer.
660 Refer to RMR C documentation for method::
662 extern unsigned char* rmr_get_src(rmr_mbuf_t* mbuf, unsigned char* dest);
666 ptr_mbuf: ctypes POINTER(rmr_mbuf_t)
667 Pointer to an RMR message buffer
668 dest: ctypes c_char_p
669 Pointer to a buffer to receive the message source
674 message-source information
676 return _rmr_get_src(ptr_mbuf, dest)
679 # Methods that exist ONLY in rmr-python, and are not wrapped methods
680 # In hindsight, I wish i put these in a separate module, but leaving this here to prevent api breakage.
683 def get_payload(ptr_mbuf: c_void_p) -> bytes:
685 Gets the binary payload from the rmr_buf_t*.
689 ptr_mbuf: ctypes c_void_p
690 Pointer to an rmr message buffer
697 # Logic came from the answer here: https://stackoverflow.com/questions/55103298/python-ctypes-read-pointerc-char-in-python
698 sz = ptr_mbuf.contents.len
699 CharArr = c_char * sz
700 return CharArr(*ptr_mbuf.contents.payload[:sz]).raw
703 def get_xaction(ptr_mbuf: c_void_p) -> bytes:
705 Gets the transaction ID from the rmr_buf_t*.
709 ptr_mbuf: ctypes c_void_p
710 Pointer to an rmr message buffer
717 val = cast(ptr_mbuf.contents.xaction, c_char_p).value
718 sz = _get_constants().get("RMR_MAX_XID", 0)
722 def message_summary(ptr_mbuf: c_void_p) -> dict:
724 Returns a dict with the fields of an RMR message.
728 ptr_mbuf: ctypes c_void_p
729 Pointer to an rmr message buffer
737 "payload": get_payload(ptr_mbuf) if ptr_mbuf.contents.state == RMR_OK else None,
738 "payload length": ptr_mbuf.contents.len,
739 "message type": ptr_mbuf.contents.mtype,
740 "subscription id": ptr_mbuf.contents.sub_id,
741 "transaction id": get_xaction(ptr_mbuf),
742 "message state": ptr_mbuf.contents.state,
743 "message status": _state_to_status(ptr_mbuf.contents.state),
744 "payload max size": rmr_payload_size(ptr_mbuf),
745 "meid": rmr_get_meid(ptr_mbuf),
746 "message source": get_src(ptr_mbuf),
747 "errno": ptr_mbuf.contents.tp_state,
751 def set_payload_and_length(byte_str: bytes, ptr_mbuf: c_void_p):
753 Sets an rmr payload and content length.
758 the bytes to set the payload to
759 ptr_mbuf: ctypes c_void_p
760 Pointer to an rmr message buffer
766 if rmr_payload_size(ptr_mbuf) < len(byte_str): # existing message payload too small
767 ptr_mbuf = rmr_realloc_payload(ptr_mbuf, len(byte_str), True)
769 memmove(ptr_mbuf.contents.payload, byte_str, len(byte_str))
770 ptr_mbuf.contents.len = len(byte_str)
773 def generate_and_set_transaction_id(ptr_mbuf: c_void_p):
775 Generates a UUID and sets the RMR transaction id to it
779 ptr_mbuf: ctypes c_void_p
780 Pointer to an rmr message buffer
782 set_transaction_id(ptr_mbuf, uuid.uuid1().hex.encode("utf-8"))
785 def set_transaction_id(ptr_mbuf: c_void_p, tid_bytes: bytes):
787 Sets an RMR transaction id
788 TODO: on next API break, merge these two functions. Not done now to preserve API.
792 ptr_mbuf: ctypes c_void_p
793 Pointer to an rmr message buffer
795 bytes of the desired transaction id
797 sz = _get_constants().get("RMR_MAX_XID", 0)
798 memmove(ptr_mbuf.contents.xaction, tid_bytes, sz)
801 def get_src(ptr_mbuf: c_void_p) -> str:
803 Gets the message source (likely host:port)
807 ptr_mbuf: ctypes c_void_p
808 Pointer to an rmr message buffer
815 sz = _get_constants().get("RMR_MAX_SRC", 64) # size to fill
816 buf = create_string_buffer(sz)
817 rmr_get_src(ptr_mbuf, buf)
818 return buf.value.decode()