test(e2e): Add return to sender app based test
[ric-plt/lib/rmr.git] / src / nanomsg / src / rmr.c
index 2900117..b57df3a 100644 (file)
@@ -1,14 +1,14 @@
 // :vi sw=4 ts=4 noet:
 /*
 ==================================================================================
 // :vi sw=4 ts=4 noet:
 /*
 ==================================================================================
-       Copyright (c) 2019 Nokia 
+       Copyright (c) 2019 Nokia
        Copyright (c) 2018-2019 AT&T Intellectual Property.
 
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at
 
        Copyright (c) 2018-2019 AT&T Intellectual Property.
 
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at
 
-       http://www.apache.org/licenses/LICENSE-2.0
+          http://www.apache.org/licenses/LICENSE-2.0
 
    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
 
    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
@@ -24,9 +24,9 @@
                                the older nanomsg messaging transport mehhanism.
 
                                To "hide" internal functions the choice was made to implement them
                                the older nanomsg messaging transport mehhanism.
 
                                To "hide" internal functions the choice was made to implement them
-                               all as static functions. This means that we include nearly 
+                               all as static functions. This means that we include nearly
                                all of our modules here as 90% of the library is not visible to
                                all of our modules here as 90% of the library is not visible to
-                               the outside world. 
+                               the outside world.
 
        Author:         E. Scott Daniels
        Date:           28 November 2018
 
        Author:         E. Scott Daniels
        Date:           28 November 2018
@@ -80,8 +80,8 @@ static void free_ctx( uta_ctx_t* ctx ) {
 // --------------- public functions --------------------------------------------------------------------------
 
 /*
 // --------------- public functions --------------------------------------------------------------------------
 
 /*
-       Set the receive timeout to time. If time >1000 we assume the time is milliseconds,
-       else we assume seconds. Setting -1 is always block.
+       Set the receive timeout to time (ms). A value of 0 is the same as a non-blocking
+       receive and -1 is block for ever.
        Returns the nn value (0 on success <0 on error).
 */
 extern int rmr_set_rtimeout( void* vctx, int time ) {
        Returns the nn value (0 on success <0 on error).
 */
 extern int rmr_set_rtimeout( void* vctx, int time ) {
@@ -92,11 +92,11 @@ extern int rmr_set_rtimeout( void* vctx, int time ) {
                return -1;
        }
 
                return -1;
        }
 
-       if( time > 0 ) {
-               if( time < 1000 ) {     
-                       time = time * 1000;                     // assume seconds, nn wants ms
-               }
-       } 
+       if( ctx->last_rto == time ) {
+               return 0;
+       }
+
+       ctx->last_rto = time;
 
        return nn_setsockopt( ctx->nn_sock, NN_SOL_SOCKET, NN_RCVTIMEO, &time, sizeof( time ) );
 }
 
        return nn_setsockopt( ctx->nn_sock, NN_SOL_SOCKET, NN_RCVTIMEO, &time, sizeof( time ) );
 }
@@ -108,7 +108,6 @@ extern int rmr_rcv_to( void* vctx, int time ) {
        return rmr_rcv_to( vctx, time );
 }
 
        return rmr_rcv_to( vctx, time );
 }
 
