1 # ==================================================================================
2 # Copyright (c) 2019-2020 Nokia
3 # Copyright (c) 2018-2020 AT&T Intellectual Property.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 # ==================================================================================
18 from ctypes import POINTER, Structure
19 from ctypes import c_int, c_char, c_char_p, c_void_p, memmove, cast, create_string_buffer
21 from ricxappframe.rmr.exceptions import BadBufferAllocation, MeidSizeOutOfRange, InitFailed
22 from ricxappframe.rmr.rmrclib.rmrclib import rmr_c_lib, get_constants, state_to_status
29 def _get_rmr_constant(key: str, default=None):
31 Gets the constant with the named key from the RMR C library.
32 Returns None if the value is not a simple type. This happens
33 during sphinx autodoc document generation, which mocks the
34 rmrclib package to work without the RMR shared object file,
35 and the response is something like this:
36 <class 'ricxappframe.rmr.rmrclib.rmrclib.get_constants.get'>
38 val = get_constants().get(key, default)
39 return val if isinstance(val, (type(None), bool, bytes, float, int, str)) else None
46 # Publish constants from RMR C-language header files for use by importers of this library.
47 # TODO: Are there others that will be useful?
48 #: Typical size message to receive; size is not limited
49 RMR_MAX_RCV_BYTES = _get_rmr_constant('RMR_MAX_RCV_BYTES')
50 #: Multi-threaded initialization flag
51 RMRFL_MTCALL = _get_rmr_constant('RMRFL_MTCALL', 0x02) # initialization flags
53 RMRFL_NONE = _get_rmr_constant('RMRFL_NONE', 0x0)
54 #: State constant for OK
55 RMR_OK = _get_rmr_constant('RMR_OK', 0x00)
56 #: State constant for timeout
57 RMR_ERR_TIMEOUT = _get_rmr_constant('RMR_ERR_TIMEOUT')
58 #: State constant for retry
59 RMR_ERR_RETRY = _get_rmr_constant('RMR_ERR_RETRY')
62 class rmr_mbuf_t(Structure):
64 Mirrors public members of type rmr_mbuf_t from RMR header file src/common/include/rmr.h
67 | int state; // state of processing
68 | int mtype; // message type
69 | int len; // length of data in the payload (send or received)
70 | unsigned char* payload; // transported data
71 | unsigned char* xaction; // pointer to fixed length transaction id bytes
72 | int sub_id; // subscription id
73 | int tp_state; // transport state (a.k.a errno)
75 | these things are off limits to the user application
77 | void* tp_buf; // underlying transport allocated pointer (e.g. nng message)
78 | void* header; // internal message header (whole buffer: header+payload)
79 | unsigned char* id; // if we need an ID in the message separate from the xaction id
80 | int flags; // various MFL (private) flags as needed
81 | int alloc_len; // the length of the allocated space (hdr+payload)
84 RE PAYLOADs type below, see the documentation for c_char_p:
86 Represents the C char * datatype when it points to a zero-terminated string.
87 For a general character pointer that may also point to binary data, POINTER(c_char)
88 must be used. The constructor accepts an integer address, or a bytes object.
95 ("payload", POINTER(c_char)), # according to the following the python bytes are already unsigned
96 # https://bytes.com/topic/python/answers/695078-ctypes-unsigned-char
97 ("xaction", POINTER(c_char)),
103 # argtypes and restype are important: https://stackoverflow.com/questions/24377845/ctype-why-specify-argtypes
104 _rmr_init = rmr_c_lib.rmr_init
105 _rmr_init.argtypes = [c_char_p, c_int, c_int]
106 _rmr_init.restype = c_void_p
109 def rmr_init(uproto_port: c_char_p, max_msg_size: int, flags: int) -> c_void_p:
111 Prepares the environment for sending and receiving messages.
112 Refer to RMR C documentation for method::
114 extern void* rmr_init(char* uproto_port, int max_msg_size, int flags)
116 This function raises an exception if the returned context is None.
120 uproto_port: c_char_p
121 Pointer to bytes built from the port number as a string; e.g., b'4550'
122 max_msg_size: integer
123 Maximum message size to receive
130 Pointer to RMR context
132 mrc = _rmr_init(uproto_port, max_msg_size, flags)
138 _rmr_ready = rmr_c_lib.rmr_ready
139 _rmr_ready.argtypes = [c_void_p]
140 _rmr_ready.restype = c_int
143 def rmr_ready(vctx: c_void_p) -> int:
145 Checks if a routing table has been received and installed.
146 Refer to RMR C documentation for method::
148 extern int rmr_ready(void* vctx)
152 vctx: ctypes c_void_p
153 Pointer to RMR context
159 return _rmr_ready(vctx)
162 _rmr_close = rmr_c_lib.rmr_close
163 _rmr_close.argtypes = [c_void_p]
166 def rmr_close(vctx: c_void_p):
168 Closes the listen socket.
169 Refer to RMR C documentation for method::
171 extern void rmr_close(void* vctx)
175 vctx: ctypes c_void_p
176 Pointer to RMR context
185 _rmr_set_stimeout = rmr_c_lib.rmr_set_stimeout
186 _rmr_set_stimeout.argtypes = [c_void_p, c_int]
187 _rmr_set_stimeout.restype = c_int
190 def rmr_set_stimeout(vctx: c_void_p, rloops: int) -> int:
192 Sets the configuration for how RMR will retry message send operations.
193 Refer to RMR C documentation for method::
195 extern int rmr_set_stimeout(void* vctx, int rloops)
199 vctx: ctypes c_void_p
200 Pointer to RMR context
202 Number of retry loops
206 0 on success, -1 on failure
208 return _rmr_set_stimeout(vctx, rloops)
211 _rmr_alloc_msg = rmr_c_lib.rmr_alloc_msg
212 _rmr_alloc_msg.argtypes = [c_void_p, c_int]
213 _rmr_alloc_msg.restype = POINTER(rmr_mbuf_t)
216 def rmr_alloc_msg(vctx: c_void_p, size: int,
217 payload=None, gen_transaction_id=False, mtype=None,
218 meid=None, sub_id=None, fixed_transaction_id=None):
220 Allocates and returns a buffer to write and send through the RMR library.
221 Refer to RMR C documentation for method::
223 extern rmr_mbuf_t* rmr_alloc_msg(void* vctx, int size)
225 Optionally populates the message from the remaining arguments.
227 TODO: on next API break, clean up transaction_id ugliness. Kept for now to preserve API.
231 vctx: ctypes c_void_p
232 Pointer to RMR context
234 How much space to allocate
236 if not None, attempts to set the payload
237 gen_transaction_id: bool
238 if True, generates and sets a transaction ID.
239 Note, option fixed_transaction_id overrides this option
241 if not None, sets the sbuf's message type
243 if not None, sets the sbuf's meid
245 if not None, sets the sbuf's subscription id
246 fixed_transaction_id: bytes
247 if not None, used as the transaction ID.
248 Note, this overrides the option gen_transaction_id
253 Pointer to rmr_mbuf structure
255 sbuf = _rmr_alloc_msg(vctx, size)
257 # make sure the alloc worked
260 # set specified fields
262 set_payload_and_length(payload, sbuf)
264 if fixed_transaction_id:
265 set_transaction_id(sbuf, fixed_transaction_id)
266 elif gen_transaction_id:
267 generate_and_set_transaction_id(sbuf)
270 sbuf.contents.mtype = mtype
273 rmr_set_meid(sbuf, meid)
276 sbuf.contents.sub_id = sub_id
281 raise BadBufferAllocation
284 _rmr_realloc_payload = rmr_c_lib.rmr_realloc_payload
285 _rmr_realloc_payload.argtypes = [POINTER(rmr_mbuf_t), c_int, c_int, c_int] # new_len, copy, clone
286 _rmr_realloc_payload.restype = POINTER(rmr_mbuf_t)
289 def rmr_realloc_payload(ptr_mbuf: c_void_p, new_len: int, copy=False, clone=False):
291 Allocates and returns a message buffer large enough for the new length.
292 Refer to RMR C documentation for method::
294 extern rmr_mbuf_t* rmr_realloc_payload(rmr_mbuf_t*, int, int, int)
299 Pointer to rmr_mbuf structure
303 Whether to copy the original paylod
305 Whether to clone the original buffer
310 Pointer to rmr_mbuf structure
312 return _rmr_realloc_payload(ptr_mbuf, new_len, copy, clone)
315 _rmr_free_msg = rmr_c_lib.rmr_free_msg
316 _rmr_free_msg.argtypes = [POINTER(rmr_mbuf_t)]
317 _rmr_free_msg.restype = None
320 def rmr_free_msg(ptr_mbuf: c_void_p):
322 Releases the message buffer.
323 Refer to RMR C documentation for method::
325 extern void rmr_free_msg(rmr_mbuf_t* mbuf )
330 Pointer to rmr_mbuf structure
336 if ptr_mbuf is not None:
337 _rmr_free_msg(ptr_mbuf)
340 _rmr_payload_size = rmr_c_lib.rmr_payload_size
341 _rmr_payload_size.argtypes = [POINTER(rmr_mbuf_t)]
342 _rmr_payload_size.restype = c_int
345 def rmr_payload_size(ptr_mbuf: c_void_p) -> int:
347 Gets the number of bytes available in the payload.
348 Refer to RMR C documentation for method::
350 extern int rmr_payload_size(rmr_mbuf_t* msg)
355 Pointer to rmr_mbuf structure
360 Number of bytes available
362 return _rmr_payload_size(ptr_mbuf)
366 The following functions all seem to have the same interface
369 _rmr_send_msg = rmr_c_lib.rmr_send_msg
370 _rmr_send_msg.argtypes = [c_void_p, POINTER(rmr_mbuf_t)]
371 _rmr_send_msg.restype = POINTER(rmr_mbuf_t)
374 def rmr_send_msg(vctx: c_void_p, ptr_mbuf: POINTER(rmr_mbuf_t)) -> POINTER(rmr_mbuf_t):
376 Sends the message according to the routing table and returns an empty buffer.
377 Refer to RMR C documentation for method::
379 extern rmr_mbuf_t* rmr_send_msg(void* vctx, rmr_mbuf_t* msg)
383 vctx: ctypes c_void_p
384 Pointer to RMR context
386 Pointer to rmr_mbuf structure
391 Pointer to rmr_mbuf structure
393 return _rmr_send_msg(vctx, ptr_mbuf)
396 # TODO: the old message (Send param) is actually optional, but I don't know how to specify that in Ctypes.
397 _rmr_rcv_msg = rmr_c_lib.rmr_rcv_msg
398 _rmr_rcv_msg.argtypes = [c_void_p, POINTER(rmr_mbuf_t)]
399 _rmr_rcv_msg.restype = POINTER(rmr_mbuf_t)
402 def rmr_rcv_msg(vctx: c_void_p, ptr_mbuf: POINTER(rmr_mbuf_t)) -> POINTER(rmr_mbuf_t):
404 Waits for a message to arrive, and returns it.
405 Refer to RMR C documentation for method::
407 extern rmr_mbuf_t* rmr_rcv_msg(void* vctx, rmr_mbuf_t* old_msg)
411 vctx: ctypes c_void_p
412 Pointer to RMR context
414 Pointer to rmr_mbuf structure
419 Pointer to rmr_mbuf structure
421 return _rmr_rcv_msg(vctx, ptr_mbuf)
424 _rmr_torcv_msg = rmr_c_lib.rmr_torcv_msg
425 _rmr_torcv_msg.argtypes = [c_void_p, POINTER(rmr_mbuf_t), c_int]
426 _rmr_torcv_msg.restype = POINTER(rmr_mbuf_t)
429 def rmr_torcv_msg(vctx: c_void_p, ptr_mbuf: POINTER(rmr_mbuf_t), ms_to: int) -> POINTER(rmr_mbuf_t):
431 Waits up to the timeout value for a message to arrive, and returns it.
432 Refer to RMR C documentation for method::
434 extern rmr_mbuf_t* rmr_torcv_msg(void* vctx, rmr_mbuf_t* old_msg, int ms_to)
438 vctx: ctypes c_void_p
439 Pointer to RMR context
441 Pointer to rmr_mbuf structure
443 Time out value in milliseconds
448 Pointer to rmr_mbuf structure
450 return _rmr_torcv_msg(vctx, ptr_mbuf, ms_to)
453 _rmr_rts_msg = rmr_c_lib.rmr_rts_msg
454 _rmr_rts_msg.argtypes = [c_void_p, POINTER(rmr_mbuf_t)]
455 _rmr_rts_msg.restype = POINTER(rmr_mbuf_t)
458 def rmr_rts_msg(vctx: c_void_p, ptr_mbuf: POINTER(rmr_mbuf_t), payload=None, mtype=None) -> POINTER(rmr_mbuf_t):
460 Sends a message to the originating endpoint and returns an empty buffer.
461 Refer to RMR C documentation for method::
463 extern rmr_mbuf_t* rmr_rts_msg(void* vctx, rmr_mbuf_t* msg)
465 additional features beyond c-rmr:
466 if payload is not None, attempts to set the payload
467 if mtype is not None, sets the sbuf's message type
471 vctx: ctypes c_void_p
472 Pointer to an RMR context
473 ptr_mbuf: ctypes c_void_p
474 Pointer to an RMR message buffer
483 Pointer to rmr_mbuf structure
487 set_payload_and_length(payload, ptr_mbuf)
490 ptr_mbuf.contents.mtype = mtype
492 return _rmr_rts_msg(vctx, ptr_mbuf)
495 _rmr_call = rmr_c_lib.rmr_call
496 _rmr_call.argtypes = [c_void_p, POINTER(rmr_mbuf_t)]
497 _rmr_call.restype = POINTER(rmr_mbuf_t)
500 def rmr_call(vctx: c_void_p, ptr_mbuf: POINTER(rmr_mbuf_t)) -> POINTER(rmr_mbuf_t):
502 Sends a message, waits for a response and returns it.
503 Refer to RMR C documentation for method::
505 extern rmr_mbuf_t* rmr_call(void* vctx, rmr_mbuf_t* msg)
509 ptr_mbuf: ctypes c_void_p
510 Pointer to an RMR message buffer
515 Pointer to rmr_mbuf structure
517 return _rmr_call(vctx, ptr_mbuf)
520 _rmr_bytes2meid = rmr_c_lib.rmr_bytes2meid
521 _rmr_bytes2meid.argtypes = [POINTER(rmr_mbuf_t), c_char_p, c_int]
522 _rmr_bytes2meid.restype = c_int
525 def rmr_set_meid(ptr_mbuf: POINTER(rmr_mbuf_t), byte_str: bytes) -> int:
527 Sets the managed entity field in the message and returns the number of bytes copied.
528 Refer to RMR C documentation for method::
530 extern int rmr_bytes2meid(rmr_mbuf_t* mbuf, unsigned char const* src, int len);
532 Caution: the meid length supported in an RMR message is 32 bytes, but C applications
533 expect this to be a nil terminated string and thus only 31 bytes are actually available.
535 Raises: exceptions.MeidSizeOutOfRang
539 ptr_mbuf: ctypes c_void_p
540 Pointer to an RMR message buffer
542 Managed entity ID value
547 number of bytes copied
549 max = _get_rmr_constant("RMR_MAX_MEID", 32)
550 if len(byte_str) >= max:
551 raise MeidSizeOutOfRange
553 return _rmr_bytes2meid(ptr_mbuf, byte_str, len(byte_str))
556 # CAUTION: Some of the C functions expect a mutable buffer to copy the bytes into;
557 # if there is a get_* function below, use it to set up and return the
560 # extern unsigned char* rmr_get_meid(rmr_mbuf_t* mbuf, unsigned char* dest);
561 # we don't provide direct access to this function (unless it is asked for) because it is not really useful to provide your own buffer.
562 # Rather, rmr_get_meid does this for you, and just returns the string.
563 _rmr_get_meid = rmr_c_lib.rmr_get_meid
564 _rmr_get_meid.argtypes = [POINTER(rmr_mbuf_t), c_char_p]
565 _rmr_get_meid.restype = c_char_p
568 def rmr_get_meid(ptr_mbuf: POINTER(rmr_mbuf_t)) -> bytes:
570 Gets the managed entity ID (meid) from the message header.
571 This is a python-friendly version of RMR C method::
573 extern unsigned char* rmr_get_meid(rmr_mbuf_t* mbuf, unsigned char* dest);
577 ptr_mbuf: ctypes c_void_p
578 Pointer to an RMR message buffer
585 sz = _get_rmr_constant("RMR_MAX_MEID", 32) # size for buffer to fill
586 buf = create_string_buffer(sz)
587 _rmr_get_meid(ptr_mbuf, buf)
591 _rmr_get_src = rmr_c_lib.rmr_get_src
592 _rmr_get_src.argtypes = [POINTER(rmr_mbuf_t), c_char_p]
593 _rmr_get_src.restype = c_char_p
596 def rmr_get_src(ptr_mbuf: POINTER(rmr_mbuf_t), dest: c_char_p) -> c_char_p:
598 Copies the message-source information to the buffer.
599 Refer to RMR C documentation for method::
601 extern unsigned char* rmr_get_src(rmr_mbuf_t* mbuf, unsigned char* dest);
605 ptr_mbuf: ctypes POINTER(rmr_mbuf_t)
606 Pointer to an RMR message buffer
607 dest: ctypes c_char_p
608 Pointer to a buffer to receive the message source
613 message-source information
615 return _rmr_get_src(ptr_mbuf, dest)
618 # Methods that exist ONLY in rmr-python, and are not wrapped methods
619 # In hindsight, I wish i put these in a separate module, but leaving this here to prevent api breakage.
622 def get_payload(ptr_mbuf: c_void_p) -> bytes:
624 Gets the binary payload from the rmr_buf_t*.
628 ptr_mbuf: ctypes c_void_p
629 Pointer to an rmr message buffer
636 # Logic came from the answer here: https://stackoverflow.com/questions/55103298/python-ctypes-read-pointerc-char-in-python
637 sz = ptr_mbuf.contents.len
638 CharArr = c_char * sz
639 return CharArr(*ptr_mbuf.contents.payload[:sz]).raw
642 def get_xaction(ptr_mbuf: c_void_p) -> bytes:
644 Gets the transaction ID from the rmr_buf_t*.
648 ptr_mbuf: ctypes c_void_p
649 Pointer to an rmr message buffer
656 val = cast(ptr_mbuf.contents.xaction, c_char_p).value
657 sz = _get_rmr_constant("RMR_MAX_XID", 0)
661 def message_summary(ptr_mbuf: c_void_p) -> dict:
663 Returns a dict with the fields of an RMR message.
667 ptr_mbuf: ctypes c_void_p
668 Pointer to an rmr message buffer
676 "payload": get_payload(ptr_mbuf) if ptr_mbuf.contents.state == RMR_OK else None,
677 "payload length": ptr_mbuf.contents.len,
678 "message type": ptr_mbuf.contents.mtype,
679 "subscription id": ptr_mbuf.contents.sub_id,
680 "transaction id": get_xaction(ptr_mbuf),
681 "message state": ptr_mbuf.contents.state,
682 "message status": state_to_status(ptr_mbuf.contents.state),
683 "payload max size": rmr_payload_size(ptr_mbuf),
684 "meid": rmr_get_meid(ptr_mbuf),
685 "message source": get_src(ptr_mbuf),
686 "errno": ptr_mbuf.contents.tp_state,
690 def set_payload_and_length(byte_str: bytes, ptr_mbuf: c_void_p):
692 Sets an rmr payload and content length.
697 the bytes to set the payload to
698 ptr_mbuf: ctypes c_void_p
699 Pointer to an rmr message buffer
705 if rmr_payload_size(ptr_mbuf) < len(byte_str): # existing message payload too small
706 ptr_mbuf = rmr_realloc_payload(ptr_mbuf, len(byte_str), True)
708 memmove(ptr_mbuf.contents.payload, byte_str, len(byte_str))
709 ptr_mbuf.contents.len = len(byte_str)
712 def generate_and_set_transaction_id(ptr_mbuf: c_void_p):
714 Generates a UUID and sets the RMR transaction id to it
718 ptr_mbuf: ctypes c_void_p
719 Pointer to an rmr message buffer
721 set_transaction_id(ptr_mbuf, uuid.uuid1().hex.encode("utf-8"))
724 def set_transaction_id(ptr_mbuf: c_void_p, tid_bytes: bytes):
726 Sets an RMR transaction id
727 TODO: on next API break, merge these two functions. Not done now to preserve API.
731 ptr_mbuf: ctypes c_void_p
732 Pointer to an rmr message buffer
734 bytes of the desired transaction id
736 sz = _get_rmr_constant("RMR_MAX_XID", 0)
737 memmove(ptr_mbuf.contents.xaction, tid_bytes, sz)
740 def get_src(ptr_mbuf: c_void_p) -> str:
742 Gets the message source (likely host:port)
746 ptr_mbuf: ctypes c_void_p
747 Pointer to an rmr message buffer
754 sz = _get_rmr_constant("RMR_MAX_SRC", 64) # size to fill
755 buf = create_string_buffer(sz)
756 rmr_get_src(ptr_mbuf, buf)
757 return buf.value.decode()