feature(routes): Add rtable update
[ric-plt/lib/rmr.git] / src / nng / src / rmr_nng.c
index af8ce6f..e77c9f3 100644 (file)
@@ -1,14 +1,14 @@
 // : vi ts=4 sw=4 noet :
 /*
 ==================================================================================
 // : vi ts=4 sw=4 noet :
 /*
 ==================================================================================
-       Copyright (c) 2019 Nokia 
+       Copyright (c) 2019 Nokia
        Copyright (c) 2018-2019 AT&T Intellectual Property.
 
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at
 
        Copyright (c) 2018-2019 AT&T Intellectual Property.
 
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at
 
-       http://www.apache.org/licenses/LICENSE-2.0
+          http://www.apache.org/licenses/LICENSE-2.0
 
    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
 
    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
 
 /*
        Mnemonic:       rmr_nng.c
 
 /*
        Mnemonic:       rmr_nng.c
-       Abstract:       This is the compile point for the nng version of the rmr 
+       Abstract:       This is the compile point for the nng version of the rmr
                                library (formarly known as uta, so internal function names
                                are likely still uta_*)
 
                                library (formarly known as uta, so internal function names
                                are likely still uta_*)
 
-                               With the exception of the symtab portion of the library, 
-                               RMr is built with a single compile so as to "hide" the 
+                               With the exception of the symtab portion of the library,
+                               RMr is built with a single compile so as to "hide" the
                                internal functions as statics.  Because they interdepend
                                on each other, and CMake has issues with generating two
                                different wormhole objects from a single source, we just
                                pull it all together with a centralised comple using
                                internal functions as statics.  Because they interdepend
                                on each other, and CMake has issues with generating two
                                different wormhole objects from a single source, we just
                                pull it all together with a centralised comple using
-                               includes.  
+                               includes.
 
                                Future:  the API functions at this point can be separated
                                into a common source module.
 
                                Future:  the API functions at this point can be separated
                                into a common source module.
@@ -92,7 +92,7 @@ static void free_ctx( uta_ctx_t* ctx ) {
        Returns the size of the payload (bytes) that the msg buffer references.
        Len in a message is the number of bytes which were received, or should
        be transmitted, however, it is possible that the mbuf was allocated
        Returns the size of the payload (bytes) that the msg buffer references.
        Len in a message is the number of bytes which were received, or should
        be transmitted, however, it is possible that the mbuf was allocated
-       with a larger payload space than the payload length indicates; this 
+       with a larger payload space than the payload length indicates; this
        function returns the absolute maximum space that the user has available
        in the payload. On error (bad msg buffer) -1 is returned and errno should
        indicate the rason.
        function returns the absolute maximum space that the user has available
        in the payload. On error (bad msg buffer) -1 is returned and errno should
        indicate the rason.
@@ -119,10 +119,47 @@ extern rmr_mbuf_t* rmr_alloc_msg( void* vctx, int size ) {
                return NULL;
        }
 
                return NULL;
        }
 
-       m = alloc_zcmsg( ctx, NULL, size, 0 );
+       m = alloc_zcmsg( ctx, NULL, size, 0, DEF_TR_LEN );                              // alloc with default trace data
        return  m;
 }
 
        return  m;
 }
 
+
+/*
+       Allocates a send message as a zerocopy message allowing the underlying message protocol
+       to send the buffer without copy. In addition, a trace data field of tr_size will be
+       added and the supplied data coppied to the buffer before returning the message to
+       the caller.
+*/
+extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned const char* data ) {
+       uta_ctx_t*      ctx;
+       rmr_mbuf_t*     m;
+       int state;
+
+       if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+               return NULL;
+       }
+
+       m = alloc_zcmsg( ctx, NULL, size, 0, tr_size );                         // alloc with specific tr size
+       if( m != NULL ) {
+               state = rmr_set_trace( m, data, tr_size );                              // roll their data in
+               if( state != tr_size ) {
+                       m->state = RMR_ERR_INITFAILED;
+               }
+       }
+
+       return  m;
+}
+
+/*
+       This provides an external path to the realloc static function as it's called by an
+       outward facing mbuf api function. Used to reallocate a message with a different
+       trace data size.
+*/
+extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
+       return realloc_msg( msg, new_tr_size );
+}
+
+
 /*
        Return the message to the available pool, or free it outright.
 */
 /*
        Return the message to the available pool, or free it outright.
 */
