Fix crash under SI95 with multiple receive threads
[ric-plt/lib/rmr.git] / src / rmr / si / src / rmr_si.c
index b1bfd1f..0a4f806 100644 (file)
 #include "si95/socket_if.h"
 #include "si95/siproto.h"
 
 #include "si95/socket_if.h"
 #include "si95/siproto.h"
 
+#define SI95_BUILD     1                       // we drop some common functions for si
 
 #include "rmr.h"                               // things the users see
 #include "rmr_agnostic.h"              // agnostic things (must be included before private)
 
 #include "rmr.h"                               // things the users see
 #include "rmr_agnostic.h"              // agnostic things (must be included before private)
-#include "rmr_si_private.h"    // things that we need too
+#include "rmr_si_private.h"            // things that we need too
 #include "rmr_symtab.h"
 #include "rmr_symtab.h"
+#include "rmr_logging.h"
 
 #include "ring_static.c"                       // message ring support
 #include "rt_generic_static.c"         // route table things not transport specific
 #include "rtable_si_static.c"          // route table things -- transport specific
 
 #include "ring_static.c"                       // message ring support
 #include "rt_generic_static.c"         // route table things not transport specific
 #include "rtable_si_static.c"          // route table things -- transport specific
-#include "rtc_static.c"                                // route table collector
-#include "rtc_si_static.c"                     // our private test function
+#include "rtc_static.c"                                // route table collector (thread code)
 #include "tools_static.c"
 #include "sr_si_static.c"                      // send/receive static functions
 #include "wormholes.c"                         // wormhole api externals and related static functions (must be LAST!)
 #include "tools_static.c"
 #include "sr_si_static.c"                      // send/receive static functions
 #include "wormholes.c"                         // wormhole api externals and related static functions (must be LAST!)
@@ -184,7 +185,10 @@ extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
        if( !mbuf->ring || ! uta_ring_insert( mbuf->ring, mbuf ) ) {                    // just queue, free if ring is full
                if( mbuf->tp_buf ) {
                        free( mbuf->tp_buf );
        if( !mbuf->ring || ! uta_ring_insert( mbuf->ring, mbuf ) ) {                    // just queue, free if ring is full
                if( mbuf->tp_buf ) {
                        free( mbuf->tp_buf );
+                       mbuf->tp_buf = NULL;            // just in case user tries to reuse this mbuf; this will be an NPE
                }
                }
+
+               mbuf->cookie = 0;                       // should signal a bad mbuf (if not reallocated)
                free( mbuf );
        }
 }
                free( mbuf );
        }
 }
@@ -272,7 +276,7 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
 
        errno = 0;                                                                                                              // at this point any bad state is in msg returned
        if( msg->header == NULL ) {
 
        errno = 0;                                                                                                              // at this point any bad state is in msg returned
        if( msg->header == NULL ) {
-               fprintf( stderr, "[ERR] rmr_send_msg: message had no header\n" );
+               rmr_vlog( RMR_VL_ERR, "rmr_send_msg: message had no header\n" );
                msg->state = RMR_ERR_NOHDR;
                msg->tp_state = errno;
                return msg;
                msg->state = RMR_ERR_NOHDR;
                msg->tp_state = errno;
                return msg;
@@ -286,7 +290,8 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
 */
        if( (nn_sock = msg->rts_fd) < 0 ) {
                if( HDR_VERSION( msg->header ) > 2 ) {                                                  // with ver2 the ip is there, try if src name not known
 */
        if( (nn_sock = msg->rts_fd) < 0 ) {
                if( HDR_VERSION( msg->header ) > 2 ) {                                                  // with ver2 the ip is there, try if src name not known
-                       sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep, ctx->si_ctx );
+                       //sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep, ctx->si_ctx );
+                       sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep  );
                }
                if( ! sock_ok ) {
                        msg->state = RMR_ERR_NOENDPT;
                }
                if( ! sock_ok ) {
                        msg->state = RMR_ERR_NOENDPT;
@@ -294,7 +299,6 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
                }
        }
 
                }
        }
 