-
 /*
        Set the send timeout to time. If time >1000 we assume the time is milliseconds,
        else we assume seconds. Setting -1 is always block.
 /*
        Set the send timeout to time. If time >1000 we assume the time is milliseconds,
        else we assume seconds. Setting -1 is always block.
@@ -123,10 +122,10 @@ extern int rmr_set_stimeout( void* vctx, int time ) {
        }
 
        if( time > 0 ) {
        }
 
        if( time > 0 ) {
-               if( time < 1000 ) {     
+               if( time < 1000 ) {
                        time = time * 1000;                     // assume seconds, nn wants ms
                }
                        time = time * 1000;                     // assume seconds, nn wants ms
                }
-       } 
+       }
 
        return nn_setsockopt( ctx->nn_sock, NN_SOL_SOCKET, NN_SNDTIMEO, &time, sizeof( time ) );
 }
 
        return nn_setsockopt( ctx->nn_sock, NN_SOL_SOCKET, NN_SNDTIMEO, &time, sizeof( time ) );
 }
@@ -142,7 +141,7 @@ extern int rmr_send_to( void* vctx, int time ) {
        Returns the size of the payload (bytes) that the msg buffer references.
        Len in a message is the number of bytes which were received, or should
        be transmitted, however, it is possible that the mbuf was allocated
        Returns the size of the payload (bytes) that the msg buffer references.
        Len in a message is the number of bytes which were received, or should
        be transmitted, however, it is possible that the mbuf was allocated
-       with a larger payload space than the payload length indicates; this 
+       with a larger payload space than the payload length indicates; this
        function returns the absolute maximum space that the user has available
        in the payload. On error (bad msg buffer) -1 is returned and errno should
        indicate the rason.
        function returns the absolute maximum space that the user has available
        in the payload. On error (bad msg buffer) -1 is returned and errno should
        indicate the rason.
@@ -169,10 +168,44 @@ extern rmr_mbuf_t* rmr_alloc_msg( void* vctx, int size ) {
                return NULL;
        }
 
                return NULL;
        }
 
-       m = alloc_zcmsg( ctx, NULL, size, 0 );
+       m = alloc_zcmsg( ctx, NULL, size, 0, DEF_TR_LEN );
        return  m;
 }
 
        return  m;
 }
 
+/*
+       Allocates a send message as a zerocopy message allowing the underlying message protocol
+       to send the buffer without copy. In addition, a trace data field of tr_size will be
+       added and the supplied data coppied to the buffer before returning the message to
+       the caller.
+*/
+extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned const char* data ) {
+       uta_ctx_t*      ctx;
+       rmr_mbuf_t*     m;
+       int state;
+
+       if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+               return NULL;
+       }
+
+       m = alloc_zcmsg( ctx, NULL, size, 0, tr_size );                         // alloc with specific tr size
+       if( m != NULL ) {
+               state = rmr_set_trace( m, data, tr_size );                              // roll their data in
+               if( state != tr_size ) {
+                       m->state = RMR_ERR_INITFAILED;
+               }
+       }
+
+       return  m;
+}
+
+/*
+       Need an external path to the realloc static function as it's called by an
+       outward facing mbuf api function.
+*/
+extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
+       return realloc_msg( msg, new_tr_size );
+}
+
 /*
        Return the message to the available pool, or free it outright.
 */
 /*
        Return the message to the available pool, or free it outright.
 */
@@ -188,15 +221,15 @@ extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
                        free( mbuf->header );
                }
        }
                        free( mbuf->header );
                }
        }