@@ -139,26 +176,31 @@ extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
                        }
                }
        }
                        }
                }
        }
-       
+
        free( mbuf );
 }
 
 /*
        send message with maximum timeout.
        free( mbuf );
 }
 
 /*
        send message with maximum timeout.
-       Accept a message and send it to an endpoint based on message type.      
+       Accept a message and send it to an endpoint based on message type.
        If NNG reports that the send attempt timed out, or should be retried,
        RMr will retry for approximately max_to microseconds; rounded to the next
        higher value of 10.
 
        Allocates a new message buffer for the next send. If a message type has
        more than one group of endpoints defined, then the message will be sent
        If NNG reports that the send attempt timed out, or should be retried,
        RMr will retry for approximately max_to microseconds; rounded to the next
        higher value of 10.
 
        Allocates a new message buffer for the next send. If a message type has
        more than one group of endpoints defined, then the message will be sent
-       in round robin fashion to one endpoint in each group. 
+       in round robin fashion to one endpoint in each group.
+
+       An endpoint will be looked up in the route table using the message type and
+       the subscription id. If the subscription id is "UNSET_SUBID", then only the
+       message type is used.  If the initial lookup, with a subid, fails, then a
+       second lookup using just the mtype is tried.
 
        CAUTION: this is a non-blocking send.  If the message cannot be sent, then
                it will return with an error and errno set to eagain. If the send is
                a limited fanout, then the returned status is the status of the last
                send attempt.
 
        CAUTION: this is a non-blocking send.  If the message cannot be sent, then
                it will return with an error and errno set to eagain. If the send is
                a limited fanout, then the returned status is the status of the last
                send attempt.
-       
+
 */
 extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
        nng_socket nn_sock;                     // endpoint socket for send
 */
 extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
        nng_socket nn_sock;                     // endpoint socket for send
@@ -167,13 +209,15 @@ extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
        int send_again;                         // true if the message must be sent again
        rmr_mbuf_t*     clone_m;                // cloned message for an nth send
        int sock_ok;                            // got a valid socket from round robin select
        int send_again;                         // true if the message must be sent again
        rmr_mbuf_t*     clone_m;                // cloned message for an nth send
        int sock_ok;                            // got a valid socket from round robin select
+       uint64_t key;                           // mtype or sub-id/mtype sym table key
+       int     altk_ok = 0;                    // set true if we can lookup on alternate key if mt/sid lookup fails
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
                errno = EINVAL;                                                                                         // if msg is null, this is their clue
                if( msg != NULL ) {
                        msg->state = RMR_ERR_BADARG;
                        errno = EINVAL;                                                                                 // must ensure it's not eagain
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
                errno = EINVAL;                                                                                         // if msg is null, this is their clue
                if( msg != NULL ) {
                        msg->state = RMR_ERR_BADARG;
                        errno = EINVAL;                                                                                 // must ensure it's not eagain
-               }                       
+               }
                return msg;
        }
 
                return msg;
        }
 
@@ -192,18 +236,30 @@ extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
        send_again = 1;                                                                                 // force loop entry
        group = 0;                                                                                              // always start with group 0
 
        send_again = 1;                                                                                 // force loop entry
        group = 0;                                                                                              // always start with group 0
 