-
        msg->state = RMR_OK;                                                                                                                            // ensure it is clear before send
        hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src );                                         // the dest where we're returning the message to
        hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip );                                        // both the src host and src ip
        msg->state = RMR_OK;                                                                                                                            // ensure it is clear before send
        hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src );                                         // the dest where we're returning the message to
        hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip );                                        // both the src host and src ip
@@ -448,42 +452,42 @@ extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect,
        if( exp_len > RMR_MAX_XID ) {
                exp_len = RMR_MAX_XID;
        }
        if( exp_len > RMR_MAX_XID ) {
                exp_len = RMR_MAX_XID;
        }
-       if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific waiting for id=%s\n",  expect );
+       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific waiting for id=%s\n",  expect );
 
        while( queued < allow2queue ) {
                msg = rcv_msg( ctx, msg );                                      // hard wait for next
                if( msg->state == RMR_OK ) {
                        if( memcmp( msg->xaction, expect, exp_len ) == 0 ) {                    // got it -- return it
 
        while( queued < allow2queue ) {
                msg = rcv_msg( ctx, msg );                                      // hard wait for next
                if( msg->state == RMR_OK ) {
                        if( memcmp( msg->xaction, expect, exp_len ) == 0 ) {                    // got it -- return it
-                               if( DEBUG ) fprintf( stderr, "[DBUG] rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued );
+                               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued );
                                return msg;
                        }
 
                        if( ! uta_ring_insert( ctx->mring, msg ) ) {                                    // just queue, error if ring is full
                                return msg;
                        }
 
                        if( ! uta_ring_insert( ctx->mring, msg ) ) {                                    // just queue, error if ring is full
-                               if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_specific ring is full\n" );
+                               if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific ring is full\n" );
                                errno = ENOBUFS;
                                return NULL;
                        }
 
                                errno = ENOBUFS;
                                return NULL;
                        }
 
-                       if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific queued message type=%d\n", msg->mtype );
+                       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific queued message type=%d\n", msg->mtype );
                        queued++;
                        msg = NULL;
                }
        }
 
                        queued++;
                        msg = NULL;
                }
        }
 
-       if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific timeout waiting for %s\n", expect );
+       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific timeout waiting for %s\n", expect );
        errno = ETIMEDOUT;
        return NULL;
 }
 
 /*
        Set send timeout. The value time is assumed to be milliseconds.  The timeout is the
        errno = ETIMEDOUT;
        return NULL;
 }
 
 /*
        Set send timeout. The value time is assumed to be milliseconds.  The timeout is the
-       _rough_ maximum amount of time that RMr will block on a send attempt when the underlying
+       _rough_ maximum amount of time that RMR will block on a send attempt when the underlying
        mechnism indicates eagain or etimeedout.  All other error conditions are reported
        without this delay. Setting a timeout of 0 causes no retries to be attempted in
        RMr code. Setting a timeout of 1 causes RMr to spin up to 1K retries before returning,
        but _without_ issuing a sleep.  If timeout is > 1, then RMr will issue a sleep (1us)
        after every 1K send attempts until the "time" value is reached. Retries are abandoned
        mechnism indicates eagain or etimeedout.  All other error conditions are reported
        without this delay. Setting a timeout of 0 causes no retries to be attempted in
        RMr code. Setting a timeout of 1 causes RMr to spin up to 1K retries before returning,
        but _without_ issuing a sleep.  If timeout is > 1, then RMr will issue a sleep (1us)
        after every 1K send attempts until the "time" value is reached. Retries are abandoned
-       if NNG returns anything other than NNG_EAGAIN or NNG_ETIMEDOUT.
+       if NNG returns anything other than EAGAIN or EINTER is returned.
 
        The default, if this function is not used, is 1; meaning that RMr will retry, but will
        not enter a sleep.  In all cases the caller should check the status in the message returned
 
        The default, if this function is not used, is 1; meaning that RMr will retry, but will
        not enter a sleep.  In all cases the caller should check the status in the message returned
@@ -512,7 +516,7 @@ extern int rmr_set_stimeout( void* vctx, int time ) {
        CAUTION:  this is not supported as they must be set differently (between create and open) in NNG.
 */
 extern int rmr_set_rtimeout( void* vctx, int time ) {
        CAUTION:  this is not supported as they must be set differently (between create and open) in NNG.
 */
 extern int rmr_set_rtimeout( void* vctx, int time ) {
-       fprintf( stderr, "[WRN] Current underlying transport mechanism (SI) does not support rcv timeout; not set\n" );
+       rmr_vlog( RMR_VL_WARN, "Current underlying transport mechanism (SI) does not support rcv timeout; not set\n" );
        return 0;
 }
 
        return 0;
 }
 
