1 // : vi ts=4 sw=4 noet :
3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
23 Abstract: This is the compile point for the nng version of the rmr
24 library (formarly known as uta, so internal function names
25 are likely still uta_*)
27 With the exception of the symtab portion of the library,
28 RMr is built with a single compile so as to "hide" the
29 internal functions as statics. Because they interdepend
30 on each other, and CMake has issues with generating two
31 different wormhole objects from a single source, we just
32 pull it all together with a centralised comple using
35 Future: the API functions at this point can be separated
36 into a common source module.
38 Author: E. Scott Daniels
52 #include <arpa/inet.h>
55 #include <nng/protocol/pubsub0/pub.h>
56 #include <nng/protocol/pubsub0/sub.h>
57 #include <nng/protocol/pipeline0/push.h>
58 #include <nng/protocol/pipeline0/pull.h>
61 #include "rmr.h" // things the users see
62 #include "rmr_agnostic.h" // agnostic things (must be included before private)
63 #include "rmr_nng_private.h" // things that we need too
64 #include "rmr_symtab.h"
66 #include "ring_static.c" // message ring support
67 #include "rt_generic_static.c" // route table things not transport specific
68 #include "rtable_nng_static.c" // route table things -- transport specific
69 #include "rtc_static.c" // route table collector
70 #include "tools_static.c"
71 #include "sr_nng_static.c" // send/receive static functions
72 #include "wormholes.c" // wormhole api externals and related static functions (must be LAST!)
75 //------------------------------------------------------------------------------
81 static void free_ctx( uta_ctx_t* ctx ) {
84 free( ctx->rtg_addr );
89 // --------------- public functions --------------------------------------------------------------------------
92 Returns the size of the payload (bytes) that the msg buffer references.
93 Len in a message is the number of bytes which were received, or should
94 be transmitted, however, it is possible that the mbuf was allocated
95 with a larger payload space than the payload length indicates; this
96 function returns the absolute maximum space that the user has available
97 in the payload. On error (bad msg buffer) -1 is returned and errno should
100 extern int rmr_payload_size( rmr_mbuf_t* msg ) {
101 if( msg == NULL || msg->header == NULL ) {
107 return msg->alloc_len - RMR_HDR_LEN( msg->header ); // allocated transport size less the header and other data bits
111 Allocates a send message as a zerocopy message allowing the underlying message protocol
112 to send the buffer without copy.
114 extern rmr_mbuf_t* rmr_alloc_msg( void* vctx, int size ) {
118 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
122 m = alloc_zcmsg( ctx, NULL, size, 0, DEF_TR_LEN ); // alloc with default trace data
128 Allocates a send message as a zerocopy message allowing the underlying message protocol
129 to send the buffer without copy. In addition, a trace data field of tr_size will be
130 added and the supplied data coppied to the buffer before returning the message to
133 extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned const char* data ) {
138 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
142 m = alloc_zcmsg( ctx, NULL, size, 0, tr_size ); // alloc with specific tr size
144 state = rmr_set_trace( m, data, tr_size ); // roll their data in
145 if( state != tr_size ) {
146 m->state = RMR_ERR_INITFAILED;
154 This provides an external path to the realloc static function as it's called by an
155 outward facing mbuf api function. Used to reallocate a message with a different
158 extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
159 return realloc_msg( msg, new_tr_size );
164 Return the message to the available pool, or free it outright.
166 extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
172 if( mbuf->flags & MFL_ZEROCOPY ) {
173 //nng_free( (void *) mbuf->header, mbuf->alloc_len );
175 nng_msg_free( mbuf->tp_buf );
184 send message with maximum timeout.
185 Accept a message and send it to an endpoint based on message type.
186 If NNG reports that the send attempt timed out, or should be retried,
187 RMr will retry for approximately max_to microseconds; rounded to the next
190 Allocates a new message buffer for the next send. If a message type has
191 more than one group of endpoints defined, then the message will be sent
192 in round robin fashion to one endpoint in each group.
194 An endpoint will be looked up in the route table using the message type and
195 the subscription id. If the subscription id is "UNSET_SUBID", then only the
196 message type is used. If the initial lookup, with a subid, fails, then a
197 second lookup using just the mtype is tried.
199 CAUTION: this is a non-blocking send. If the message cannot be sent, then
200 it will return with an error and errno set to eagain. If the send is
201 a limited fanout, then the returned status is the status of the last
205 extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
206 nng_socket nn_sock; // endpoint socket for send
208 int group; // selected group to get socket for
209 int send_again; // true if the message must be sent again
210 rmr_mbuf_t* clone_m; // cloned message for an nth send
211 int sock_ok; // got a valid socket from round robin select
212 uint64_t key; // mtype or sub-id/mtype sym table key
213 int altk_ok = 0; // set true if we can lookup on alternate key if mt/sid lookup fails
215 if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
216 errno = EINVAL; // if msg is null, this is their clue
218 msg->state = RMR_ERR_BADARG;
219 errno = EINVAL; // must ensure it's not eagain
224 errno = 0; // clear; nano might set, but ensure it's not left over if it doesn't
225 if( msg->header == NULL ) {
226 fprintf( stderr, "rmr_send_msg: ERROR: message had no header\n" );
227 msg->state = RMR_ERR_NOHDR;
228 errno = EBADMSG; // must ensure it's not eagain
233 max_to = ctx->send_retries; // convert to retries
236 send_again = 1; // force loop entry
237 group = 0; // always start with group 0
239 key = build_rt_key( msg->sub_id, msg->mtype ); // route table key to find the entry
240 if( msg->sub_id != UNSET_SUBID ) {
241 altk_ok = 1; // if caller's sub-id doesn't hit with mtype, allow mtype only key for retry
243 while( send_again ) {
244 sock_ok = uta_epsock_rr( ctx->rtable, key, group, &send_again, &nn_sock ); // round robin sel epoint; again set if mult groups
245 if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d len=%d sock_ok=%d ak_ok=%d\n",
246 msg->mtype, send_again, group, msg->len, sock_ok, altk_ok );
249 if( altk_ok ) { // we can try with the alternate (no sub-id) key
251 key = build_rt_key( UNSET_SUBID, msg->mtype ); // build with just the mtype and try again
252 send_again = 1; // ensure we don't exit the while
256 msg->state = RMR_ERR_NOENDPT;
257 errno = ENXIO; // must ensure it's not eagain
258 return msg; // caller can resend (maybe) or free
264 clone_m = clone_msg( msg ); // must make a copy as once we send this message is not available
265 if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
266 msg->flags |= MFL_NOALLOC; // send should not allocate a new buffer
267 msg = send_msg( ctx, msg, nn_sock, max_to ); // do the hard work, msg should be nil on success
270 // error do we need to count successes/errors, how to report some success, esp if last fails?
274 msg = clone_m; // clone will be the next to send
276 msg = send_msg( ctx, msg, nn_sock, max_to ); // send the last, and allocate a new buffer; drops the clone if it was
280 return msg; // last message caries the status of last/only send attempt
284 Send with default max timeout as is set in the context.
285 See rmr_mtosend_msg() for more details on the parameters.
286 See rmr_stimeout() for info on setting the default timeout.
288 extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
289 return rmr_mtosend_msg( vctx, msg, -1 ); // retries < uses default from ctx
293 Return to sender allows a message to be sent back to the endpoint where it originated.
294 The source information in the message is used to select the socket on which to write
295 the message rather than using the message type and round-robin selection. This
296 should return a message buffer with the state of the send operation set. On success
297 (state is RMR_OK, the caller may use the buffer for another receive operation), and on
298 error it can be passed back to this function to retry the send if desired. On error,
299 errno will liklely have the failure reason set by the nng send processing.
300 The following are possible values for the state in the message buffer:
302 Message states returned:
303 RMR_ERR_BADARG - argument (context or msg) was nil or invalid
304 RMR_ERR_NOHDR - message did not have a header
305 RMR_ERR_NOENDPT- an endpoint to send the message to could not be determined
306 RMR_ERR_SENDFAILED - send failed; errno has nano error code
307 RMR_ERR_RETRY - the reqest failed but should be retried (EAGAIN)
309 A nil message as the return value is rare, and generally indicates some kind of horrible
310 failure. The value of errno might give a clue as to what is wrong.
313 Like send_msg(), this is non-blocking and will return the msg if there is an errror.
314 The caller must check for this and handle.
316 extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
317 nng_socket nn_sock; // endpoint socket for send
321 char* hold_src; // we need the original source if send fails
322 int sock_ok; // true if we found a valid endpoint socket
324 if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
325 errno = EINVAL; // if msg is null, this is their clue
327 msg->state = RMR_ERR_BADARG;
332 errno = 0; // at this point any bad state is in msg returned
333 if( msg->header == NULL ) {
334 fprintf( stderr, "[ERR] rmr_send_msg: message had no header\n" );
335 msg->state = RMR_ERR_NOHDR;
339 sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock ); // socket of specific endpoint
341 msg->state = RMR_ERR_NOENDPT;
342 return msg; // preallocated msg can be reused since not given back to nn
345 msg->state = RMR_OK; // ensure it is clear before send
346 hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src ); // the dest where we're returning the message to
347 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID ); // must overlay the source to be ours
348 msg = send_msg( ctx, msg, nn_sock, -1 );
350 strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SID ); // always return original source so rts can be called again
351 msg->flags |= MFL_ADDSRC; // if msg given to send() it must add source
359 Call sends the message based on message routing using the message type, and waits for a
360 response message to arrive with the same transaction id that was in the outgoing message.
361 If, while wiating for the expected response, messages are received which do not have the
362 desired transaction ID, they are queued. Calls to uta_rcv_msg() will dequeue them in the
363 order that they were received.
365 Normally, a message struct pointer is returned and msg->state must be checked for RMR_OK
366 to ensure that no error was encountered. If the state is UTA_BADARG, then the message
367 may be resent (likely the context pointer was nil). If the message is sent, but no
368 response is received, a nil message is returned with errno set to indicate the likley
370 ETIMEDOUT -- too many messages were queued before reciving the expected response
371 ENOBUFS -- the queued message ring is full, messages were dropped
372 EINVAL -- A parameter was not valid
373 EAGAIN -- the underlying message system wsa interrupted or the device was busy;
374 user should call this function with the message again.
377 QUESTION: should user specify the number of messages to allow to queue?
379 extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
381 unsigned char expected_id[RMR_MAX_XID+1]; // the transaction id in the message; we wait for response with same ID
383 if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
385 msg->state = RMR_ERR_BADARG;
390 memcpy( expected_id, msg->xaction, RMR_MAX_XID );
391 expected_id[RMR_MAX_XID] = 0; // ensure it's a string
392 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rmr_call is making call, waiting for (%s)\n", expected_id );
394 msg->flags |= MFL_NOALLOC; // we don't need a new buffer from send
396 msg = rmr_send_msg( ctx, msg );
397 if( msg ) { // msg should be nil, if not there was a problem; return buffer to user
398 if( msg->state != RMR_ERR_RETRY ) {
399 msg->state = RMR_ERR_CALLFAILED; // errno not available to all wrappers; don't stomp if marked retry
404 return rmr_rcv_specific( ctx, NULL, (char *) expected_id, 20 ); // wait for msg allowing 20 to queue ahead
408 The outward facing receive function. When invoked it will pop the oldest message
409 from the receive ring, if any are queued, and return it. If the ring is empty
410 then the receive function is invoked to wait for the next message to arrive (blocking).
412 If old_msg is provided, it will be populated (avoiding lots of free/alloc cycles). If
413 nil, a new one will be allocated. However, the caller should NOT expect to get the same
414 struct back (if a queued message is returned the message struct will be different).
416 extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) {
418 rmr_mbuf_t* qm; // message that was queued on the ring
420 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
421 if( old_msg != NULL ) {
422 old_msg->state = RMR_ERR_BADARG;
429 qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring ); // pop if queued
432 rmr_free_msg( old_msg ); // future: push onto a free list???
438 return rcv_msg( ctx, old_msg ); // nothing queued, wait for one
442 This implements a receive with a timeout via epoll. Mostly this is for
443 wrappers as native C applications can use epoll directly and will not have
446 extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
447 struct epoll_stuff* eps; // convience pointer
449 rmr_mbuf_t* qm; // message that was queued on the ring
453 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
454 if( old_msg != NULL ) {
455 old_msg->state = RMR_ERR_BADARG;
461 qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring ); // pop if queued
464 rmr_free_msg( old_msg ); // future: push onto a free list???
470 if( (eps = ctx->eps) == NULL ) { // set up epoll on first call
471 eps = malloc( sizeof *eps );
473 if( (eps->ep_fd = epoll_create1( 0 )) < 0 ) {
474 fprintf( stderr, "[FAIL] unable to create epoll fd: %d\n", errno );
479 eps->nng_fd = rmr_get_rcvfd( ctx );
480 eps->epe.events = EPOLLIN;
481 eps->epe.data.fd = eps->nng_fd;
483 if( epoll_ctl( eps->ep_fd, EPOLL_CTL_ADD, eps->nng_fd, &eps->epe ) != 0 ) {
484 fprintf( stderr, "[FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
495 msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN ); // will abort on failure, no need to check
502 nready = epoll_wait( eps->ep_fd, eps->events, 1, ms_to ); // block until something or timedout
503 if( nready <= 0 ) { // we only wait on ours, so we assume ready means it's ours
504 msg->state = RMR_ERR_TIMEOUT;
506 return rcv_msg( ctx, msg ); // receive it and return it
509 return msg; // return empty message with state set
513 This blocks until the message with the 'expect' ID is received. Messages which are received
514 before the expected message are queued onto the message ring. The function will return
515 a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
516 expected message is received. If the queued message ring fills a nil pointer is returned
517 and errno is set to ENOBUFS.
519 Generally this will be invoked only by the call() function as it waits for a response, but
520 it is exposed to the user application as three is no reason not to.
522 extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int allow2queue ) {
524 int queued = 0; // number we pushed into the ring
525 int exp_len = 0; // length of expected ID
527 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
529 msg->state = RMR_ERR_BADARG;
537 if( expect == NULL || ! *expect ) { // nothing expected if nil or empty string, just receive
538 return rmr_rcv_msg( ctx, msg );
541 exp_len = strlen( expect );
542 if( exp_len > RMR_MAX_XID ) {
543 exp_len = RMR_MAX_XID;
545 if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific waiting for id=%s\n", expect );
547 while( queued < allow2queue ) {
548 msg = rcv_msg( ctx, msg ); // hard wait for next
549 if( msg->state == RMR_OK ) {
550 if( memcmp( msg->xaction, expect, exp_len ) == 0 ) { // got it -- return it
551 if( DEBUG ) fprintf( stderr, "[DBUG] rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued );
555 if( ! uta_ring_insert( ctx->mring, msg ) ) { // just queue, error if ring is full
556 if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_specific ring is full\n" );
561 if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific queued message type=%d\n", msg->mtype );
567 if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific timeout waiting for %s\n", expect );
572 // CAUTION: these are not supported as they must be set differently (between create and open) in NNG.
573 // until those details are worked out, these generate a warning.
575 Set send timeout. The value time is assumed to be microseconds. The timeout is the
576 rough maximum amount of time that RMr will block on a send attempt when the underlying
577 mechnism indicates eagain or etimeedout. All other error conditions are reported
578 without this delay. Setting a timeout of 0 causes no retries to be attempted in
579 RMr code. Setting a timeout of 1 causes RMr to spin up to 10K retries before returning,
580 but without issuing a sleep. If timeout is > 1, then RMr will issue a sleep (1us)
581 after every 10K send attempts until the time value is reached. Retries are abandoned
582 if NNG returns anything other than NNG_AGAIN or NNG_TIMEDOUT.
584 The default, if this function is not used, is 1; meaning that RMr will retry, but will
585 not enter a sleep. In all cases the caller should check the status in the message returned
588 Returns -1 if the context was invalid; RMR_OK otherwise.
590 extern int rmr_set_stimeout( void* vctx, int time ) {
593 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
601 ctx->send_retries = time;
606 Set receive timeout -- not supported in nng implementation
608 extern int rmr_set_rtimeout( void* vctx, int time ) {
609 fprintf( stderr, "[WRN] Current implementation of RMR ontop of NNG does not support setting a receive timeout\n" );
615 This is the actual init workhorse. The user visible function meerly ensures that the
616 calling programme does NOT set any internal flags that are supported, and then
617 invokes this. Internal functions (the route table collector) which need additional
618 open ports without starting additional route table collectors, will invoke this
619 directly with the proper flag.
621 static void* init( char* uproto_port, int max_msg_size, int flags ) {
622 static int announced = 0;
623 uta_ctx_t* ctx = NULL;
624 char bind_info[NNG_MAXADDRLEN]; // bind info
625 char* proto = "tcp"; // pointer into the proto/port string user supplied
627 char* interface = NULL; // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined)
629 char wbuf[1024]; // work buffer
630 char* tok; // pointer at token in a buffer
634 fprintf( stderr, "[INFO] ric message routing library on NNG mv=%d (%s %s.%s.%s built: %s)\n",
635 RMR_MSG_VER, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
640 if( uproto_port == NULL ) {
641 proto_port = strdup( DEF_COMM_PORT );
643 proto_port = strdup( uproto_port ); // so we can modify it
646 if( (ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) )) == NULL ) {
650 memset( ctx, 0, sizeof( uta_ctx_t ) );
652 ctx->send_retries = 1; // default is not to sleep at all; RMr will retry about 10K times before returning
653 ctx->mring = uta_mk_ring( 128 ); // message ring to hold asynch msgs received while waiting for call response
655 ctx->max_plen = RMR_MAX_RCV_BYTES; // max user payload lengh
656 if( max_msg_size > 0 ) {
657 ctx->max_plen = max_msg_size;
660 // we're using a listener to get rtg updates, so we do NOT need this.
661 //uta_lookup_rtg( ctx ); // attempt to fill in rtg info; rtc will handle missing values/errors
663 if( nng_pull0_open( &ctx->nn_sock ) != 0 ) { // and assign the mode
664 fprintf( stderr, "[CRI] rmr_init: unable to initialise nng listen (pull) socket: %d\n", errno );
669 if( (port = strchr( proto_port, ':' )) != NULL ) {
670 if( port == proto_port ) { // ":1234" supplied; leave proto to default and point port correctly
673 *(port++) = 0; // term proto string and point at port string
674 proto = proto_port; // user supplied proto so point at it rather than default
677 port = proto_port; // assume something like "1234" was passed
680 if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
681 fprintf( stderr, "[CRI] rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
684 if( (tok = strchr( wbuf, '.' )) != NULL ) {
685 *tok = 0; // we don't keep domain portion
687 ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SID );
688 if( snprintf( ctx->my_name, RMR_MAX_SID, "%s:%s", wbuf, port ) >= RMR_MAX_SID ) { // our registered name is host:port
689 fprintf( stderr, "[CRI] rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SID, wbuf, port );
693 ctx->ip_list = mk_ip_list( port ); // suss out all IP addresses we can find on the box, and bang on our port for RT comparisons
697 if( (interface = getenv( ENV_BIND_IF )) == NULL ) {
698 interface = "0.0.0.0";
700 // NOTE: if there are options that might need to be configured, the listener must be created, options set, then started
701 // rather than using this generic listen() call.
702 snprintf( bind_info, sizeof( bind_info ), "%s://%s:%s", proto, interface, port );
703 if( (state = nng_listen( ctx->nn_sock, bind_info, NULL, NO_FLAGS )) != 0 ) {
704 fprintf( stderr, "[CRIT] rmr_init: unable to start nng listener for %s: %s\n", bind_info, nng_strerror( state ) );
705 nng_close( ctx->nn_sock );
710 if( !(flags & FL_NOTHREAD) ) { // skip if internal function that doesnt need an rtc
711 if( pthread_create( &ctx->rtc_th, NULL, rtc, (void *) ctx ) ) { // kick the rt collector thread
712 fprintf( stderr, "[WARN] rmr_init: unable to start route table collector thread: %s", strerror( errno ) );
721 Initialise the message routing environment. Flags are one of the UTAFL_
722 constants. Proto_port is a protocol:port string (e.g. tcp:1234). If default protocol
723 (tcp) to be used, then :port is all that is needed.
725 At the moment it seems that TCP really is the only viable protocol, but
726 we'll allow flexibility.
728 The return value is a void pointer which must be passed to most uta functions. On
729 error, a nil pointer is returned and errno should be set.
732 No user flags supported (needed) at the moment, but this provides for extension
733 without drastically changing anything. The user should invoke with RMRFL_NONE to
734 avoid any misbehavour as there are internal flags which are suported
736 extern void* rmr_init( char* uproto_port, int max_msg_size, int flags ) {
737 return init( uproto_port, max_msg_size, flags & UFL_MASK ); // ensure any internal flags are off
741 This sets the default trace length which will be added to any message buffers
742 allocated. It can be set at any time, and if rmr_set_trace() is given a
743 trace len that is different than the default allcoated in a message, the message
746 Returns 0 on failure and 1 on success. If failure, then errno will be set.
748 extern int rmr_init_trace( void* vctx, int tr_len ) {
752 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
757 ctx->trace_data_len = tr_len;
762 Return true if routing table is initialised etc. and app can send/receive.
764 extern int rmr_ready( void* vctx ) {
767 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
771 if( ctx->rtable != NULL ) {
779 Returns a file descriptor which can be used with epoll() to signal a receive
780 pending. The file descriptor should NOT be read from directly, nor closed, as NNG
781 does not support this.
783 extern int rmr_get_rcvfd( void* vctx ) {
788 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
792 if( (state = nng_getopt_int( ctx->nn_sock, NNG_OPT_RECVFD, &fd )) != 0 ) {
793 fprintf( stderr, "[WRN] rmr cannot get recv fd: %s\n", nng_strerror( state ) );
804 There isn't an nng_flush() per se, but we can pause, generate
805 a context switch, which should allow the last sent buffer to
806 flow. There isn't exactly an nng_term/close either, so there
807 isn't much we can do.
809 extern void rmr_close( void* vctx ) {
812 if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
817 nng_close( ctx->nn_sock );