-       
+
        free( mbuf );
 }
 
 /*
        free( mbuf );
 }
 
 /*
-       Accept a message and send it to an endpoint based on message type.      
+       Accept a message and send it to an endpoint based on message type.
        Allocates a new message buffer for the next send. If a message type has
        more than one group of endpoints defined, then the message will be sent
        Allocates a new message buffer for the next send. If a message type has
        more than one group of endpoints defined, then the message will be sent
-       in round robin fashion to one endpoint in each group. 
+       in round robin fashion to one endpoint in each group.
 
        CAUTION: this is a non-blocking send.  If the message cannot be sent, then
                it will return with an error and errno set to eagain. If the send is
 
        CAUTION: this is a non-blocking send.  If the message cannot be sent, then
                it will return with an error and errno set to eagain. If the send is
@@ -209,13 +242,16 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
        int     group;                                  // selected group to get socket for
        int send_again;                         // true if the message must be sent again
        rmr_mbuf_t*     clone_m;                // cloned message for an nth send
        int     group;                                  // selected group to get socket for
        int send_again;                         // true if the message must be sent again
        rmr_mbuf_t*     clone_m;                // cloned message for an nth send
+       uint64_t key;                           // lookup key is now subid and mtype
+       int max_rt = 1000;
+       int     altk_ok = 0;                    // ok to retry with alt key when true
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
                errno = EINVAL;                                                                                         // if msg is null, this is their clue
                if( msg != NULL ) {
                        msg->state = RMR_ERR_BADARG;
                        errno = EINVAL;                                                                                 // must ensure it's not eagain
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
                errno = EINVAL;                                                                                         // if msg is null, this is their clue
                if( msg != NULL ) {
                        msg->state = RMR_ERR_BADARG;
                        errno = EINVAL;                                                                                 // must ensure it's not eagain
-               }                       
+               }
                return msg;
        }
 
                return msg;
        }
 
@@ -230,32 +266,48 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
        send_again = 1;                                                                                 // force loop entry
        group = 0;                                                                                              // always start with group 0
 
        send_again = 1;                                                                                 // force loop entry
        group = 0;                                                                                              // always start with group 0
 
+       key = build_rt_key( msg->sub_id, msg->mtype );                  // what we need to find the route table entry
+       if( msg->sub_id != UNSET_SUBID ) {                                              // if sub id set, allow retry with just mtype if no endpoint when sub-id used
+               altk_ok = 1;
+       }
+
        while( send_again ) {
        while( send_again ) {
-               nn_sock = uta_epsock_rr( ctx->rtable, msg->mtype, group, &send_again );         // round robin select endpoint; again set if mult groups
-               if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d socket=%d len=%d\n", 
-                               msg->mtype, send_again, group, nn_sock, msg->len );
-               group++;
+               max_rt = 1000;
+               nn_sock = uta_epsock_rr( ctx->rtable, key, group, &send_again );                // round robin select endpoint; again set if mult groups
+               if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d socket=%d len=%d ak_ok=%d\n",
+                               msg->mtype, send_again, group, nn_sock, msg->len, altk_ok );
 
                if( nn_sock < 0 ) {
 
                if( nn_sock < 0 ) {
+                       if( altk_ok ) {                                                                                 // ok to retry with alternate key
+                               key = build_rt_key( UNSET_SUBID, msg->mtype );          // build key with just mtype and retry
+                               send_again = 1;
+                               altk_ok = 0;
+                               continue;
+                       }
+
                        msg->state = RMR_ERR_NOENDPT;
                        errno = ENXIO;                                                                                  // must ensure it's not eagain
                        return msg;                                                                                             // caller can resend (maybe) or free
                }
                        msg->state = RMR_ERR_NOENDPT;
                        errno = ENXIO;                                                                                  // must ensure it's not eagain
                        return msg;                                                                                             // caller can resend (maybe) or free
                }
+               group++;
 
                if( send_again ) {
                        clone_m = clone_msg( msg );                                                             // must make a copy as once we send this message is not available
 
                if( send_again ) {
                        clone_m = clone_msg( msg );                                                             // must make a copy as once we send this message is not available
-                       if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
+                       if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d sub_id=%d len=%d\n", msg->mtype, msg->sub_id, msg->len );
                        msg->flags |= MFL_NOALLOC;                                                              // send should not allocate a new buffer
                        msg = send_msg( ctx, msg, nn_sock );                                    // do the hard work, msg should be nil on success
                        msg->flags |= MFL_NOALLOC;                                                              // send should not allocate a new buffer
                        msg = send_msg( ctx, msg, nn_sock );                                    // do the hard work, msg should be nil on success
-                       /*
-                       if( msg ) {
-                               // error do we need to count successes/errors, how to report some success, esp if last fails?
-                       } 
-                       */
+                       while( max_rt > 0 &&  msg && msg->state == RMR_ERR_RETRY ) {
+                               msg = send_msg( ctx, msg, nn_sock );
+                               max_rt--;
+                       }
 
                        msg = clone_m;                                                                                  // clone will be the next to send
                } else {
                        msg = send_msg( ctx, msg, nn_sock );                                    // send the last, and allocate a new buffer; drops the clone if it was
 
                        msg = clone_m;                                                                                  // clone will be the next to send
                } else {
                        msg = send_msg( ctx, msg, nn_sock );                                    // send the last, and allocate a new buffer; drops the clone if it was
+                       while( max_rt > 0 &&  msg && msg->state == RMR_ERR_RETRY ) {
+                               msg = send_msg( ctx, msg, nn_sock );
+                               max_rt--;
+                       }
                }
        }
 
                }
        }
 