@@ -523,11 +527,15 @@ extern int rmr_set_rtimeout( void* vctx, int time ) {
        invokes this.  Internal functions (the route table collector) which need additional
        open ports without starting additional route table collectors, will invoke this
        directly with the proper flag.
        invokes this.  Internal functions (the route table collector) which need additional
        open ports without starting additional route table collectors, will invoke this
        directly with the proper flag.
+
+       CAUTION:   The max_ibm (max inbound message) size is the supplied user max plus the lengths
+                               that we know about. The _user_ should ensure that the supplied length also
+                               includes the trace data length maximum as they are in control of that.
 */
 static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        static  int announced = 0;
        uta_ctx_t*      ctx = NULL;
 */
 static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        static  int announced = 0;
        uta_ctx_t*      ctx = NULL;
-       char    bind_info[NNG_MAXADDRLEN];      // bind info
+       char    bind_info[256];                         // bind info
        char*   proto = "tcp";                          // pointer into the proto/port string user supplied
        char*   port;
        char*   interface = NULL;                       // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined)
        char*   proto = "tcp";                          // pointer into the proto/port string user supplied
        char*   port;
        char*   interface = NULL;                       // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined)
@@ -535,14 +543,20 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        char    wbuf[1024];                                     // work buffer
        char*   tok;                                            // pointer at token in a buffer
        char*   tok2;
        char    wbuf[1024];                                     // work buffer
        char*   tok;                                            // pointer at token in a buffer
        char*   tok2;
+       int             static_rtc = 0;                         // if rtg env var is < 1, then we set and don't listen on a port
        int             state;
        int             i;
        int             state;
        int             i;
+       int             old_vlevel;
+
+       old_vlevel = rmr_vlog_init();                   // initialise and get the current level
+       rmr_set_vlevel( RMR_VL_INFO );          // we WILL announce our version etc
 
        if( ! announced ) {
 
        if( ! announced ) {
-               fprintf( stderr, "[INFO] ric message routing library on SI95 mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
+               rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95/f mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
                        RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
                announced = 1;
        }
                        RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
                announced = 1;
        }
+       rmr_set_vlevel( old_vlevel );           // return logging to the desired state
 
        errno = 0;
        if( uproto_port == NULL ) {
 
        errno = 0;
        if( uproto_port == NULL ) {
@@ -557,7 +571,7 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        }
        memset( ctx, 0, sizeof( uta_ctx_t ) );
 
        }
        memset( ctx, 0, sizeof( uta_ctx_t ) );
 
-       if( DEBUG ) fprintf( stderr, "[DBUG] rmr_init: allocating 266 rivers\n" );
+       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rmr_init: allocating 266 rivers\n" );
        ctx->nrivers = 256;                                                             // number of input flows we'll manage
        ctx->rivers = (river_t *) malloc( sizeof( river_t ) * ctx->nrivers );
        memset( ctx->rivers, 0, sizeof( river_t ) * ctx->nrivers );
        ctx->nrivers = 256;                                                             // number of input flows we'll manage
        ctx->rivers = (river_t *) malloc( sizeof( river_t ) * ctx->nrivers );
        memset( ctx->rivers, 0, sizeof( river_t ) * ctx->nrivers );
@@ -567,12 +581,21 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
 
        ctx->send_retries = 1;                                                  // default is not to sleep at all; RMr will retry about 10K times before returning
        ctx->d1_len = 4;                                                                // data1 space in header -- 4 bytes for now
 
        ctx->send_retries = 1;                                                  // default is not to sleep at all; RMr will retry about 10K times before returning
        ctx->d1_len = 4;                                                                // data1 space in header -- 4 bytes for now
-       ctx->max_ibm = max_msg_size;                                    // default to user supplied message size
+       ctx->max_ibm = max_msg_size < 1024 ? 1024 : max_msg_size;                                       // larger than their request doesn't hurt
+       ctx->max_ibm += sizeof( uta_mhdr_t ) + ctx->d1_len + ctx->d2_len + TP_HDR_LEN + 64;             // add in header size, transport hdr, and a bit of fudge
 
        ctx->mring = uta_mk_ring( 4096 );                               // message ring is always on for si
 
        ctx->mring = uta_mk_ring( 4096 );                               // message ring is always on for si
+       ctx->zcb_mring = uta_mk_ring( 128 );                    // zero copy buffer mbuf ring to reduce malloc/free calls
+
+       if( ! (flags & RMRFL_NOLOCK) ) {                                // user did not specifically ask that it be off; turn it on
+               uta_ring_config( ctx->mring, RING_RLOCK );                      // concurrent rcv calls require read lock
+               uta_ring_config( ctx->zcb_mring, RING_WLOCK );          // concurrent free calls from userland require write lock
+       } else {
+               rmr_vlog( RMR_VL_INFO, "receive ring locking disabled by user application\n" );
+       }
        init_mtcall( ctx );                                                             // set up call chutes
        init_mtcall( ctx );                                                             // set up call chutes
+       fd2ep_init( ctx );                                                              // initialise the fd to endpoint sym tab
 
 
-       ctx->zcb_mring = uta_mk_ring( 128 );                    // zero copy buffer mbuf ring
 
        ctx->max_plen = RMR_MAX_RCV_BYTES;                              // max user payload lengh
        if( max_msg_size > 0 ) {
 
        ctx->max_plen = RMR_MAX_RCV_BYTES;                              // max user payload lengh
        if( max_msg_size > 0 ) {
@@ -584,7 +607,7 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
 
        ctx->si_ctx = SIinitialise( SI_OPT_FG );                // FIX ME: si needs to streamline and drop fork/bg stuff
        if( ctx->si_ctx == NULL ) {
 
        ctx->si_ctx = SIinitialise( SI_OPT_FG );                // FIX ME: si needs to streamline and drop fork/bg stuff
        if( ctx->si_ctx == NULL ) {
-               fprintf( stderr, "[CRI] unable to initialise SI95 interface\n" );
+               rmr_vlog( RMR_VL_CRIT, "unable to initialise SI95 interface\n" );
                free_ctx( ctx );
                return NULL;
        }
                free_ctx( ctx );
                return NULL;
        }
@@ -600,6 +623,12 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
                port = proto_port;                      // assume something like "1234" was passed
        }
 
                port = proto_port;                      // assume something like "1234" was passed
        }
 
+       if( (tok = getenv( "ENV_RTG_PORT" )) != NULL ) {                                // must check port here -- if < 1 then we just start static file 'listener'
+               if( atoi( tok ) < 1 ) {
+                       static_rtc = 1;
+               }
+       }
+
        if( (tok = getenv( ENV_SRC_ID )) != NULL ) {                                                    // env var overrides what we dig from system
                tok = strdup( tok );                                    // something we can destroy
                if( *tok == '[' ) {                                             // we allow an ipv6 address here
        if( (tok = getenv( ENV_SRC_ID )) != NULL ) {                                                    // env var overrides what we dig from system
                tok = strdup( tok );                                    // something we can destroy
                if( *tok == '[' ) {                                             // we allow an ipv6 address here
@@ -615,7 +644,7 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
                free( tok );
        } else {
                if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
                free( tok );
        } else {
                if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
-                       fprintf( stderr, "[CRI] rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
+                       rmr_vlog( RMR_VL_CRIT, "rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
                        return NULL;
                }
                if( (tok = strchr( wbuf, '.' )) != NULL ) {
                        return NULL;
                }
                if( (tok = strchr( wbuf, '.' )) != NULL ) {
@@ -625,7 +654,7 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
 
        ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC );
        if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) {                       // our registered name is host:port
 
        ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC );
        if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) {                       // our registered name is host:port
-               fprintf( stderr, "[CRI] rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port );
+               rmr_vlog( RMR_VL_CRIT, "rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port );
                return NULL;
        }
 
                return NULL;
        }
 
@@ -641,11 +670,11 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        } else {
                ctx->my_ip = get_default_ip( ctx->ip_list );    // and (guess) at what should be the default to put into messages as src
                if( ctx->my_ip == NULL ) {
        } else {
                ctx->my_ip = get_default_ip( ctx->ip_list );    // and (guess) at what should be the default to put into messages as src
                if( ctx->my_ip == NULL ) {
-                       fprintf( stderr, "[WRN] rmr_init: default ip address could not be sussed out, using name\n" );
+                       rmr_vlog( RMR_VL_WARN, "rmr_init: default ip address could not be sussed out, using name\n" );
                        strcpy( ctx->my_ip, ctx->my_name );                     // if we cannot suss it out, use the name rather than a nil pointer
                }
        }
                        strcpy( ctx->my_ip, ctx->my_name );                     // if we cannot suss it out, use the name rather than a nil pointer
                }
        }
-       if( DEBUG ) fprintf( stderr, "[DBUG] default ip address: %s\n", ctx->my_ip );
+       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " default ip address: %s\n", ctx->my_ip );
 
        if( (tok = getenv( ENV_WARNINGS )) != NULL ) {
                if( *tok == '1' ) {
 
        if( (tok = getenv( ENV_WARNINGS )) != NULL ) {
                if( *tok == '1' ) {
@@ -660,21 +689,28 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        
        snprintf( bind_info, sizeof( bind_info ), "%s:%s", interface, port );           // FIXME -- si only supports 0.0.0.0 by default
        if( (state = SIlistener( ctx->si_ctx, TCP_DEVICE, bind_info )) < 0 ) {
        
        snprintf( bind_info, sizeof( bind_info ), "%s:%s", interface, port );           // FIXME -- si only supports 0.0.0.0 by default
        if( (state = SIlistener( ctx->si_ctx, TCP_DEVICE, bind_info )) < 0 ) {
-               fprintf( stderr, "[CRI] rmr_init: unable to start si listener for %s: %s\n", bind_info, strerror( errno ) );
+               rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to start si listener for %s: %s\n", bind_info, strerror( errno ) );
                free_ctx( ctx );
                return NULL;
        }
 
                free_ctx( ctx );
                return NULL;
        }
 
-       if( !(flags & FL_NOTHREAD) ) {                                                                                          // skip if internal function that doesnt need an rtc
-               if( pthread_create( &ctx->rtc_th,  NULL, rtc_file, (void *) ctx ) ) {   // kick the rt collector thread
-                       fprintf( stderr, "[WRN] rmr_init: unable to start route table collector thread: %s", strerror( errno ) );
+       if( flags & FL_NOTHREAD ) {                                     // thread set to off; no rout table collector started (could be called by the rtc thread itself)
+               ctx->rtable = rt_clone_space( NULL, NULL, 0 );          // creates an empty route table so that wormholes still can be used
+       } else {
+               if( static_rtc ) {
+                       if( pthread_create( &ctx->rtc_th,  NULL, rtc_file, (void *) ctx ) ) {   // kick the rt collector thread as just file reader
+                               rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start static route table collector thread: %s", strerror( errno ) );
+                       }
+               } else {
+                       if( pthread_create( &ctx->rtc_th,  NULL, rtc, (void *) ctx ) ) {        // kick the real rt collector thread
+                               rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start dynamic route table collector thread: %s", strerror( errno ) );
+                       }
                }
        }
 
                }
        }
 
-       //fprintf( stderr, ">>>>> starting threaded receiver with ctx=%p si_ctx=%p\n", ctx, ctx->si_ctx );
        ctx->flags |= CFL_MTC_ENABLED;                                                                                          // for SI threaded receiver is the only way
        if( pthread_create( &ctx->mtc_th,  NULL, mt_receive, (void *) ctx ) ) {         // so kick it
        ctx->flags |= CFL_MTC_ENABLED;                                                                                          // for SI threaded receiver is the only way
        if( pthread_create( &ctx->mtc_th,  NULL, mt_receive, (void *) ctx ) ) {         // so kick it
-               fprintf( stderr, "[WRN] rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
+               rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
        }
 
        free( proto_port );
        }
 
        free( proto_port );
@@ -755,7 +791,7 @@ extern int rmr_get_rcvfd( void* vctx ) {
 
 /*
        if( (state = nng_getopt_int( ctx->nn_sock, NNG_OPT_RECVFD, &fd )) != 0 ) {
 
 /*
        if( (state = nng_getopt_int( ctx->nn_sock, NNG_OPT_RECVFD, &fd )) != 0 ) {
-               fprintf( stderr, "[WRN] rmr cannot get recv fd: %s\n", nng_strerror( state ) );
+               rmr_vlog( RMR_VL_WARN, "rmr cannot get recv fd: %s\n", nng_strerror( state ) );
                return -1;
        }
 */
                return -1;
        }
 */
@@ -875,7 +911,7 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
                mbuf = ombuf;                           // return caller's buffer if they passed one in
        } else {
                errno = 0;                                              // interrupted call state could be left; clear
                mbuf = ombuf;                           // return caller's buffer if they passed one in
        } else {
                errno = 0;                                              // interrupted call state could be left; clear
-               if( DEBUG ) fprintf( stderr, "[DBUG] mt_rcv extracting from normal ring\n" );
+               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " mt_rcv extracting from normal ring\n" );
                if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
                        mbuf->state = RMR_OK;
 
                if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
                        mbuf->state = RMR_OK;
 
@@ -1019,6 +1055,34 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m
        return mbuf;
 }
 
        return mbuf;
 }
 
+/*
+       Given an existing message buffer, reallocate the payload portion to
+       be at least new_len bytes.  The message header will remain such that
+       the caller may use the rmr_rts_msg() function to return a payload
+       to the sender. 
+
+       The mbuf passed in may or may not be reallocated and the caller must
+       use the returned pointer and should NOT assume that it can use the 
+       pointer passed in with the exceptions based on the clone flag.
+
+       If the clone flag is set, then a duplicated message, with larger payload
+       size, is allocated and returned.  The old_msg pointer in this situation is
+       still valid and must be explicitly freed by the application. If the clone 
+       message is not set (0), then any memory management of the old message is
+       handled by the function.
+
+       If the copy flag is set, the contents of the old message's payload is 
+       copied to the reallocated payload.  If the flag is not set, then the 
+       contents of the payload is undetermined.
+*/
+extern rmr_mbuf_t* rmr_realloc_payload( rmr_mbuf_t* old_msg, int new_len, int copy, int clone ) {
+       if( old_msg == NULL ) {
+               return NULL;
+       }
+
+       return realloc_payload( old_msg, new_len, copy, clone );        // message allocation is transport specific, so this is a passthrough
+}
+
 /*
        Enable low latency things in the transport (when supported).
 */
 /*
        Enable low latency things in the transport (when supported).
 */