1 # ==================================================================================
2 # Copyright (c) 2019-2020 Nokia
3 # Copyright (c) 2018-2020 AT&T Intellectual Property.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 # ==================================================================================
18 from ctypes import POINTER, Structure
19 from ctypes import c_int, c_char, c_char_p, c_void_p, memmove, cast, create_string_buffer
21 from ricxappframe.rmr.exceptions import BadBufferAllocation, MeidSizeOutOfRange, InitFailed
22 from ricxappframe.rmr.rmrclib.rmrclib import rmr_c_lib, get_constants, state_to_status
29 def _get_rmr_constant(key: str, default=None):
31 Gets the constant with the named key from the RMR C library.
32 Returns None if the value is not a simple type. This happens
33 during sphinx autodoc document generation, which mocks the
34 rmrclib package to work without the RMR shared object file.
36 val = get_constants().get(key, default)
37 return val if isinstance(val, (type(None), str, int, float, bool)) else None
44 # Publish constants from RMR C-language header files for use by importers of this library.
45 # TODO: Are there others that will be useful?
46 #: Typical size message to receive; size is not limited
47 RMR_MAX_RCV_BYTES = _get_rmr_constant('RMR_MAX_RCV_BYTES')
48 #: Multi-threaded initialization flag
49 RMRFL_MTCALL = _get_rmr_constant('RMRFL_MTCALL', 0x02) # initialization flags
51 RMRFL_NONE = _get_rmr_constant('RMRFL_NONE', 0x0)
52 #: State constant for OK
53 RMR_OK = _get_rmr_constant('RMR_OK', 0x00)
54 #: State constant for timeout
55 RMR_ERR_TIMEOUT = _get_rmr_constant('RMR_ERR_TIMEOUT')
56 #: State constant for retry
57 RMR_ERR_RETRY = _get_rmr_constant('RMR_ERR_RETRY')
60 class rmr_mbuf_t(Structure):
62 Mirrors public members of type rmr_mbuf_t from RMR header file src/common/include/rmr.h
65 | int state; // state of processing
66 | int mtype; // message type
67 | int len; // length of data in the payload (send or received)
68 | unsigned char* payload; // transported data
69 | unsigned char* xaction; // pointer to fixed length transaction id bytes
70 | int sub_id; // subscription id
71 | int tp_state; // transport state (a.k.a errno)
73 | these things are off limits to the user application
75 | void* tp_buf; // underlying transport allocated pointer (e.g. nng message)
76 | void* header; // internal message header (whole buffer: header+payload)
77 | unsigned char* id; // if we need an ID in the message separate from the xaction id
78 | int flags; // various MFL (private) flags as needed
79 | int alloc_len; // the length of the allocated space (hdr+payload)
82 RE PAYLOADs type below, see the documentation for c_char_p:
84 Represents the C char * datatype when it points to a zero-terminated string.
85 For a general character pointer that may also point to binary data, POINTER(c_char)
86 must be used. The constructor accepts an integer address, or a bytes object.
93 ("payload", POINTER(c_char)), # according to the following the python bytes are already unsigned
94 # https://bytes.com/topic/python/answers/695078-ctypes-unsigned-char
95 ("xaction", POINTER(c_char)),
101 # argtypes and restype are important: https://stackoverflow.com/questions/24377845/ctype-why-specify-argtypes
102 _rmr_init = rmr_c_lib.rmr_init
103 _rmr_init.argtypes = [c_char_p, c_int, c_int]
104 _rmr_init.restype = c_void_p
107 def rmr_init(uproto_port: c_char_p, max_msg_size: int, flags: int) -> c_void_p:
109 Prepares the environment for sending and receiving messages.
110 Refer to RMR C documentation for method::
112 extern void* rmr_init(char* uproto_port, int max_msg_size, int flags)
114 This function raises an exception if the returned context is None.
118 uproto_port: c_char_p
119 Pointer to bytes built from the port number as a string; e.g., b'4550'
120 max_msg_size: integer
121 Maximum message size to receive
128 Pointer to RMR context
130 mrc = _rmr_init(uproto_port, max_msg_size, flags)
136 _rmr_ready = rmr_c_lib.rmr_ready
137 _rmr_ready.argtypes = [c_void_p]
138 _rmr_ready.restype = c_int
141 def rmr_ready(vctx: c_void_p) -> int:
143 Checks if a routing table has been received and installed.
144 Refer to RMR C documentation for method::
146 extern int rmr_ready(void* vctx)
150 vctx: ctypes c_void_p
151 Pointer to RMR context
157 return _rmr_ready(vctx)
160 _rmr_close = rmr_c_lib.rmr_close
161 _rmr_close.argtypes = [c_void_p]
164 def rmr_close(vctx: c_void_p):
166 Closes the listen socket.
167 Refer to RMR C documentation for method::
169 extern void rmr_close(void* vctx)
173 vctx: ctypes c_void_p
174 Pointer to RMR context
183 _rmr_set_stimeout = rmr_c_lib.rmr_set_stimeout
184 _rmr_set_stimeout.argtypes = [c_void_p, c_int]
185 _rmr_set_stimeout.restype = c_int
188 def rmr_set_stimeout(vctx: c_void_p, rloops: int) -> int:
190 Sets the configuration for how RMR will retry message send operations.
191 Refer to RMR C documentation for method::
193 extern int rmr_set_stimeout(void* vctx, int rloops)
197 vctx: ctypes c_void_p
198 Pointer to RMR context
200 Number of retry loops
204 0 on success, -1 on failure
206 return _rmr_set_stimeout(vctx, rloops)
209 _rmr_alloc_msg = rmr_c_lib.rmr_alloc_msg
210 _rmr_alloc_msg.argtypes = [c_void_p, c_int]
211 _rmr_alloc_msg.restype = POINTER(rmr_mbuf_t)
214 def rmr_alloc_msg(vctx: c_void_p, size: int,
215 payload=None, gen_transaction_id=False, mtype=None,
216 meid=None, sub_id=None, fixed_transaction_id=None):
218 Allocates and returns a buffer to write and send through the RMR library.
219 Refer to RMR C documentation for method::
221 extern rmr_mbuf_t* rmr_alloc_msg(void* vctx, int size)
223 Optionally populates the message from the remaining arguments.
225 TODO: on next API break, clean up transaction_id ugliness. Kept for now to preserve API.
229 vctx: ctypes c_void_p
230 Pointer to RMR context
232 How much space to allocate
234 if not None, attempts to set the payload
235 gen_transaction_id: bool
236 if True, generates and sets a transaction ID.
237 Note, option fixed_transaction_id overrides this option
239 if not None, sets the sbuf's message type
241 if not None, sets the sbuf's meid
243 if not None, sets the sbuf's subscription id
244 fixed_transaction_id: bytes
245 if not None, used as the transaction ID.
246 Note, this overrides the option gen_transaction_id
251 Pointer to rmr_mbuf structure
253 sbuf = _rmr_alloc_msg(vctx, size)
255 # make sure the alloc worked
258 # set specified fields
260 set_payload_and_length(payload, sbuf)
262 if fixed_transaction_id:
263 set_transaction_id(sbuf, fixed_transaction_id)
264 elif gen_transaction_id:
265 generate_and_set_transaction_id(sbuf)
268 sbuf.contents.mtype = mtype
271 rmr_set_meid(sbuf, meid)
274 sbuf.contents.sub_id = sub_id
279 raise BadBufferAllocation
282 _rmr_realloc_payload = rmr_c_lib.rmr_realloc_payload
283 _rmr_realloc_payload.argtypes = [POINTER(rmr_mbuf_t), c_int, c_int, c_int] # new_len, copy, clone
284 _rmr_realloc_payload.restype = POINTER(rmr_mbuf_t)
287 def rmr_realloc_payload(ptr_mbuf: c_void_p, new_len: int, copy=False, clone=False):
289 Allocates and returns a message buffer large enough for the new length.
290 Refer to RMR C documentation for method::
292 extern rmr_mbuf_t* rmr_realloc_payload(rmr_mbuf_t*, int, int, int)
297 Pointer to rmr_mbuf structure
301 Whether to copy the original paylod
303 Whether to clone the original buffer
308 Pointer to rmr_mbuf structure
310 return _rmr_realloc_payload(ptr_mbuf, new_len, copy, clone)
313 _rmr_free_msg = rmr_c_lib.rmr_free_msg
314 _rmr_free_msg.argtypes = [POINTER(rmr_mbuf_t)]
315 _rmr_free_msg.restype = None
318 def rmr_free_msg(ptr_mbuf: c_void_p):
320 Releases the message buffer.
321 Refer to RMR C documentation for method::
323 extern void rmr_free_msg(rmr_mbuf_t* mbuf )
328 Pointer to rmr_mbuf structure
334 if ptr_mbuf is not None:
335 _rmr_free_msg(ptr_mbuf)
338 _rmr_payload_size = rmr_c_lib.rmr_payload_size
339 _rmr_payload_size.argtypes = [POINTER(rmr_mbuf_t)]
340 _rmr_payload_size.restype = c_int
343 def rmr_payload_size(ptr_mbuf: c_void_p) -> int:
345 Gets the number of bytes available in the payload.
346 Refer to RMR C documentation for method::
348 extern int rmr_payload_size(rmr_mbuf_t* msg)
353 Pointer to rmr_mbuf structure
358 Number of bytes available
360 return _rmr_payload_size(ptr_mbuf)
364 The following functions all seem to have the same interface
367 _rmr_send_msg = rmr_c_lib.rmr_send_msg
368 _rmr_send_msg.argtypes = [c_void_p, POINTER(rmr_mbuf_t)]
369 _rmr_send_msg.restype = POINTER(rmr_mbuf_t)
372 def rmr_send_msg(vctx: c_void_p, ptr_mbuf: POINTER(rmr_mbuf_t)) -> POINTER(rmr_mbuf_t):
374 Sends the message according to the routing table and returns an empty buffer.
375 Refer to RMR C documentation for method::
377 extern rmr_mbuf_t* rmr_send_msg(void* vctx, rmr_mbuf_t* msg)
381 vctx: ctypes c_void_p
382 Pointer to RMR context
384 Pointer to rmr_mbuf structure
389 Pointer to rmr_mbuf structure
391 return _rmr_send_msg(vctx, ptr_mbuf)
394 # TODO: the old message (Send param) is actually optional, but I don't know how to specify that in Ctypes.
395 _rmr_rcv_msg = rmr_c_lib.rmr_rcv_msg
396 _rmr_rcv_msg.argtypes = [c_void_p, POINTER(rmr_mbuf_t)]
397 _rmr_rcv_msg.restype = POINTER(rmr_mbuf_t)
400 def rmr_rcv_msg(vctx: c_void_p, ptr_mbuf: POINTER(rmr_mbuf_t)) -> POINTER(rmr_mbuf_t):
402 Waits for a message to arrive, and returns it.
403 Refer to RMR C documentation for method::
405 extern rmr_mbuf_t* rmr_rcv_msg(void* vctx, rmr_mbuf_t* old_msg)
409 vctx: ctypes c_void_p
410 Pointer to RMR context
412 Pointer to rmr_mbuf structure
417 Pointer to rmr_mbuf structure
419 return _rmr_rcv_msg(vctx, ptr_mbuf)
422 _rmr_torcv_msg = rmr_c_lib.rmr_torcv_msg
423 _rmr_torcv_msg.argtypes = [c_void_p, POINTER(rmr_mbuf_t), c_int]
424 _rmr_torcv_msg.restype = POINTER(rmr_mbuf_t)
427 def rmr_torcv_msg(vctx: c_void_p, ptr_mbuf: POINTER(rmr_mbuf_t), ms_to: int) -> POINTER(rmr_mbuf_t):
429 Waits up to the timeout value for a message to arrive, and returns it.
430 Refer to RMR C documentation for method::
432 extern rmr_mbuf_t* rmr_torcv_msg(void* vctx, rmr_mbuf_t* old_msg, int ms_to)
436 vctx: ctypes c_void_p
437 Pointer to RMR context
439 Pointer to rmr_mbuf structure
441 Time out value in milliseconds
446 Pointer to rmr_mbuf structure
448 return _rmr_torcv_msg(vctx, ptr_mbuf, ms_to)
451 _rmr_rts_msg = rmr_c_lib.rmr_rts_msg
452 _rmr_rts_msg.argtypes = [c_void_p, POINTER(rmr_mbuf_t)]
453 _rmr_rts_msg.restype = POINTER(rmr_mbuf_t)
456 def rmr_rts_msg(vctx: c_void_p, ptr_mbuf: POINTER(rmr_mbuf_t), payload=None, mtype=None) -> POINTER(rmr_mbuf_t):
458 Sends a message to the originating endpoint and returns an empty buffer.
459 Refer to RMR C documentation for method::
461 extern rmr_mbuf_t* rmr_rts_msg(void* vctx, rmr_mbuf_t* msg)
463 additional features beyond c-rmr:
464 if payload is not None, attempts to set the payload
465 if mtype is not None, sets the sbuf's message type
469 vctx: ctypes c_void_p
470 Pointer to an RMR context
471 ptr_mbuf: ctypes c_void_p
472 Pointer to an RMR message buffer
481 Pointer to rmr_mbuf structure
485 set_payload_and_length(payload, ptr_mbuf)
488 ptr_mbuf.contents.mtype = mtype
490 return _rmr_rts_msg(vctx, ptr_mbuf)
493 _rmr_call = rmr_c_lib.rmr_call
494 _rmr_call.argtypes = [c_void_p, POINTER(rmr_mbuf_t)]
495 _rmr_call.restype = POINTER(rmr_mbuf_t)
498 def rmr_call(vctx: c_void_p, ptr_mbuf: POINTER(rmr_mbuf_t)) -> POINTER(rmr_mbuf_t):
500 Sends a message, waits for a response and returns it.
501 Refer to RMR C documentation for method::
503 extern rmr_mbuf_t* rmr_call(void* vctx, rmr_mbuf_t* msg)
507 ptr_mbuf: ctypes c_void_p
508 Pointer to an RMR message buffer
513 Pointer to rmr_mbuf structure
515 return _rmr_call(vctx, ptr_mbuf)
518 _rmr_bytes2meid = rmr_c_lib.rmr_bytes2meid
519 _rmr_bytes2meid.argtypes = [POINTER(rmr_mbuf_t), c_char_p, c_int]
520 _rmr_bytes2meid.restype = c_int
523 def rmr_set_meid(ptr_mbuf: POINTER(rmr_mbuf_t), byte_str: bytes) -> int:
525 Sets the managed entity field in the message and returns the number of bytes copied.
526 Refer to RMR C documentation for method::
528 extern int rmr_bytes2meid(rmr_mbuf_t* mbuf, unsigned char const* src, int len);
530 Caution: the meid length supported in an RMR message is 32 bytes, but C applications
531 expect this to be a nil terminated string and thus only 31 bytes are actually available.
533 Raises: exceptions.MeidSizeOutOfRang
537 ptr_mbuf: ctypes c_void_p
538 Pointer to an RMR message buffer
540 Managed entity ID value
545 number of bytes copied
547 max = _get_rmr_constant("RMR_MAX_MEID", 32)
548 if len(byte_str) >= max:
549 raise MeidSizeOutOfRange
551 return _rmr_bytes2meid(ptr_mbuf, byte_str, len(byte_str))
554 # CAUTION: Some of the C functions expect a mutable buffer to copy the bytes into;
555 # if there is a get_* function below, use it to set up and return the
558 # extern unsigned char* rmr_get_meid(rmr_mbuf_t* mbuf, unsigned char* dest);
559 # we don't provide direct access to this function (unless it is asked for) because it is not really useful to provide your own buffer.
560 # Rather, rmr_get_meid does this for you, and just returns the string.
561 _rmr_get_meid = rmr_c_lib.rmr_get_meid
562 _rmr_get_meid.argtypes = [POINTER(rmr_mbuf_t), c_char_p]
563 _rmr_get_meid.restype = c_char_p
566 def rmr_get_meid(ptr_mbuf: POINTER(rmr_mbuf_t)) -> bytes:
568 Gets the managed entity ID (meid) from the message header.
569 This is a python-friendly version of RMR C method::
571 extern unsigned char* rmr_get_meid(rmr_mbuf_t* mbuf, unsigned char* dest);
575 ptr_mbuf: ctypes c_void_p
576 Pointer to an RMR message buffer
583 sz = _get_rmr_constant("RMR_MAX_MEID", 32) # size for buffer to fill
584 buf = create_string_buffer(sz)
585 _rmr_get_meid(ptr_mbuf, buf)
589 _rmr_get_src = rmr_c_lib.rmr_get_src
590 _rmr_get_src.argtypes = [POINTER(rmr_mbuf_t), c_char_p]
591 _rmr_get_src.restype = c_char_p
594 def rmr_get_src(ptr_mbuf: POINTER(rmr_mbuf_t), dest: c_char_p) -> c_char_p:
596 Copies the message-source information to the buffer.
597 Refer to RMR C documentation for method::
599 extern unsigned char* rmr_get_src(rmr_mbuf_t* mbuf, unsigned char* dest);
603 ptr_mbuf: ctypes POINTER(rmr_mbuf_t)
604 Pointer to an RMR message buffer
605 dest: ctypes c_char_p
606 Pointer to a buffer to receive the message source
611 message-source information
613 return _rmr_get_src(ptr_mbuf, dest)
616 # Methods that exist ONLY in rmr-python, and are not wrapped methods
617 # In hindsight, I wish i put these in a separate module, but leaving this here to prevent api breakage.
620 def get_payload(ptr_mbuf: c_void_p) -> bytes:
622 Gets the binary payload from the rmr_buf_t*.
626 ptr_mbuf: ctypes c_void_p
627 Pointer to an rmr message buffer
634 # Logic came from the answer here: https://stackoverflow.com/questions/55103298/python-ctypes-read-pointerc-char-in-python
635 sz = ptr_mbuf.contents.len
636 CharArr = c_char * sz
637 return CharArr(*ptr_mbuf.contents.payload[:sz]).raw
640 def get_xaction(ptr_mbuf: c_void_p) -> bytes:
642 Gets the transaction ID from the rmr_buf_t*.
646 ptr_mbuf: ctypes c_void_p
647 Pointer to an rmr message buffer
654 val = cast(ptr_mbuf.contents.xaction, c_char_p).value
655 sz = _get_rmr_constant("RMR_MAX_XID", 0)
659 def message_summary(ptr_mbuf: c_void_p) -> dict:
661 Returns a dict with the fields of an RMR message.
665 ptr_mbuf: ctypes c_void_p
666 Pointer to an rmr message buffer
674 "payload": get_payload(ptr_mbuf) if ptr_mbuf.contents.state == RMR_OK else None,
675 "payload length": ptr_mbuf.contents.len,
676 "message type": ptr_mbuf.contents.mtype,
677 "subscription id": ptr_mbuf.contents.sub_id,
678 "transaction id": get_xaction(ptr_mbuf),
679 "message state": ptr_mbuf.contents.state,
680 "message status": state_to_status(ptr_mbuf.contents.state),
681 "payload max size": rmr_payload_size(ptr_mbuf),
682 "meid": rmr_get_meid(ptr_mbuf),
683 "message source": get_src(ptr_mbuf),
684 "errno": ptr_mbuf.contents.tp_state,
688 def set_payload_and_length(byte_str: bytes, ptr_mbuf: c_void_p):
690 Sets an rmr payload and content length.
695 the bytes to set the payload to
696 ptr_mbuf: ctypes c_void_p
697 Pointer to an rmr message buffer
703 if rmr_payload_size(ptr_mbuf) < len(byte_str): # existing message payload too small
704 ptr_mbuf = rmr_realloc_payload(ptr_mbuf, len(byte_str), True)
706 memmove(ptr_mbuf.contents.payload, byte_str, len(byte_str))
707 ptr_mbuf.contents.len = len(byte_str)
710 def generate_and_set_transaction_id(ptr_mbuf: c_void_p):
712 Generates a UUID and sets the RMR transaction id to it
716 ptr_mbuf: ctypes c_void_p
717 Pointer to an rmr message buffer
719 set_transaction_id(ptr_mbuf, uuid.uuid1().hex.encode("utf-8"))
722 def set_transaction_id(ptr_mbuf: c_void_p, tid_bytes: bytes):
724 Sets an RMR transaction id
725 TODO: on next API break, merge these two functions. Not done now to preserve API.
729 ptr_mbuf: ctypes c_void_p
730 Pointer to an rmr message buffer
732 bytes of the desired transaction id
734 sz = _get_rmr_constant("RMR_MAX_XID", 0)
735 memmove(ptr_mbuf.contents.xaction, tid_bytes, sz)
738 def get_src(ptr_mbuf: c_void_p) -> str:
740 Gets the message source (likely host:port)
744 ptr_mbuf: ctypes c_void_p
745 Pointer to an rmr message buffer
752 sz = _get_rmr_constant("RMR_MAX_SRC", 64) # size to fill
753 buf = create_string_buffer(sz)
754 rmr_get_src(ptr_mbuf, buf)
755 return buf.value.decode()