@@ -263,9 +315,9 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
 }
 
 /*
 }
 
 /*
-       Return to sender allows a message to be sent back to the endpoint where it originated. 
+       Return to sender allows a message to be sent back to the endpoint where it originated.
        The source information in the message is used to select the socket on which to write
        The source information in the message is used to select the socket on which to write
-       the message rather than using the message type and round-robin selection. This 
+       the message rather than using the message type and round-robin selection. This
        should return a message buffer with the state of the send operation set. On success
        (state is RMR_OK, the caller may use the buffer for another receive operation), and on
        error it can be passed back to this function to retry the send if desired. On error,
        should return a message buffer with the state of the send operation set. On success
        (state is RMR_OK, the caller may use the buffer for another receive operation), and on
        error it can be passed back to this function to retry the send if desired. On error,
@@ -297,7 +349,7 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
                errno = EINVAL;                                                                                         // if msg is null, this is their clue
                if( msg != NULL ) {
                        msg->state = RMR_ERR_BADARG;
                errno = EINVAL;                                                                                         // if msg is null, this is their clue
                if( msg != NULL ) {
                        msg->state = RMR_ERR_BADARG;
-               }                       
+               }
                return msg;
        }
 
                return msg;
        }
 
@@ -335,7 +387,7 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
 
        Normally, a message struct pointer is returned and msg->state must be checked for RMR_OK
        to ensure that no error was encountered. If the state is UTA_BADARG, then the message
 
        Normally, a message struct pointer is returned and msg->state must be checked for RMR_OK
        to ensure that no error was encountered. If the state is UTA_BADARG, then the message
-       may be resent (likely the context pointer was nil).  If the message is sent, but no 
+       may be resent (likely the context pointer was nil).  If the message is sent, but no
        response is received, a nil message is returned with errno set to indicate the likley
        issue:
                ETIMEDOUT -- too many messages were queued before reciving the expected response
        response is received, a nil message is returned with errno set to indicate the likley
        issue:
                ETIMEDOUT -- too many messages were queued before reciving the expected response
@@ -354,7 +406,7 @@ extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
        if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
                if( msg != NULL ) {
                        msg->state = RMR_ERR_BADARG;
        if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
                if( msg != NULL ) {
                        msg->state = RMR_ERR_BADARG;
-               }                       
+               }
                return msg;
        }
 
                return msg;
        }
 
@@ -391,7 +443,7 @@ extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) {
        if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
                if( old_msg != NULL ) {
                        old_msg->state = RMR_ERR_BADARG;
        if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
                if( old_msg != NULL ) {
                        old_msg->state = RMR_ERR_BADARG;
-               }                       
+               }
                errno = EINVAL;
                return old_msg;
        }
                errno = EINVAL;
                return old_msg;
        }
@@ -410,34 +462,41 @@ extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) {
 }
 
 /*
 }
 
 /*
-       Receive with a timeout.  This is a convenience function when sitting on top of 
-       nanomsg as it just sets the rcv timeout and calls rmr_rcv_msg(). 
+       Receive with a timeout.  This is a convenience function when sitting on top of
+       nanomsg as it just sets the rcv timeout and calls rmr_rcv_msg().
 */
 extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
 */
 extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