+       key = build_rt_key( msg->sub_id, msg->mtype );                  // route table key to find the entry
+       if( msg->sub_id != UNSET_SUBID ) {
+               altk_ok = 1;                                                                            // if caller's sub-id doesn't hit with mtype, allow mtype only key for retry
+       }
        while( send_again ) {
        while( send_again ) {
-               sock_ok = uta_epsock_rr( ctx->rtable, msg->mtype, group, &send_again, &nn_sock );               // round robin sel epoint; again set if mult groups
-               if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d len=%d sock_ok=%d\n", 
-                               msg->mtype, send_again, group, msg->len, sock_ok );
-               group++;
+               sock_ok = uta_epsock_rr( ctx->rtable, key, group, &send_again, &nn_sock );              // round robin sel epoint; again set if mult groups
+               if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d len=%d sock_ok=%d ak_ok=%d\n",
+                               msg->mtype, send_again, group, msg->len, sock_ok, altk_ok );
 
                if( ! sock_ok ) {
 
                if( ! sock_ok ) {
+                       if( altk_ok ) {                                                                                 // we can try with the alternate (no sub-id) key
+                               altk_ok = 0;
+                               key = build_rt_key( UNSET_SUBID, msg->mtype );          // build with just the mtype and try again
+                               send_again = 1;                                                                         // ensure we don't exit the while
+                               continue;
+                       }
+
                        msg->state = RMR_ERR_NOENDPT;
                        errno = ENXIO;                                                                                  // must ensure it's not eagain
                        return msg;                                                                                             // caller can resend (maybe) or free
                }
 
                        msg->state = RMR_ERR_NOENDPT;
                        errno = ENXIO;                                                                                  // must ensure it's not eagain
                        return msg;                                                                                             // caller can resend (maybe) or free
                }
 
+               group++;
+
                if( send_again ) {
                        clone_m = clone_msg( msg );                                                             // must make a copy as once we send this message is not available
                        if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
                if( send_again ) {
                        clone_m = clone_msg( msg );                                                             // must make a copy as once we send this message is not available
                        if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
@@ -212,7 +268,7 @@ extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
                        /*
                        if( msg ) {
                                // error do we need to count successes/errors, how to report some success, esp if last fails?
                        /*
                        if( msg ) {
                                // error do we need to count successes/errors, how to report some success, esp if last fails?
-                       } 
+                       }
                        */
 
                        msg = clone_m;                                                                                  // clone will be the next to send
                        */
 
                        msg = clone_m;                                                                                  // clone will be the next to send
@@ -225,7 +281,7 @@ extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
 }
 
 /*
 }
 
 /*
-       Send with default max timeout as is set in the context. 
+       Send with default max timeout as is set in the context.
        See rmr_mtosend_msg() for more details on the parameters.
        See rmr_stimeout() for info on setting the default timeout.
 */
        See rmr_mtosend_msg() for more details on the parameters.
        See rmr_stimeout() for info on setting the default timeout.
 */
@@ -234,9 +290,9 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
 }
 
 /*
 }
 
 /*
-       Return to sender allows a message to be sent back to the endpoint where it originated. 
+       Return to sender allows a message to be sent back to the endpoint where it originated.
        The source information in the message is used to select the socket on which to write
        The source information in the message is used to select the socket on which to write
-       the message rather than using the message type and round-robin selection. This 
+       the message rather than using the message type and round-robin selection. This
        should return a message buffer with the state of the send operation set. On success
        (state is RMR_OK, the caller may use the buffer for another receive operation), and on
        error it can be passed back to this function to retry the send if desired. On error,
        should return a message buffer with the state of the send operation set. On success
        (state is RMR_OK, the caller may use the buffer for another receive operation), and on
        error it can be passed back to this function to retry the send if desired. On error,
@@ -269,7 +325,7 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
                errno = EINVAL;                                                                                         // if msg is null, this is their clue
                if( msg != NULL ) {
                        msg->state = RMR_ERR_BADARG;
                errno = EINVAL;                                                                                         // if msg is null, this is their clue
                if( msg != NULL ) {
                        msg->state = RMR_ERR_BADARG;
-               }                       
+               }
                return msg;
        }
 
                return msg;
        }
 
@@ -308,7 +364,7 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
 
        Normally, a message struct pointer is returned and msg->state must be checked for RMR_OK
        to ensure that no error was encountered. If the state is UTA_BADARG, then the message
 
        Normally, a message struct pointer is returned and msg->state must be checked for RMR_OK
        to ensure that no error was encountered. If the state is UTA_BADARG, then the message
-       may be resent (likely the context pointer was nil).  If the message is sent, but no 
+       may be resent (likely the context pointer was nil).  If the message is sent, but no
        response is received, a nil message is returned with errno set to indicate the likley
        issue:
                ETIMEDOUT -- too many messages were queued before reciving the expected response
        response is received, a nil message is returned with errno set to indicate the likley
        issue:
                ETIMEDOUT -- too many messages were queued before reciving the expected response
@@ -327,7 +383,7 @@ extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
        if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
                if( msg != NULL ) {
                        msg->state = RMR_ERR_BADARG;
        if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
                if( msg != NULL ) {
                        msg->state = RMR_ERR_BADARG;
-               }                       
+               }
                return msg;
        }
 
                return msg;
        }
 
@@ -364,7 +420,7 @@ extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) {
        if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
                if( old_msg != NULL ) {
                        old_msg->state = RMR_ERR_BADARG;
        if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
                if( old_msg != NULL ) {
                        old_msg->state = RMR_ERR_BADARG;
-               }                       
+               }
                errno = EINVAL;
                return old_msg;
        }
                errno = EINVAL;
                return old_msg;
        }
@@ -383,8 +439,8 @@ extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) {
 }
 
 /*
 }
 
 /*
-       This implements a receive with a timeout via epoll. Mostly this is for 
-       wrappers as native C applications can use epoll directly and will not have 
+       This implements a receive with a timeout via epoll. Mostly this is for
+       wrappers as native C applications can use epoll directly and will not have
        to depend on this.
 */
 extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
        to depend on this.
 */
 extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
