1 # ==================================================================================
2 # Copyright (c) 2019-2020 Nokia
3 # Copyright (c) 2018-2020 AT&T Intellectual Property.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 # ==================================================================================
18 from ctypes import POINTER, Structure
19 from ctypes import c_int, c_char, c_char_p, c_void_p, memmove, cast, create_string_buffer
21 from ricxappframe.rmr.exceptions import BadBufferAllocation, MeidSizeOutOfRange, InitFailed
22 from ricxappframe.rmr.rmrclib.rmrclib import rmr_c_lib, get_constants, state_to_status
29 def _get_rmr_constant(key: str, default=None):
31 Gets the constant with the named key from the RMR C library.
32 Returns None if the value is not a simple type. This happens
33 during sphinx autodoc document generation, which mocks the
34 rmrclib package to work without the RMR shared object file,
35 and the response is something like this:
36 <class 'ricxappframe.rmr.rmrclib.rmrclib.get_constants.get'>
37 Workaround for https://github.com/sphinx-doc/sphinx/issues/7422
39 val = get_constants().get(key, default)
40 return val if isinstance(val, (type(None), bool, bytes, float, int, str)) else None
43 # argtypes and restype are important:
44 # https://stackoverflow.com/questions/24377845/ctype-why-specify-argtypes
45 def _wrap_rmr_function(funcname, restype, argtypes):
47 Simplify wrapping ctypes functions.
52 Name of library method
54 Name of ctypes class; e.g., c_char_p
56 List of ctypes classes; e.g., [ c_char_p, int ]
61 Pointer to C library function
63 func = rmr_c_lib.__getattr__(funcname)
64 func.restype = restype
65 func.argtypes = argtypes
73 # Publish constants from RMR C-language header files for use by importers of this library.
74 # TODO: Are there others that will be useful?
75 #: Typical size message to receive; size is not limited
76 RMR_MAX_RCV_BYTES = _get_rmr_constant('RMR_MAX_RCV_BYTES')
77 #: Multi-threaded initialization flag
78 RMRFL_MTCALL = _get_rmr_constant('RMRFL_MTCALL', 0x02) # initialization flags
80 RMRFL_NONE = _get_rmr_constant('RMRFL_NONE', 0x0)
81 #: State constant for OK
82 RMR_OK = _get_rmr_constant('RMR_OK', 0x00)
83 #: State constant for no endpoint based on msg type
84 RMR_ERR_NOENDPT = _get_rmr_constant('RMR_ERR_NOENDPT')
85 #: State constant for retry
86 RMR_ERR_RETRY = _get_rmr_constant('RMR_ERR_RETRY')
87 #: State constant for timeout
88 RMR_ERR_TIMEOUT = _get_rmr_constant('RMR_ERR_TIMEOUT')
90 # Publish keys used in the message summary dict as constants
92 # message payload, bytes
93 RMR_MS_PAYLOAD = "payload"
94 # payload length, integer
95 RMR_MS_PAYLOAD_LEN = "payload length"
96 # message type, integer
97 RMR_MS_MSG_TYPE = "message type"
98 # subscription ID, integer
99 RMR_MS_SUB_ID = "subscription id"
100 # transaction ID, bytes
101 RMR_MS_TRN_ID = "transaction id"
102 # state of message processing, integer; e.g., 0
103 RMR_MS_MSG_STATE = "message state"
104 # state of message processing converted to string; e.g., RMR_OK
105 RMR_MS_MSG_STATUS = "message status"
106 # number of bytes usable in the payload, integer
107 RMR_MS_PAYLOAD_MAX = "payload max size"
108 # managed entity ID, bytes
110 # message source, string; e.g., host:port
111 RMR_MS_MSG_SOURCE = "message source"
112 # transport state, integer
113 RMR_MS_ERRNO = "errno"
116 class rmr_mbuf_t(Structure):
118 Mirrors public members of type rmr_mbuf_t from RMR header file src/common/include/rmr.h
121 | int state; // state of processing
122 | int mtype; // message type
123 | int len; // length of data in the payload (send or received)
124 | unsigned char* payload; // transported data
125 | unsigned char* xaction; // pointer to fixed length transaction id bytes
126 | int sub_id; // subscription id
127 | int tp_state; // transport state (a.k.a errno)
129 | these things are off limits to the user application
131 | void* tp_buf; // underlying transport allocated pointer
132 | void* header; // internal message header (whole buffer: header+payload)
133 | unsigned char* id; // if we need an ID in the message separate from the xaction id
134 | int flags; // various MFL (private) flags as needed
135 | int alloc_len; // the length of the allocated space (hdr+payload)
138 RE PAYLOADs type below, see the documentation for c_char_p:
139 class ctypes.c_char_p
140 Represents the C char * datatype when it points to a zero-terminated string.
141 For a general character pointer that may also point to binary data, POINTER(c_char)
142 must be used. The constructor accepts an integer address, or a bytes object.
149 ("payload", POINTER(c_char)), # according to the following the python bytes are already unsigned
150 # https://bytes.com/topic/python/answers/695078-ctypes-unsigned-char
151 ("xaction", POINTER(c_char)),
157 _rmr_init = _wrap_rmr_function('rmr_init', c_void_p, [c_char_p, c_int, c_int])
160 def rmr_init(uproto_port: c_char_p, max_msg_size: int, flags: int) -> c_void_p:
162 Prepares the environment for sending and receiving messages.
163 Refer to RMR C documentation for method::
165 extern void* rmr_init(char* uproto_port, int max_msg_size, int flags)
167 This function raises an exception if the returned context is None.
171 uproto_port: c_char_p
172 Pointer to bytes built from the port number as a string; e.g., b'4550'
173 max_msg_size: integer
174 Maximum message size to receive
181 Pointer to RMR context
183 mrc = _rmr_init(uproto_port, max_msg_size, flags)
189 _rmr_ready = _wrap_rmr_function('rmr_ready', c_int, [c_void_p])
192 def rmr_ready(vctx: c_void_p) -> int:
194 Checks if a routing table has been received and installed.
195 Refer to RMR C documentation for method::
197 extern int rmr_ready(void* vctx)
201 vctx: ctypes c_void_p
202 Pointer to RMR context
208 return _rmr_ready(vctx)
211 _rmr_close = _wrap_rmr_function('rmr_close', None, [c_void_p])
214 def rmr_close(vctx: c_void_p):
216 Closes the listen socket.
217 Refer to RMR C documentation for method::
219 extern void rmr_close(void* vctx)
223 vctx: ctypes c_void_p
224 Pointer to RMR context
233 _rmr_set_stimeout = _wrap_rmr_function('rmr_set_stimeout', c_int, [c_void_p, c_int])
236 def rmr_set_stimeout(vctx: c_void_p, rloops: int) -> int:
238 Sets the configuration for how RMR will retry message send operations.
239 Refer to RMR C documentation for method::
241 extern int rmr_set_stimeout(void* vctx, int rloops)
245 vctx: ctypes c_void_p
246 Pointer to RMR context
248 Number of retry loops
252 0 on success, -1 on failure
254 return _rmr_set_stimeout(vctx, rloops)
257 _rmr_alloc_msg = _wrap_rmr_function('rmr_alloc_msg', POINTER(rmr_mbuf_t), [c_void_p, c_int])
260 def rmr_alloc_msg(vctx: c_void_p, size: int,
261 payload=None, gen_transaction_id=False, mtype=None,
262 meid=None, sub_id=None, fixed_transaction_id=None):
264 Allocates and returns a buffer to write and send through the RMR library.
265 Refer to RMR C documentation for method::
267 extern rmr_mbuf_t* rmr_alloc_msg(void* vctx, int size)
269 Optionally populates the message from the remaining arguments.
271 TODO: on next API break, clean up transaction_id ugliness. Kept for now to preserve API.
275 vctx: ctypes c_void_p
276 Pointer to RMR context
278 How much space to allocate
280 if not None, attempts to set the payload
281 gen_transaction_id: bool
282 if True, generates and sets a transaction ID.
283 Note, option fixed_transaction_id overrides this option
285 if not None, sets the sbuf's message type
287 if not None, sets the sbuf's meid
289 if not None, sets the sbuf's subscription id
290 fixed_transaction_id: bytes
291 if not None, used as the transaction ID.
292 Note, this overrides the option gen_transaction_id
297 Pointer to rmr_mbuf structure
299 sbuf = _rmr_alloc_msg(vctx, size)
301 # make sure the alloc worked
304 # set specified fields
306 set_payload_and_length(payload, sbuf)
308 if fixed_transaction_id:
309 set_transaction_id(sbuf, fixed_transaction_id)
310 elif gen_transaction_id:
311 generate_and_set_transaction_id(sbuf)
314 sbuf.contents.mtype = mtype
317 rmr_set_meid(sbuf, meid)
320 sbuf.contents.sub_id = sub_id
325 raise BadBufferAllocation
328 _rmr_realloc_payload = _wrap_rmr_function('rmr_realloc_payload', POINTER(rmr_mbuf_t), [POINTER(rmr_mbuf_t), c_int, c_int, c_int]) # new_len, copy, clone
331 def rmr_realloc_payload(ptr_mbuf: c_void_p, new_len: int, copy=False, clone=False):
333 Allocates and returns a message buffer large enough for the new length.
334 Refer to RMR C documentation for method::
336 extern rmr_mbuf_t* rmr_realloc_payload(rmr_mbuf_t*, int, int, int)
341 Pointer to rmr_mbuf structure
345 Whether to copy the original paylod
347 Whether to clone the original buffer
352 Pointer to rmr_mbuf structure
354 return _rmr_realloc_payload(ptr_mbuf, new_len, copy, clone)
357 _rmr_free_msg = _wrap_rmr_function('rmr_free_msg', None, [POINTER(rmr_mbuf_t)])
360 def rmr_free_msg(ptr_mbuf: c_void_p):
362 Releases the message buffer.
363 Refer to RMR C documentation for method::
365 extern void rmr_free_msg(rmr_mbuf_t* mbuf )
370 Pointer to rmr_mbuf structure
376 if ptr_mbuf is not None:
377 _rmr_free_msg(ptr_mbuf)
380 _rmr_payload_size = _wrap_rmr_function('rmr_payload_size', c_int, [POINTER(rmr_mbuf_t)])
383 def rmr_payload_size(ptr_mbuf: c_void_p) -> int:
385 Gets the number of bytes available in the payload.
386 Refer to RMR C documentation for method::
388 extern int rmr_payload_size(rmr_mbuf_t* msg)
393 Pointer to rmr_mbuf structure
398 Number of bytes available
400 return _rmr_payload_size(ptr_mbuf)
404 The following functions all seem to have the same interface
407 _rmr_send_msg = _wrap_rmr_function('rmr_send_msg', POINTER(rmr_mbuf_t), [c_void_p, POINTER(rmr_mbuf_t)])
410 def rmr_send_msg(vctx: c_void_p, ptr_mbuf: POINTER(rmr_mbuf_t)) -> POINTER(rmr_mbuf_t):
412 Sends the message according to the routing table and returns an empty buffer.
413 Refer to RMR C documentation for method::
415 extern rmr_mbuf_t* rmr_send_msg(void* vctx, rmr_mbuf_t* msg)
419 vctx: ctypes c_void_p
420 Pointer to RMR context
422 Pointer to rmr_mbuf structure
427 Pointer to rmr_mbuf structure
429 return _rmr_send_msg(vctx, ptr_mbuf)
432 # TODO: the old message (Send param) is actually optional, but I don't know how to specify that in Ctypes.
433 _rmr_rcv_msg = _wrap_rmr_function('rmr_rcv_msg', POINTER(rmr_mbuf_t), [c_void_p, POINTER(rmr_mbuf_t)])
436 def rmr_rcv_msg(vctx: c_void_p, ptr_mbuf: POINTER(rmr_mbuf_t)) -> POINTER(rmr_mbuf_t):
438 Waits for a message to arrive, and returns it.
439 Refer to RMR C documentation for method::
441 extern rmr_mbuf_t* rmr_rcv_msg(void* vctx, rmr_mbuf_t* old_msg)
445 vctx: ctypes c_void_p
446 Pointer to RMR context
448 Pointer to rmr_mbuf structure
453 Pointer to rmr_mbuf structure
455 return _rmr_rcv_msg(vctx, ptr_mbuf)
458 _rmr_torcv_msg = _wrap_rmr_function('rmr_torcv_msg', POINTER(rmr_mbuf_t), [c_void_p, POINTER(rmr_mbuf_t), c_int])
461 def rmr_torcv_msg(vctx: c_void_p, ptr_mbuf: POINTER(rmr_mbuf_t), ms_to: int) -> POINTER(rmr_mbuf_t):
463 Waits up to the timeout value for a message to arrive, and returns it.
464 Refer to RMR C documentation for method::
466 extern rmr_mbuf_t* rmr_torcv_msg(void* vctx, rmr_mbuf_t* old_msg, int ms_to)
470 vctx: ctypes c_void_p
471 Pointer to RMR context
473 Pointer to rmr_mbuf structure
475 Time out value in milliseconds
480 Pointer to rmr_mbuf structure
482 return _rmr_torcv_msg(vctx, ptr_mbuf, ms_to)
485 _rmr_rts_msg = _wrap_rmr_function('rmr_rts_msg', POINTER(rmr_mbuf_t), [c_void_p, POINTER(rmr_mbuf_t)])
488 def rmr_rts_msg(vctx: c_void_p, ptr_mbuf: POINTER(rmr_mbuf_t), payload=None, mtype=None) -> POINTER(rmr_mbuf_t):
490 Sends a message to the originating endpoint and returns an empty buffer.
491 Refer to RMR C documentation for method::
493 extern rmr_mbuf_t* rmr_rts_msg(void* vctx, rmr_mbuf_t* msg)
495 additional features beyond c-rmr:
496 if payload is not None, attempts to set the payload
497 if mtype is not None, sets the sbuf's message type
501 vctx: ctypes c_void_p
502 Pointer to an RMR context
503 ptr_mbuf: ctypes c_void_p
504 Pointer to an RMR message buffer
513 Pointer to rmr_mbuf structure
517 set_payload_and_length(payload, ptr_mbuf)
520 ptr_mbuf.contents.mtype = mtype
522 return _rmr_rts_msg(vctx, ptr_mbuf)
525 _rmr_call = _wrap_rmr_function('rmr_call', POINTER(rmr_mbuf_t), [c_void_p, POINTER(rmr_mbuf_t)])
528 def rmr_call(vctx: c_void_p, ptr_mbuf: POINTER(rmr_mbuf_t)) -> POINTER(rmr_mbuf_t):
530 Sends a message, waits for a response and returns it.
531 Refer to RMR C documentation for method::
533 extern rmr_mbuf_t* rmr_call(void* vctx, rmr_mbuf_t* msg)
537 ptr_mbuf: ctypes c_void_p
538 Pointer to an RMR message buffer
543 Pointer to rmr_mbuf structure
545 return _rmr_call(vctx, ptr_mbuf)
548 _rmr_bytes2meid = _wrap_rmr_function('rmr_bytes2meid', c_int, [POINTER(rmr_mbuf_t), c_char_p, c_int])
551 def rmr_set_meid(ptr_mbuf: POINTER(rmr_mbuf_t), byte_str: bytes) -> int:
553 Sets the managed entity field in the message and returns the number of bytes copied.
554 Refer to RMR C documentation for method::
556 extern int rmr_bytes2meid(rmr_mbuf_t* mbuf, unsigned char const* src, int len);
558 Caution: the meid length supported in an RMR message is 32 bytes, but C applications
559 expect this to be a nil terminated string and thus only 31 bytes are actually available.
561 Raises: exceptions.MeidSizeOutOfRang
565 ptr_mbuf: ctypes c_void_p
566 Pointer to an RMR message buffer
568 Managed entity ID value
573 number of bytes copied
575 max = _get_rmr_constant("RMR_MAX_MEID", 32)
576 if len(byte_str) >= max:
577 raise MeidSizeOutOfRange
579 return _rmr_bytes2meid(ptr_mbuf, byte_str, len(byte_str))
582 # CAUTION: Some of the C functions expect a mutable buffer to copy the bytes into;
583 # if there is a get_* function below, use it to set up and return the
586 # extern unsigned char* rmr_get_meid(rmr_mbuf_t* mbuf, unsigned char* dest);
587 # we don't provide direct access to this function (unless it is asked for) because it is not really useful to provide your own buffer.
588 # Rather, rmr_get_meid does this for you, and just returns the string.
589 _rmr_get_meid = _wrap_rmr_function('rmr_get_meid', c_char_p, [POINTER(rmr_mbuf_t), c_char_p])
592 def rmr_get_meid(ptr_mbuf: POINTER(rmr_mbuf_t)) -> bytes:
594 Gets the managed entity ID (meid) from the message header.
595 This is a python-friendly version of RMR C method::
597 extern unsigned char* rmr_get_meid(rmr_mbuf_t* mbuf, unsigned char* dest);
601 ptr_mbuf: ctypes c_void_p
602 Pointer to an RMR message buffer
609 sz = _get_rmr_constant("RMR_MAX_MEID", 32) # size for buffer to fill
610 buf = create_string_buffer(sz)
611 _rmr_get_meid(ptr_mbuf, buf)
615 _rmr_get_src = _wrap_rmr_function('rmr_get_src', c_char_p, [POINTER(rmr_mbuf_t), c_char_p])
618 def rmr_get_src(ptr_mbuf: POINTER(rmr_mbuf_t), dest: c_char_p) -> c_char_p:
620 Copies the message-source information to the buffer.
621 Refer to RMR C documentation for method::
623 extern unsigned char* rmr_get_src(rmr_mbuf_t* mbuf, unsigned char* dest);
627 ptr_mbuf: ctypes POINTER(rmr_mbuf_t)
628 Pointer to an RMR message buffer
629 dest: ctypes c_char_p
630 Pointer to a buffer to receive the message source
635 message-source information
637 return _rmr_get_src(ptr_mbuf, dest)
640 _rmr_set_vlevel = _wrap_rmr_function('rmr_set_vlevel', None, [c_int])
643 def rmr_set_vlevel(new_level: c_int):
645 Sets the verbosity level which determines the messages RMR writes to standard error.
646 Refer to RMR C documentation for method::
648 void rmr_set_vlevel( int new_level )
653 New logging verbosity level, an integer in the range 0 (none) to 5 (debug).
655 _rmr_set_vlevel(new_level)
658 # Methods that exist ONLY in rmr-python, and are not wrapped methods
659 # In hindsight, I wish i put these in a separate module, but leaving this here to prevent api breakage.
662 def get_payload(ptr_mbuf: c_void_p) -> bytes:
664 Gets the binary payload from the rmr_buf_t*.
668 ptr_mbuf: ctypes c_void_p
669 Pointer to an rmr message buffer
676 # Logic came from the answer here: https://stackoverflow.com/questions/55103298/python-ctypes-read-pointerc-char-in-python
677 sz = ptr_mbuf.contents.len
678 CharArr = c_char * sz
679 return CharArr(*ptr_mbuf.contents.payload[:sz]).raw
682 def get_xaction(ptr_mbuf: c_void_p) -> bytes:
684 Gets the transaction ID from the rmr_buf_t*.
688 ptr_mbuf: ctypes c_void_p
689 Pointer to an rmr message buffer
696 val = cast(ptr_mbuf.contents.xaction, c_char_p).value
697 sz = _get_rmr_constant("RMR_MAX_XID", 0)
701 def message_summary(ptr_mbuf: c_void_p) -> dict:
703 Builds a dict with the contents of an RMR message.
707 ptr_mbuf: ctypes c_void_p
708 Pointer to an RMR message buffer
713 Message content as key-value pairs; keys are defined as RMR_MS_* constants.
716 RMR_MS_PAYLOAD: get_payload(ptr_mbuf) if ptr_mbuf.contents.state == RMR_OK else None,
717 RMR_MS_PAYLOAD_LEN: ptr_mbuf.contents.len,
718 RMR_MS_MSG_TYPE: ptr_mbuf.contents.mtype,
719 RMR_MS_SUB_ID: ptr_mbuf.contents.sub_id,
720 RMR_MS_TRN_ID: get_xaction(ptr_mbuf),
721 RMR_MS_MSG_STATE: ptr_mbuf.contents.state,
722 RMR_MS_MSG_STATUS: state_to_status(ptr_mbuf.contents.state),
723 RMR_MS_PAYLOAD_MAX: rmr_payload_size(ptr_mbuf),
724 RMR_MS_MEID: rmr_get_meid(ptr_mbuf),
725 RMR_MS_MSG_SOURCE: get_src(ptr_mbuf),
726 RMR_MS_ERRNO: ptr_mbuf.contents.tp_state,
730 def set_payload_and_length(byte_str: bytes, ptr_mbuf: c_void_p):
732 Sets an rmr payload and content length.
737 the bytes to set the payload to
738 ptr_mbuf: ctypes c_void_p
739 Pointer to an rmr message buffer
745 if rmr_payload_size(ptr_mbuf) < len(byte_str): # existing message payload too small
746 ptr_mbuf = rmr_realloc_payload(ptr_mbuf, len(byte_str), True)
748 memmove(ptr_mbuf.contents.payload, byte_str, len(byte_str))
749 ptr_mbuf.contents.len = len(byte_str)
752 def generate_and_set_transaction_id(ptr_mbuf: c_void_p):
754 Generates a UUID and sets the RMR transaction id to it
758 ptr_mbuf: ctypes c_void_p
759 Pointer to an rmr message buffer
761 set_transaction_id(ptr_mbuf, uuid.uuid1().hex.encode("utf-8"))
764 def set_transaction_id(ptr_mbuf: c_void_p, tid_bytes: bytes):
766 Sets an RMR transaction id
767 TODO: on next API break, merge these two functions. Not done now to preserve API.
771 ptr_mbuf: ctypes c_void_p
772 Pointer to an rmr message buffer
774 bytes of the desired transaction id
776 sz = _get_rmr_constant("RMR_MAX_XID", 0)
777 memmove(ptr_mbuf.contents.xaction, tid_bytes, sz)
780 def get_src(ptr_mbuf: c_void_p) -> str:
782 Gets the message source (likely host:port)
786 ptr_mbuf: ctypes c_void_p
787 Pointer to an rmr message buffer
794 sz = _get_rmr_constant("RMR_MAX_SRC", 64) # size to fill
795 buf = create_string_buffer(sz)
796 rmr_get_src(ptr_mbuf, buf)
797 return buf.value.decode()