-       rmr_set_rtimeout( vctx, ms_to );
+       uta_ctx_t*      ctx;
+
+       if( (ctx = (uta_ctx_t *) vctx) != NULL ) {
+               if( ctx->last_rto != ms_to ) {                                                  // avoid call overhead
+                       rmr_set_rtimeout( vctx, ms_to );
+               }
+       }
+
        return rmr_rcv_msg( vctx, old_msg );
 }
 
 
 /*
        This blocks until the message with the 'expect' ID is received. Messages which are received
        return rmr_rcv_msg( vctx, old_msg );
 }
 
 
 /*
        This blocks until the message with the 'expect' ID is received. Messages which are received
-       before the expected message are queued onto the message ring.  The function will return 
+       before the expected message are queued onto the message ring.  The function will return
        a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
        expected message is received. If the queued message ring fills a nil pointer is returned
        and errno is set to ENOBUFS.
 
        a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
        expected message is received. If the queued message ring fills a nil pointer is returned
        and errno is set to ENOBUFS.
 
-       Generally this will be invoked only by the call() function as it waits for a response, but 
+       Generally this will be invoked only by the call() function as it waits for a response, but
        it is exposed to the user application as three is no reason not to.
 */
 extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int allow2queue ) {
        uta_ctx_t*      ctx;
        int     queued = 0;                             // number we pushed into the ring
        int     exp_len = 0;                    // length of expected ID
        it is exposed to the user application as three is no reason not to.
 */
 extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int allow2queue ) {
        uta_ctx_t*      ctx;
        int     queued = 0;                             // number we pushed into the ring
        int     exp_len = 0;                    // length of expected ID
-       
+
        if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
                if( msg != NULL ) {
                        msg->state = RMR_ERR_BADARG;
        if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
                if( msg != NULL ) {
                        msg->state = RMR_ERR_BADARG;
-               }                       
+               }
                errno = EINVAL;
                return msg;
        }
                errno = EINVAL;
                return msg;
        }