@@ -397,7 +453,7 @@ extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
        if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
                if( old_msg != NULL ) {
                        old_msg->state = RMR_ERR_BADARG;
        if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
                if( old_msg != NULL ) {
                        old_msg->state = RMR_ERR_BADARG;
-               }                       
+               }
                errno = EINVAL;
                return old_msg;
        }
                errno = EINVAL;
                return old_msg;
        }
@@ -412,23 +468,23 @@ extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
        }
 
        if( (eps = ctx->eps)  == NULL ) {                                       // set up epoll on first call
        }
 
        if( (eps = ctx->eps)  == NULL ) {                                       // set up epoll on first call
-               eps = malloc( sizeof *eps );            
+               eps = malloc( sizeof *eps );
 
 
-       if( (eps->ep_fd = epoll_create1( 0 )) < 0 ) {
-               fprintf( stderr, "[FAIL] unable to create epoll fd: %d\n", errno );
+               if( (eps->ep_fd = epoll_create1( 0 )) < 0 ) {
+               fprintf( stderr, "[FAIL] unable to create epoll fd: %d\n", errno );
                        free( eps );
                        return NULL;
                        free( eps );
                        return NULL;
-       }
+               }
 
                eps->nng_fd = rmr_get_rcvfd( ctx );
                eps->epe.events = EPOLLIN;
 
                eps->nng_fd = rmr_get_rcvfd( ctx );
                eps->epe.events = EPOLLIN;
-       eps->epe.data.fd = eps->nng_fd;
+               eps->epe.data.fd = eps->nng_fd;
 
 
-       if( epoll_ctl( eps->ep_fd, EPOLL_CTL_ADD, eps->nng_fd, &eps->epe ) != 0 )  {
-               fprintf( stderr, "[FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
+               if( epoll_ctl( eps->ep_fd, EPOLL_CTL_ADD, eps->nng_fd, &eps->epe ) != 0 )  {
+               fprintf( stderr, "[FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
                        free( eps );
                        return NULL;
                        free( eps );
                        return NULL;
-       }
+               }
 
                ctx->eps = eps;
        }
 
                ctx->eps = eps;
        }
@@ -436,7 +492,7 @@ extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
        if( old_msg ) {
                msg = old_msg;
        } else {
        if( old_msg ) {
                msg = old_msg;
        } else {
-               msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK );                      // will abort on failure, no need to check
+               msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN );                  // will abort on failure, no need to check
        }
 
        if( ms_to < 0 ) {
        }
 
        if( ms_to < 0 ) {
@@ -455,23 +511,23 @@ extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
 
 /*
        This blocks until the message with the 'expect' ID is received. Messages which are received
 
 /*
        This blocks until the message with the 'expect' ID is received. Messages which are received
-       before the expected message are queued onto the message ring.  The function will return 
+       before the expected message are queued onto the message ring.  The function will return
        a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
        expected message is received. If the queued message ring fills a nil pointer is returned
        and errno is set to ENOBUFS.
 
        a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
        expected message is received. If the queued message ring fills a nil pointer is returned
        and errno is set to ENOBUFS.
 
-       Generally this will be invoked only by the call() function as it waits for a response, but 
+       Generally this will be invoked only by the call() function as it waits for a response, but
        it is exposed to the user application as three is no reason not to.
 */
 extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int allow2queue ) {
        uta_ctx_t*      ctx;
        int     queued = 0;                             // number we pushed into the ring
        int     exp_len = 0;                    // length of expected ID
        it is exposed to the user application as three is no reason not to.
 */
 extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int allow2queue ) {
        uta_ctx_t*      ctx;
        int     queued = 0;                             // number we pushed into the ring
        int     exp_len = 0;                    // length of expected ID
-       
+
        if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
                if( msg != NULL ) {
                        msg->state = RMR_ERR_BADARG;
        if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
                if( msg != NULL ) {
                        msg->state = RMR_ERR_BADARG;
-               }                       
+               }
                errno = EINVAL;
                return msg;
        }
                errno = EINVAL;
                return msg;
        }