@@ -497,12 +556,12 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) {
        char*   proto = "tcp";                          // pointer into the proto/port string user supplied
        char*   port;
        char*   proto_port;
        char*   proto = "tcp";                          // pointer into the proto/port string user supplied
        char*   port;
        char*   proto_port;
-       char    wbuf[1024];                                     // work buffer 
+       char    wbuf[1024];                                     // work buffer
        char*   tok;                                            // pointer at token in a buffer
        int             state;
        char*   interface = NULL;                       // interface to bind to pulled from RMR_BIND_IF if set
 
        char*   tok;                                            // pointer at token in a buffer
        int             state;
        char*   interface = NULL;                       // interface to bind to pulled from RMR_BIND_IF if set
 
-       fprintf( stderr, "[INFO] ric message routing library on nanomsg (%s %s.%s.%s built: %s)\n", 
+       fprintf( stderr, "[INFO] ric message routing library on nanomsg (%s %s.%s.%s built: %s)\n",
                        QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
 
        errno = 0;
                        QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
 
        errno = 0;
@@ -520,6 +579,7 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) {
 
 
        ctx->mring = uta_mk_ring( 128 );                                // message ring to hold asynch msgs received while waiting for call response
 
 
        ctx->mring = uta_mk_ring( 128 );                                // message ring to hold asynch msgs received while waiting for call response
+       ctx->last_rto = -2;                                                             // last receive timeout that was set; invalid value to force first to set
 
        ctx->max_plen = RMR_MAX_RCV_BYTES + sizeof( uta_mhdr_t );               // default max buffer size
        if( max_msg_size > 0 ) {
 
        ctx->max_plen = RMR_MAX_RCV_BYTES + sizeof( uta_mhdr_t );               // default max buffer size
        if( max_msg_size > 0 ) {
@@ -534,7 +594,7 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) {
 
        uta_lookup_rtg( ctx );                                                  // attempt to fill in rtg info; rtc will handle missing values/errors
 
 
        uta_lookup_rtg( ctx );                                                  // attempt to fill in rtg info; rtc will handle missing values/errors
 
-    ctx->nn_sock = nn_socket( AF_SP, NN_PULL );                // our 'listen' socket should allow multiple senders to connect
+       ctx->nn_sock = nn_socket( AF_SP, NN_PULL );             // our 'listen' socket should allow multiple senders to connect
        if( ctx->nn_sock < 0 ) {
                fprintf( stderr, "[CRIT] rmr_init: unable to initialise nanomsg listen socket: %d\n", errno );
                free_ctx( ctx );
        if( ctx->nn_sock < 0 ) {
                fprintf( stderr, "[CRIT] rmr_init: unable to initialise nanomsg listen socket: %d\n", errno );
                free_ctx( ctx );
@@ -569,7 +629,7 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) {
                interface = "0.0.0.0";
        }
        snprintf( bind_info, sizeof( bind_info ), "%s://%s:%s", proto, interface, port );
                interface = "0.0.0.0";
        }
        snprintf( bind_info, sizeof( bind_info ), "%s://%s:%s", proto, interface, port );
-    if( nn_bind( ctx->nn_sock, bind_info ) < 0) {                      // bind and automatically accept client sessions
+       if( nn_bind( ctx->nn_sock, bind_info ) < 0) {                   // bind and automatically accept client sessions
                fprintf( stderr, "[CRIT] rmr_init: unable to bind nanomsg listen socket for %s: %s\n", bind_info, strerror( errno ) );
                nn_close( ctx->nn_sock );
                free_ctx( ctx );
                fprintf( stderr, "[CRIT] rmr_init: unable to bind nanomsg listen socket for %s: %s\n", bind_info, strerror( errno ) );
                nn_close( ctx->nn_sock );
                free_ctx( ctx );
@@ -586,10 +646,30 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) {
        return (void *) ctx;
 }
 
        return (void *) ctx;
 }
 
+/*
+       This sets the default trace length which will be added to any message buffers
+       allocated.  It can be set at any time, and if rmr_set_trace() is given a
+       trace len that is different than the default allcoated in a message, the message
+       will be resized.
+
+       Returns 0 on failure and 1 on success. If failure, then errno will be set.
+*/
+extern int rmr_init_trace( void* vctx, int tr_len ) {
+       uta_ctx_t* ctx;
+
+       errno = 0;
+       if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+               errno = EINVAL;
+               return 0;
+       }
+
+       ctx->trace_data_len = tr_len;
+       return 1;
+}
 
 /*
        Publicly facing initialisation function. Wrapper for the init() funcion above
 
 /*
        Publicly facing initialisation function. Wrapper for the init() funcion above
-       as it needs to ensure internal flags are masked off before calling the 
+       as it needs to ensure internal flags are masked off before calling the
        real workhorse.
 */
 extern void* rmr_init( char* uproto_port, int max_msg_size, int flags ) {
        real workhorse.
 */
 extern void* rmr_init( char* uproto_port, int max_msg_size, int flags ) {
@@ -614,7 +694,7 @@ extern int rmr_ready( void* vctx ) {
 }
 
 /*
 }
 
 /*
-       Provides a non-fatal (compile) interface for the nng only function. 
+       Provides a non-fatal (compile) interface for the nng only function.
        Not supported on top of nano, so this always returns -1.
 */
 extern int rmr_get_rcvfd( void* vctx ) {
        Not supported on top of nano, so this always returns -1.
 */
 extern int rmr_get_rcvfd( void* vctx ) {
@@ -631,6 +711,6 @@ extern void rmr_close( void* vctx ) {
        if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
                return;
        }
        if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
                return;
        }
-       
+
        nn_close( ctx->nn_sock );
 }
        nn_close( ctx->nn_sock );
 }