@@ -516,12 +572,12 @@ extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect,
 //  CAUTION:  these are not supported as they must be set differently (between create and open) in NNG.
 //                             until those details are worked out, these generate a warning.
 /*
 //  CAUTION:  these are not supported as they must be set differently (between create and open) in NNG.
 //                             until those details are worked out, these generate a warning.
 /*
-       Set send timeout. The value time is assumed to be microseconds.  The timeout is the 
+       Set send timeout. The value time is assumed to be microseconds.  The timeout is the
        rough maximum amount of time that RMr will block on a send attempt when the underlying
        mechnism indicates eagain or etimeedout.  All other error conditions are reported
        without this delay. Setting a timeout of 0 causes no retries to be attempted in
        RMr code. Setting a timeout of 1 causes RMr to spin up to 10K retries before returning,
        rough maximum amount of time that RMr will block on a send attempt when the underlying
        mechnism indicates eagain or etimeedout.  All other error conditions are reported
        without this delay. Setting a timeout of 0 causes no retries to be attempted in
        RMr code. Setting a timeout of 1 causes RMr to spin up to 10K retries before returning,
-       but without issuing a sleep.  If timeout is > 1, then RMr will issue a sleep (1us) 
+       but without issuing a sleep.  If timeout is > 1, then RMr will issue a sleep (1us)
        after every 10K send attempts until the time value is reached. Retries are abandoned
        if NNG returns anything other than NNG_AGAIN or NNG_TIMEDOUT.
 
        after every 10K send attempts until the time value is reached. Retries are abandoned
        if NNG returns anything other than NNG_AGAIN or NNG_TIMEDOUT.
 
@@ -557,9 +613,9 @@ extern int rmr_set_rtimeout( void* vctx, int time ) {
 
 /*
        This is the actual init workhorse. The user visible function meerly ensures that the
 
 /*
        This is the actual init workhorse. The user visible function meerly ensures that the
-       calling programme does NOT set any internal flags that are supported, and then 
+       calling programme does NOT set any internal flags that are supported, and then
        invokes this.  Internal functions (the route table collector) which need additional
        invokes this.  Internal functions (the route table collector) which need additional
-       open ports without starting additional route table collectors, will invoke this 
+       open ports without starting additional route table collectors, will invoke this
        directly with the proper flag.
 */
 static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        directly with the proper flag.
 */
 static void* init(  char* uproto_port, int max_msg_size, int flags ) {
@@ -570,12 +626,12 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        char*   port;
        char*   interface = NULL;                       // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined)
        char*   proto_port;
        char*   port;
        char*   interface = NULL;                       // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined)
        char*   proto_port;
-       char    wbuf[1024];                                     // work buffer 
+       char    wbuf[1024];                                     // work buffer
        char*   tok;                                            // pointer at token in a buffer
        int             state;
 
        if( ! announced ) {
        char*   tok;                                            // pointer at token in a buffer
        int             state;
 
        if( ! announced ) {
-               fprintf( stderr, "[INFO] ric message routing library on NNG mv=%d (%s %s.%s.%s built: %s)\n", 
+               fprintf( stderr, "[INFO] ric message routing library on NNG mv=%d (%s %s.%s.%s built: %s)\n",
                        RMR_MSG_VER, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
                announced = 1;
        }
                        RMR_MSG_VER, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
                announced = 1;
        }
@@ -674,13 +730,34 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
 
        Flags:
                No user flags supported (needed) at the moment, but this provides for extension
 
        Flags:
                No user flags supported (needed) at the moment, but this provides for extension
-               without drastically changing anything. The user should invoke with RMRFL_NONE to 
+               without drastically changing anything. The user should invoke with RMRFL_NONE to
                avoid any misbehavour as there are internal flags which are suported
 */
 extern void* rmr_init( char* uproto_port, int max_msg_size, int flags ) {
        return init( uproto_port, max_msg_size, flags & UFL_MASK  );            // ensure any internal flags are off
 }
 
                avoid any misbehavour as there are internal flags which are suported
 */
 extern void* rmr_init( char* uproto_port, int max_msg_size, int flags ) {
        return init( uproto_port, max_msg_size, flags & UFL_MASK  );            // ensure any internal flags are off
 }
 
+/*
+       This sets the default trace length which will be added to any message buffers
+       allocated.  It can be set at any time, and if rmr_set_trace() is given a
+       trace len that is different than the default allcoated in a message, the message
+       will be resized.
+
+       Returns 0 on failure and 1 on success. If failure, then errno will be set.
+*/
+extern int rmr_init_trace( void* vctx, int tr_len ) {
+       uta_ctx_t* ctx;
+
+       errno = 0;
+       if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+               errno = EINVAL;
+               return 0;
+       }
+
+       ctx->trace_data_len = tr_len;
+       return 1;
+}
+
 /*
        Return true if routing table is initialised etc. and app can send/receive.
 */
 /*
        Return true if routing table is initialised etc. and app can send/receive.
 */
@@ -699,9 +776,9 @@ extern int rmr_ready( void* vctx ) {
 }
 
 /*
 }
 
 /*
-    Returns a file descriptor which can be used with epoll() to signal a receive
-    pending. The file descriptor should NOT be read from directly, nor closed, as NNG
-    does not support this.
+       Returns a file descriptor which can be used with epoll() to signal a receive
+       pending. The file descriptor should NOT be read from directly, nor closed, as NNG
+       does not support this.
 */
 extern int rmr_get_rcvfd( void* vctx ) {
        uta_ctx_t* ctx;
 */
 extern int rmr_get_rcvfd( void* vctx ) {
        uta_ctx_t* ctx;
@@ -725,7 +802,7 @@ extern int rmr_get_rcvfd( void* vctx ) {
        Clean up things.
 
        There isn't an nng_flush() per se, but we can pause, generate
        Clean up things.
 
        There isn't an nng_flush() per se, but we can pause, generate
-       a context switch, which should allow the last sent buffer to 
+       a context switch, which should allow the last sent buffer to
        flow. There isn't exactly an nng_term/close either, so there
        isn't much we can do.
 */
        flow. There isn't exactly an nng_term/close either, so there
        isn't much we can do.
 */