Correct bug identified in static analysis
[ric-plt/lib/rmr.git] / src / rmr / si / src / rmr_si.c
index 8e499c1..54a1bba 100644 (file)
@@ -1,8 +1,8 @@
 // vim: ts=4 sw=4 noet :
 /*
 ==================================================================================
-       Copyright (c) 2019-2020 Nokia
-       Copyright (c) 2018-2020 AT&T Intellectual Property.
+       Copyright (c) 2019-2021 Nokia
+       Copyright (c) 2018-2021 AT&T Intellectual Property.
 
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
 #include "si95/siproto.h"
 
 
+#define SI95_BUILD     1                       // we drop some common functions for si
+
 #include "rmr.h"                               // things the users see
 #include "rmr_agnostic.h"              // agnostic things (must be included before private)
-#include "rmr_si_private.h"    // things that we need too
+#include "rmr_si_private.h"            // things that we need too
+
 #include "rmr_symtab.h"
+#include "rmr_logging.h"
 
 #include "ring_static.c"                       // message ring support
 #include "rt_generic_static.c"         // route table things not transport specific
 #include "rtable_si_static.c"          // route table things -- transport specific
-#include "rtc_si_static.c"                     // specific RMR only route table collector (SI only for now)
+#include "alarm.c"
+#include "rtc_static.c"                                // route table collector (thread code)
 #include "tools_static.c"
 #include "sr_si_static.c"                      // send/receive static functions
 #include "wormholes.c"                         // wormhole api externals and related static functions (must be LAST!)
 
 //------------------------------------------------------------------------------
 
+/*
+       If we have an EP, up the counters based on state.
+       This isn't needed, but it makes driving the code under unit test easier so we
+       induldge in the bean counter's desire for coverage numbers.
+*/
+static inline void incr_ep_counts( int state, endpoint_t* ep ) {
+       if( ep != NULL ) {
+               switch( state ) {
+                       case RMR_OK:
+                               ep->scounts[EPSC_GOOD]++;
+                               break;
+
+                       case RMR_ERR_RETRY:
+                               ep->scounts[EPSC_TRANS]++;
+                               break;
+
+                       default:
+                               ep->scounts[EPSC_FAIL]++;
+                               break;
+               }
+       }
+}
 
 /*
        Clean up a context.
 */
 static void free_ctx( uta_ctx_t* ctx ) {
        if( ctx ) {
-               if( ctx->rtg_addr ) {
+               if( ctx->rtg_addr ){
                        free( ctx->rtg_addr );
                }
+               uta_ring_free( ctx->mring );
+               uta_ring_free( ctx->zcb_mring );
+               if( ctx->chutes ){
+                       free( ctx->chutes );
+               }
+               if( ctx->fd2ep ){
+                       rmr_sym_free( ctx->fd2ep );
+               }
+               if( ctx->my_name ){
+                       free( ctx->my_name );
+               }
+               if( ctx->my_ip ){
+                       free( ctx->my_ip );
+               }
+               if( ctx->rtable ){
+                       rmr_sym_free( ctx->rtable->hash );
+                       free( ctx->rtable );
+               }
+               if ( ctx->ephash ){
+                       free( ctx->ephash );
+               }
+               free( ctx );
        }
 }
 
@@ -100,8 +149,8 @@ static void free_ctx( uta_ctx_t* ctx ) {
 
        The allocated len stored in the msg is:
                transport header length +
-               message header + 
-               user requested payload 
+               message header +
+               user requested payload
 
        The msg header is a combination of the fixed RMR header and the variable
        trace data and d2 fields which may vary for each message.
@@ -163,6 +212,10 @@ extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned
        This provides an external path to the realloc static function as it's called by an
        outward facing mbuf api function. Used to reallocate a message with a different
        trace data size.
+
+       User programmes must use this with CAUTION!  The mbuf passed in is NOT freed and
+       is still valid following this call. The caller is reponsible for maintainting
+       a pointer to both old and new messages and invoking rmr_free_msg() on both!
 */
 extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
        return realloc_msg( msg, new_tr_size );
@@ -173,19 +226,34 @@ extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
        Return the message to the available pool, or free it outright.
 */
 extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
-       //fprintf( stderr, "SKIPPING FREE: %p\n", mbuf );
-       //return;
-
        if( mbuf == NULL ) {
+               fprintf( stderr, ">>>FREE  nil buffer\n" );
                return;
        }
 
-       if( !mbuf->ring || ! uta_ring_insert( mbuf->ring, mbuf ) ) {                    // just queue, free if ring is full
+#ifdef KEEP
+       if( mbuf->flags & MFL_HUGE ||                                                   // don't cache oversized messages
+               ! mbuf->ring ||                                                                         // cant cache if no ring
+               ! uta_ring_insert( mbuf->ring, mbuf ) ) {                       // or ring is full
+
                if( mbuf->tp_buf ) {
                        free( mbuf->tp_buf );
+                       mbuf->tp_buf = NULL;            // just in case user tries to reuse this mbuf; this will be an NPE
                }
+
+               mbuf->cookie = 0;                       // should signal a bad mbuf (if not reallocated)
                free( mbuf );
        }
+#else
+       // always free, never manage a pool
+       if( mbuf->tp_buf ) {
+               free( mbuf->tp_buf );
+               mbuf->tp_buf = NULL;            // just in case user tries to reuse this mbuf; this will be an NPE
+       }
+
+       mbuf->cookie = 0;                       // should signal a bad mbuf (if not reallocated)
+       free( mbuf );
+#endif
 }
 
 /*
@@ -199,8 +267,8 @@ extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
                ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
 
                d1 = DATA1_ADDR( msg->header );
-               d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                         // must blot out so it doesn't queue on a chute at the other end
-       }       
+               d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                 // must blot out so it doesn't queue on a chute at the other end
+       }
 
        return mtosend_msg( vctx, msg, max_to );
 }
@@ -218,7 +286,7 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
 
                d1 = DATA1_ADDR( msg->header );
                d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                         // must blot out so it doesn't queue on a chute at the other end
-       }       
+       }
 
        return rmr_mtosend_msg( vctx, msg,  -1 );                                                       // retries < 0  uses default from ctx
 }
@@ -226,16 +294,20 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
 /*
        Return to sender allows a message to be sent back to the endpoint where it originated.
 
-       In the SI world the file descriptor that was the source of the message is captured in
-       the mbuffer and thus can be used to quickly find the target for an RTS call. 
+       With SI95 it was thought that the return to sender would be along the same open conneciton
+       and thus no table lookup would be needed to open a 'reverse direction' path. However, for
+       applications sending at high message rates, returning responses on the same connection
+       causes major strife. Thus the decision was made to use the same method as NNG and just
+       open a second connection for reverse path.
+
+       We will attempt to use the name in the received message to look up the endpoint. If
+       that failes, then we will write on the connection that the message arrived on as a
+       falback.
 
-       The source information in the message is used to select the socket on which to write
-       the message rather than using the message type and round-robin selection. This
-       should return a message buffer with the state of the send operation set. On success
-       (state is RMR_OK, the caller may use the buffer for another receive operation), and on
-       error it can be passed back to this function to retry the send if desired. On error,
-       errno will liklely have the failure reason set by the nng send processing.
-       The following are possible values for the state in the message buffer:
+       On success (state is RMR_OK, the caller may use the buffer for another receive operation),
+       and on error it can be passed back to this function to retry the send if desired. On error,
+       errno will liklely have the failure reason set by the nng send processing.  The following
+       are possible values for the state in the message buffer:
 
        Message states returned:
                RMR_ERR_BADARG - argument (context or msg) was nil or invalid
@@ -248,13 +320,12 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
        failure. The value of errno might give a clue as to what is wrong.
 
        CAUTION:
-               Like send_msg(), this is non-blocking and will return the msg if there is an errror.
+               Like send_msg(), this is non-blocking and will return the msg if there is an error.
                The caller must check for this and handle it properly.
 */
 extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
        int                     nn_sock;                        // endpoint socket for send
        uta_ctx_t*      ctx;
-       int                     state;
        char*           hold_src;                       // we need the original source if send fails
        char*           hold_ip;                        // also must hold original ip
        int                     sock_ok = 0;            // true if we found a valid endpoint socket
@@ -271,7 +342,7 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
 
        errno = 0;                                                                                                              // at this point any bad state is in msg returned
        if( msg->header == NULL ) {
-               fprintf( stderr, "[ERR] rmr_send_msg: message had no header\n" );
+               rmr_vlog( RMR_VL_ERR, "rmr_send_msg: message had no header\n" );
                msg->state = RMR_ERR_NOHDR;
                msg->tp_state = errno;
                return msg;
@@ -279,45 +350,29 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
 
        ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
 
-/*
-       sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock, &ep, ctx->si_ctx );                      // src is always used first for rts
+       sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock, &ep );   // always try src first
        if( ! sock_ok ) {
-*/
-       if( (nn_sock = msg->rts_fd) < 0 ) {
-               if( HDR_VERSION( msg->header ) > 2 ) {                                                  // with ver2 the ip is there, try if src name not known
-                       sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep, ctx->si_ctx );
-               }
-               if( ! sock_ok ) {
-                       msg->state = RMR_ERR_NOENDPT;
-                       return msg;                                                                                                                             // preallocated msg can be reused since not given back to nn
+               if( (nn_sock = msg->rts_fd) < 0 ) {
+                       if( HDR_VERSION( msg->header ) > 2 ) {                                                  // with ver2 the ip is there, try if src name not known
+                               sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep  );
+                       }
+                       if( ! sock_ok ) {
+                               msg->state = RMR_ERR_NOENDPT;
+                               return msg;
+                       }
                }
        }
 
-
        msg->state = RMR_OK;                                                                                                                            // ensure it is clear before send
        hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src );                                         // the dest where we're returning the message to
        hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip );                                        // both the src host and src ip
-       strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );        // must overlay the source to be ours
+       zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );    // must overlay the source to be ours
        msg = send_msg( ctx, msg, nn_sock, -1 );
        if( msg ) {
-               if( ep != NULL ) {
-                       switch( msg->state ) {
-                               case RMR_OK:
-                                       ep->scounts[EPSC_GOOD]++;
-                                       break;
-                       
-                               case RMR_ERR_RETRY:
-                                       ep->scounts[EPSC_TRANS]++;
-                                       break;
-
-                               default:
-                                       // FIX ME uta_fd_failed( nn_sock );                     // we don't have an ep so this requires a look up/search to mark it failed
-                                       ep->scounts[EPSC_FAIL]++;
-                                       break;
-                       }
-               }
-               strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC );    // always return original source so rts can be called again
-               strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC );   // always return original source so rts can be called again
+               incr_ep_counts(  msg->state, ep );                              // update counts
+
+               zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC );        // always replace original source & ip so rts can be called again
+               zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC );
                msg->flags |= MFL_ADDSRC;                                                                                                               // if msg given to send() it must add source
        }
 
@@ -330,7 +385,7 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
        If multi-threading call is turned on, this invokes that mechanism with the special call
        id of 1 and a max wait of 1 second.  If multi threaded call is not on, then the original
        behavour (described below) is carried out.  This is safe to use when mt is enabled, but
-       the user app is invoking rmr_call() from only one thread, and the caller doesn't need 
+       the user app is invoking rmr_call() from only one thread, and the caller doesn't need
        a flexible timeout.
 
        On timeout this function will return a nil pointer. If the original message could not
@@ -365,7 +420,7 @@ extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
                return msg;
        }
 
-       return rmr_mt_call( vctx, msg, 1, 1000 );               // use the reserved call-id of 1 and wait up to 1 sec
+    return mt_call( vctx, msg, 1, 1000, NULL );                // use the reserved call-id of 1 and wait up to 1 sec
 }
 
 /*
@@ -414,6 +469,10 @@ extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
 }
 
 /*
+       DEPRECATED -- this function is not needed in the SI world, and when NNG goes away this will
+               too.  This function likely will not behave as expected in SI, and we are pretty sure it
+               isn't being used as there was an abort triggering reference to rmr_rcv() until now.
+
        This blocks until the message with the 'expect' ID is received. Messages which are received
        before the expected message are queued onto the message ring.  The function will return
        a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
@@ -447,42 +506,45 @@ extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect,
        if( exp_len > RMR_MAX_XID ) {
                exp_len = RMR_MAX_XID;
        }
-       if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific waiting for id=%s\n",  expect );
+       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific waiting for id=%s\n",  expect );
 
        while( queued < allow2queue ) {
-               msg = rcv_msg( ctx, msg );                                      // hard wait for next
-               if( msg->state == RMR_OK ) {
-                       if( memcmp( msg->xaction, expect, exp_len ) == 0 ) {                    // got it -- return it
-                               if( DEBUG ) fprintf( stderr, "[DBUG] rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued );
-                               return msg;
-                       }
-
-                       if( ! uta_ring_insert( ctx->mring, msg ) ) {                                    // just queue, error if ring is full
-                               if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_specific ring is full\n" );
-                               errno = ENOBUFS;
-                               return NULL;
+               msg = rmr_rcv_msg( ctx, msg );                                  // hard wait for next
+               if( msg != NULL ) {
+                       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific checking message; queued=%d allowed=%d state=%d\n",  queued, allow2queue, msg->state );
+                       if( msg->state == RMR_OK ) {
+                               if( memcmp( msg->xaction, expect, exp_len ) == 0 ) {                    // got it -- return it
+                                       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific matched (%s); %d messages were queued\n", msg->xaction, queued );
+                                       return msg;
+                               }
+
+                               if( ! uta_ring_insert( ctx->mring, msg ) ) {                                    // just queue, error if ring is full
+                                       if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific ring is full\n" );
+                                       errno = ENOBUFS;
+                                       return NULL;
+                               }
+
+                               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific queued message type=%d\n", msg->mtype );
+                               queued++;
+                               msg = NULL;
                        }
-
-                       if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific queued message type=%d\n", msg->mtype );
-                       queued++;
-                       msg = NULL;
                }
        }
 
-       if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific timeout waiting for %s\n", expect );
+       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific timeout waiting for %s\n", expect );
        errno = ETIMEDOUT;
        return NULL;
 }
 
 /*
        Set send timeout. The value time is assumed to be milliseconds.  The timeout is the
-       _rough_ maximum amount of time that RMr will block on a send attempt when the underlying
+       _rough_ maximum amount of time that RMR will block on a send attempt when the underlying
        mechnism indicates eagain or etimeedout.  All other error conditions are reported
        without this delay. Setting a timeout of 0 causes no retries to be attempted in
        RMr code. Setting a timeout of 1 causes RMr to spin up to 1K retries before returning,
        but _without_ issuing a sleep.  If timeout is > 1, then RMr will issue a sleep (1us)
        after every 1K send attempts until the "time" value is reached. Retries are abandoned
-       if NNG returns anything other than NNG_EAGAIN or NNG_ETIMEDOUT.
+       if NNG returns anything other than EAGAIN or EINTER is returned.
 
        The default, if this function is not used, is 1; meaning that RMr will retry, but will
        not enter a sleep.  In all cases the caller should check the status in the message returned
@@ -511,10 +573,39 @@ extern int rmr_set_stimeout( void* vctx, int time ) {
        CAUTION:  this is not supported as they must be set differently (between create and open) in NNG.
 */
 extern int rmr_set_rtimeout( void* vctx, int time ) {
-       fprintf( stderr, "[WRN] Current underlying transport mechanism (SI) does not support rcv timeout; not set\n" );
+       rmr_vlog( RMR_VL_WARN, "Current underlying transport mechanism (SI) does not support rcv timeout; not set\n" );
        return 0;
 }
 
+/*
+       Common cleanup on initialisation error. These are hard to force, and this helps to ensure
+       all code is tested by providing a callable rather than a block of "goto" code.
+
+       There is a return value so that where we need this we get dinked only for one
+       uncovered line rather than two:
+               init_err(...);
+               return NULL;
+
+       That's a hack, and is yet another example of the testing tail wagging the dog.
+*/
+static inline void* init_err( char* msg, void* ctx, void* port, int errval ) {
+       if( errval != 0 ) {                     // if not letting it be what a sysllib set it to...
+               errno = errval;
+       }
+
+       if( port ) {                            // free things if allocated
+               free( port );
+       }
+       if( ctx ) {
+               free_ctx( ctx );
+       }
+
+       if( msg ) {                                                                     // crit message if supplied
+               rmr_vlog( RMR_VL_CRIT, "rmr_init: %s: %s", msg, strerror( errno ) );
+       }
+
+       return NULL;
+}
 
 /*
        This is the actual init workhorse. The user visible function meerly ensures that the
@@ -522,13 +613,17 @@ extern int rmr_set_rtimeout( void* vctx, int time ) {
        invokes this.  Internal functions (the route table collector) which need additional
        open ports without starting additional route table collectors, will invoke this
        directly with the proper flag.
+
+       CAUTION:   The max_ibm (max inbound message) size is the supplied user max plus the lengths
+                               that we know about. The _user_ should ensure that the supplied length also
+                               includes the trace data length maximum as they are in control of that.
 */
-static void* init(  char* uproto_port, int max_msg_size, int flags ) {
+static void* init( char* uproto_port, int def_msg_size, int flags ) {
        static  int announced = 0;
        uta_ctx_t*      ctx = NULL;
-       char    bind_info[NNG_MAXADDRLEN];      // bind info
+       char    bind_info[256];                         // bind info
        char*   proto = "tcp";                          // pointer into the proto/port string user supplied
-       char*   port;
+       char*   port;                                           // pointer into the proto_port buffer at the port value
        char*   interface = NULL;                       // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined)
        char*   proto_port;
        char    wbuf[1024];                                     // work buffer
@@ -537,29 +632,42 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        int             static_rtc = 0;                         // if rtg env var is < 1, then we set and don't listen on a port
        int             state;
        int             i;
+       int             old_vlevel;
+
+       old_vlevel = rmr_vlog_init();                   // initialise and get the current level
 
        if( ! announced ) {
-               fprintf( stderr, "[INFO] ric message routing library on SI95/b mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
-                       RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
+               rmr_set_vlevel( RMR_VL_INFO );          // we WILL announce our version
+               rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95 p=%s mv=%d flg=%02x id=a (%s %s.%s.%s built: %s)\n",
+                       uproto_port, RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
                announced = 1;
+
+               rmr_set_vlevel( old_vlevel );           // return logging to the desired state
+               uta_dump_env();                                                 // spit out environment settings meaningful to us if in info mode
        }
 
        errno = 0;
        if( uproto_port == NULL ) {
                proto_port = strdup( DEF_COMM_PORT );
+               rmr_vlog( RMR_VL_WARN, "user passed nil as the listen port, using default: %s\n", proto_port );
        } else {
                proto_port = strdup( uproto_port );             // so we can modify it
        }
 
+       if ( proto_port == NULL ){
+               return init_err( "unable to alloc proto port string", NULL, NULL, ENOMEM );
+       }
+
        if( (ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) )) == NULL ) {
-               errno = ENOMEM;
-               return NULL;
+               return init_err( "unable to allocate context", ctx, proto_port, ENOMEM );
        }
        memset( ctx, 0, sizeof( uta_ctx_t ) );
 
-       if( DEBUG ) fprintf( stderr, "[DBUG] rmr_init: allocating 266 rivers\n" );
-       ctx->nrivers = 256;                                                             // number of input flows we'll manage
+       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rmr_init: allocating 266 rivers\n" );
+       ctx->snarf_rt_fd = -1;
+       ctx->nrivers = MAX_RIVERS;                                              // the array allows for fast index mapping for fd values < max
        ctx->rivers = (river_t *) malloc( sizeof( river_t ) * ctx->nrivers );
+       ctx->river_hash = rmr_sym_alloc( 129 );                         // connections with fd values > FD_MAX have to e hashed
        memset( ctx->rivers, 0, sizeof( river_t ) * ctx->nrivers );
        for( i = 0; i < ctx->nrivers; i++ ) {
                ctx->rivers[i].state = RS_NEW;                          // force allocation of accumulator on first received packet
@@ -567,26 +675,31 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
 
        ctx->send_retries = 1;                                                  // default is not to sleep at all; RMr will retry about 10K times before returning
        ctx->d1_len = 4;                                                                // data1 space in header -- 4 bytes for now
-       ctx->max_ibm = max_msg_size;                                    // default to user supplied message size
+       ctx->max_ibm = def_msg_size < 1024 ? 1024 : def_msg_size;                                       // larger than their request doesn't hurt
+       ctx->max_ibm += sizeof( uta_mhdr_t ) + ctx->d1_len + ctx->d2_len + TP_HDR_LEN + 64;             // add in header size, transport hdr, and a bit of fudge
 
        ctx->mring = uta_mk_ring( 4096 );                               // message ring is always on for si
+       ctx->zcb_mring = uta_mk_ring( 128 );                    // zero copy buffer mbuf ring to reduce malloc/free calls
+
+       if( ! (flags & RMRFL_NOLOCK) ) {                                // user did not specifically ask that it be off; turn it on
+               uta_ring_config( ctx->mring, RING_RLOCK );                      // concurrent rcv calls require read lock
+               uta_ring_config( ctx->zcb_mring, RING_WLOCK );          // concurrent free calls from userland require write lock
+               uta_ring_config( ctx->zcb_mring, RING_FRLOCK );         // concurrent message allocatieon calls from userland require read lock, but can be fast
+       } else {
+               rmr_vlog( RMR_VL_INFO, "receive ring locking disabled by user application\n" );
+       }
        init_mtcall( ctx );                                                             // set up call chutes
+       fd2ep_init( ctx );                                                              // initialise the fd to endpoint sym tab
 
-       ctx->zcb_mring = uta_mk_ring( 128 );                    // zero copy buffer mbuf ring
 
        ctx->max_plen = RMR_MAX_RCV_BYTES;                              // max user payload lengh
-       if( max_msg_size > 0 ) {
-               ctx->max_plen = max_msg_size;
+       if( def_msg_size > 0 ) {
+               ctx->max_plen = def_msg_size;
        }
 
-       // we're using a listener to get rtg updates, so we do NOT need this.
-       //uta_lookup_rtg( ctx );                                                        // attempt to fill in rtg info; rtc will handle missing values/errors
-
        ctx->si_ctx = SIinitialise( SI_OPT_FG );                // FIX ME: si needs to streamline and drop fork/bg stuff
        if( ctx->si_ctx == NULL ) {
-               fprintf( stderr, "[CRI] unable to initialise SI95 interface\n" );
-               free_ctx( ctx );
-               return NULL;
+               return init_err( "unable to initialise SI95 interface\n", ctx, proto_port, 0 );
        }
 
        if( (port = strchr( proto_port, ':' )) != NULL ) {
@@ -599,11 +712,10 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        } else {
                port = proto_port;                      // assume something like "1234" was passed
        }
+       rmr_vlog( RMR_VL_INFO, "listen port = %s\n", port );
 
-       if( (tok = getenv( "ENV_RTG_PORT" )) != NULL ) {                                // must check port here -- if < 1 then we just start static file 'listener'
-               if( atoi( tok ) < 1 ) {
-                       static_rtc = 1;
-               }
+       if( (tok = getenv( ENV_RTG_PORT )) != NULL && atoi( tok ) < 0 ) {       // must check here -- if < 0 then we just start static file 'listener'
+               static_rtc = 1;
        }
 
        if( (tok = getenv( ENV_SRC_ID )) != NULL ) {                                                    // env var overrides what we dig from system
@@ -621,8 +733,7 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
                free( tok );
        } else {
                if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
-                       fprintf( stderr, "[CRI] rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
-                       return NULL;
+                       return init_err( "cannot determine localhost name\n", ctx, proto_port, 0 );
                }
                if( (tok = strchr( wbuf, '.' )) != NULL ) {
                        *tok = 0;                                                                       // we don't keep domain portion
@@ -631,8 +742,7 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
 
        ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC );
        if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) {                       // our registered name is host:port
-               fprintf( stderr, "[CRI] rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port );
-               return NULL;
+               return init_err( "hostname + port is too long", ctx, proto_port, EINVAL );
        }
 
        if( (tok = getenv( ENV_NAME_ONLY )) != NULL ) {
@@ -647,45 +757,65 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        } else {
                ctx->my_ip = get_default_ip( ctx->ip_list );    // and (guess) at what should be the default to put into messages as src
                if( ctx->my_ip == NULL ) {
-                       fprintf( stderr, "[WRN] rmr_init: default ip address could not be sussed out, using name\n" );
-                       strcpy( ctx->my_ip, ctx->my_name );                     // if we cannot suss it out, use the name rather than a nil pointer
+                       rmr_vlog( RMR_VL_WARN, "rmr_init: default ip address could not be sussed out, using name\n" );
+                       ctx->my_ip = strdup( ctx->my_name );            // if we cannot suss it out, use the name rather than a nil pointer
                }
        }
-       if( DEBUG ) fprintf( stderr, "[DBUG] default ip address: %s\n", ctx->my_ip );
+       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " default ip address: %s\n", ctx->my_ip );
 
        if( (tok = getenv( ENV_WARNINGS )) != NULL ) {
                if( *tok == '1' ) {
-                       ctx->flags |= CTXFL_WARN;                                       // turn on some warnings (not all, just ones that shouldn't impact performance)
+                       ctx->flags |= CFL_WARN;                                 // turn on some warnings (not all, just ones that shouldn't impact performance)
                }
        }
 
 
-       if( (interface = getenv( ENV_BIND_IF )) == NULL ) {
+       if( (interface = getenv( ENV_BIND_IF )) == NULL ) {             // if specific interface not defined, listen on all
                interface = "0.0.0.0";
        }
-       
-       snprintf( bind_info, sizeof( bind_info ), "%s:%s", interface, port );           // FIXME -- si only supports 0.0.0.0 by default
+
+       snprintf( bind_info, sizeof( bind_info ), "%s:%s", interface, port );
        if( (state = SIlistener( ctx->si_ctx, TCP_DEVICE, bind_info )) < 0 ) {
-               fprintf( stderr, "[CRI] rmr_init: unable to start si listener for %s: %s\n", bind_info, strerror( errno ) );
-               free_ctx( ctx );
-               return NULL;
+               rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to start si listener for %s: %s\n", bind_info, strerror( errno ) );
+               return init_err( NULL, ctx, proto_port, 0 );
+       }
+
+                                                                                               // finish all flag setting before threads to keep helgrind quiet
+       ctx->flags |= CFL_MTC_ENABLED;                          // for SI threaded receiver is the only way
+
+
+       // ---------------- setup for route table collector before invoking ----------------------------------
+       ctx->rtgate = (pthread_mutex_t *) malloc( sizeof( *ctx->rtgate ) );             // single mutex required to gate access to moving rtables
+       if( ctx->rtgate != NULL ) {
+               pthread_mutex_init( ctx->rtgate, NULL );
+       }
+
+       ctx->ephash = rmr_sym_alloc( 129 );                                     // host:port to ep symtab exists outside of any route table
+       if( ctx->ephash == NULL ) {
+               return init_err( "unable to allocate ep hash\n", ctx, proto_port, ENOMEM );
        }
 
-       if( !(flags & FL_NOTHREAD) ) {                                                                                          // skip if internal function that doesnt need a RTC
+       ctx->rtable = rt_clone_space( ctx, NULL, NULL, 0 );     // create an empty route table so that wormhole/rts calls can be used
+       if( flags & RMRFL_NOTHREAD ) {                                          // no thread prevents the collector start for very special cases
+               ctx->rtable_ready = 1;                                                  // route based sends will always fail, but rmr is ready for the non thread case
+       } else {
+               ctx->rtable_ready = 0;                                                  // no sends until a real route table is loaded in the rtc thread
+
                if( static_rtc ) {
+                       rmr_vlog( RMR_VL_INFO, "rmr_init: file based route table only for context on port %s\n", uproto_port );
                        if( pthread_create( &ctx->rtc_th,  NULL, rtc_file, (void *) ctx ) ) {   // kick the rt collector thread as just file reader
-                               fprintf( stderr, "[WRN] rmr_init: unable to start static route table collector thread: %s", strerror( errno ) );
+                               rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start static route table collector thread: %s", strerror( errno ) );
                        }
                } else {
+                       rmr_vlog( RMR_VL_INFO, "rmr_init: dynamic route table for context on port %s\n", uproto_port );
                        if( pthread_create( &ctx->rtc_th,  NULL, rtc, (void *) ctx ) ) {        // kick the real rt collector thread
-                               fprintf( stderr, "[WRN] rmr_init: unable to start dynamic route table collector thread: %s", strerror( errno ) );
+                               rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start dynamic route table collector thread: %s", strerror( errno ) );
                        }
                }
        }
 
-       ctx->flags |= CFL_MTC_ENABLED;                                                                                          // for SI threaded receiver is the only way
        if( pthread_create( &ctx->mtc_th,  NULL, mt_receive, (void *) ctx ) ) {         // so kick it
-               fprintf( stderr, "[WRN] rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
+               rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
        }
 
        free( proto_port );
@@ -708,8 +838,8 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
                without drastically changing anything. The user should invoke with RMRFL_NONE to
                avoid any misbehavour as there are internal flags which are suported
 */
-extern void* rmr_init( char* uproto_port, int max_msg_size, int flags ) {
-       return init( uproto_port, max_msg_size, flags & UFL_MASK  );            // ensure any internal flags are off
+extern void* rmr_init( char* uproto_port, int def_msg_size, int flags ) {
+       return init( uproto_port, def_msg_size, flags & UFL_MASK  );            // ensure any internal flags are off
 }
 
 /*
@@ -743,11 +873,7 @@ extern int rmr_ready( void* vctx ) {
                return FALSE;
        }
 
-       if( ctx->rtable != NULL ) {
-               return TRUE;
-       }
-
-       return FALSE;
+       return ctx->rtable_ready;
 }
 
 /*
@@ -764,13 +890,6 @@ extern int rmr_get_rcvfd( void* vctx ) {
                return -1;
        }
 
-/*
-       if( (state = nng_getopt_int( ctx->nn_sock, NNG_OPT_RECVFD, &fd )) != 0 ) {
-               fprintf( stderr, "[WRN] rmr cannot get recv fd: %s\n", nng_strerror( state ) );
-               return -1;
-       }
-*/
-
        return uta_ring_getpfd( ctx->mring );
 }
 
@@ -790,6 +909,10 @@ extern void rmr_close( void* vctx ) {
                return;
        }
 
+       if( ctx->seed_rt_fname != NULL ) {
+               free( ctx->seed_rt_fname );
+       }
+
        ctx->shutdown = 1;
 
        SItp_stats( ctx->si_ctx );                      // dump some interesting stats
@@ -810,7 +933,6 @@ extern void rmr_close( void* vctx ) {
 */
 extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
        uta_ctx_t*      ctx;
-       uta_mhdr_t*     hdr;                    // header in the transport buffer
        chute_t*        chute;
        struct timespec ts;                     // time info if we have a timeout
        long    new_ms;                         // adjusted mu-sec
@@ -818,7 +940,7 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
        long    nano_sec;                       // max wait xlated to nano seconds
        int             state;
        rmr_mbuf_t*     ombuf;                  // mbuf user passed; if we timeout we return state here
-       
+
        if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
                errno = EINVAL;
                if( mbuf ) {
@@ -834,9 +956,11 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
 
        if( max_wait == 0 ) {                                           // one shot poll; handle wihtout sem check as that is SLOW!
                if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
+                       clock_gettime( CLOCK_REALTIME, &ts );                   // pass current time as expriry time
+                       sem_timedwait( &chute->barrier, &ts );                  // must pop the count (ring is locking so if we got a message we can pop)
                        if( ombuf ) {
                                rmr_free_msg( ombuf );                          // can't reuse, caller's must be trashed now
-                       }       
+                       }
                } else {
                        mbuf = ombuf;                                           // return original if it was given with timeout status
                        if( ombuf != NULL ) {
@@ -845,6 +969,10 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
                        }
                }
 
+               if( mbuf != NULL ) {
+                       mbuf->flags |= MFL_ADDSRC;               // turn on so if user app tries to send this buffer we reset src
+               }
+
                return mbuf;
        }
 
@@ -886,9 +1014,10 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
                mbuf = ombuf;                           // return caller's buffer if they passed one in
        } else {
                errno = 0;                                              // interrupted call state could be left; clear
-               if( DEBUG ) fprintf( stderr, "[DBUG] mt_rcv extracting from normal ring\n" );
+               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " mt_rcv extracting from normal ring\n" );
                if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
                        mbuf->state = RMR_OK;
+                       mbuf->flags |= MFL_ADDSRC;               // turn on so if user app tries to send this buffer we reset src
 
                        if( ombuf ) {
                                rmr_free_msg( ombuf );                                  // we cannot reuse as mbufs are queued on the ring
@@ -905,23 +1034,19 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
        return mbuf;
 }
 
-/*
-       Accept a message buffer and caller ID, send the message and then wait
-       for the receiver to tickle the semaphore letting us know that a message
-       has been received. The call_id is a value between 2 and 255, inclusive; if
-       it's not in this range an error will be returned. Max wait is the amount
-       of time in millaseconds that the call should block for. If 0 is given
-       then no timeout is set.
 
-       If the mt_call feature has not been initialised, then the attempt to use this
-       funciton will fail with RMR_ERR_NOTSUPP
 
-       If no matching message is received before the max_wait period expires, a
-       nil pointer is returned, and errno is set to ETIMEOUT. If any other error
-       occurs after the message has been sent, then a nil pointer is returned
-       with errno set to some other value.
+
+/*
+       This is the work horse for the multi-threaded call() function. It supports
+       both the rmr_mt_call() and the rmr_wormhole wh_call() functions. See the description
+       for for rmr_mt_call() modulo the caveat below.
+
+       If endpoint is given, then we assume that we're not doing normal route table
+       routing and that we should send directly to that endpoint (probably worm
+       hole).
 */
-extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
+static rmr_mbuf_t* mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait, endpoint_t* ep ) {
        rmr_mbuf_t* ombuf;                      // original mbuf passed in
        uta_ctx_t*      ctx;
        uta_mhdr_t*     hdr;                    // header in the transport buffer
@@ -932,7 +1057,7 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m
        long    seconds = 0;            // max wait seconds
        long    nano_sec;                       // max wait xlated to nano seconds
        int             state;
-       
+
        errno = EINVAL;
        if( (ctx = (uta_ctx_t *) vctx) == NULL || mbuf == NULL ) {
                if( mbuf ) {
@@ -948,12 +1073,6 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m
                return mbuf;
        }
 
-       if( call_id > MAX_CALL_ID || call_id < 2 ) {                                    // 0 and 1 are reserved; user app cannot supply them
-               mbuf->state = RMR_ERR_BADARG;
-               mbuf->tp_state = errno;
-               return mbuf;
-       }
-
        ombuf = mbuf;                                                                                                   // save to return timeout status with
 
        chute = &ctx->chutes[call_id];
@@ -961,7 +1080,7 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m
                rmr_free_msg( chute->mbuf );
                chute->mbuf = NULL;
        }
-       
+
        hdr = (uta_mhdr_t *) mbuf->header;
        hdr->flags |= HFL_CALL_MSG;                                                                             // must signal this sent with a call
        memcpy( chute->expect, mbuf->xaction, RMR_MAX_XID );                    // xaction that we will wait for
@@ -970,7 +1089,7 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m
        mbuf->flags |= MFL_NOALLOC;                                                                             // send message without allocating a new one (expect nil from mtosend
 
        if( max_wait >= 0 ) {
-               clock_gettime( CLOCK_REALTIME, &ts );   
+               clock_gettime( CLOCK_REALTIME, &ts );
 
                if( max_wait > 999 ) {
                        seconds = max_wait / 1000;
@@ -989,7 +1108,11 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m
                seconds = 1;                                                                            // use as flag later to invoked timed wait
        }
 
-       mbuf = mtosend_msg( ctx, mbuf, 0 );                                             // use internal function so as not to strip call-id; should be nil on success!
+       if( ep == NULL ) {                                                                              // normal routing
+               mbuf = mtosend_msg( ctx, mbuf, 0 );                                     // use internal function so as not to strip call-id; should be nil on success!
+       } else {
+               mbuf = send_msg( ctx, mbuf, ep->nn_sock, -1 );
+       }
        if( mbuf ) {
                if( mbuf->state != RMR_OK ) {
                        mbuf->tp_state = errno;
@@ -1024,12 +1147,75 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m
        }
 
        mbuf = chute->mbuf;
-       mbuf->state = RMR_OK;
+       if( mbuf != NULL ) {
+               mbuf->state = RMR_OK;
+       }
        chute->mbuf = NULL;
 
        return mbuf;
 }
 
+/*
+       Accept a message buffer and caller ID, send the message and then wait
+       for the receiver to tickle the semaphore letting us know that a message
+       has been received. The call_id is a value between 2 and 255, inclusive; if
+       it's not in this range an error will be returned. Max wait is the amount
+       of time in millaseconds that the call should block for. If 0 is given
+       then no timeout is set.
+
+       If the mt_call feature has not been initialised, then the attempt to use this
+       funciton will fail with RMR_ERR_NOTSUPP
+
+       If no matching message is received before the max_wait period expires, a
+       nil pointer is returned, and errno is set to ETIMEOUT. If any other error
+       occurs after the message has been sent, then a nil pointer is returned
+       with errno set to some other value.
+
+       This is now just an outward facing wrapper so we can support wormhole calls.
+*/
+extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
+
+       // must vet call_id here, all others vetted by workhorse mt_call() function
+       if( call_id > MAX_CALL_ID || call_id < 2 ) {            // 0 and 1 are reserved; user app cannot supply them
+               if( mbuf != NULL ) {
+                       mbuf->state = RMR_ERR_BADARG;
+                       mbuf->tp_state = EINVAL;
+               }
+               return mbuf;
+       }
+
+       return mt_call( vctx, mbuf, call_id, max_wait, NULL );
+}
+
+
+/*
+       Given an existing message buffer, reallocate the payload portion to
+       be at least new_len bytes.  The message header will remain such that
+       the caller may use the rmr_rts_msg() function to return a payload
+       to the sender.
+
+       The mbuf passed in may or may not be reallocated and the caller must
+       use the returned pointer and should NOT assume that it can use the
+       pointer passed in with the exceptions based on the clone flag.
+
+       If the clone flag is set, then a duplicated message, with larger payload
+       size, is allocated and returned.  The old_msg pointer in this situation is
+       still valid and must be explicitly freed by the application. If the clone
+       message is not set (0), then any memory management of the old message is
+       handled by the function.
+
+       If the copy flag is set, the contents of the old message's payload is
+       copied to the reallocated payload.  If the flag is not set, then the
+       contents of the payload is undetermined.
+*/
+extern rmr_mbuf_t* rmr_realloc_payload( rmr_mbuf_t* old_msg, int new_len, int copy, int clone ) {
+       if( old_msg == NULL ) {
+               return NULL;
+       }
+
+       return realloc_payload( old_msg, new_len, copy, clone );        // message allocation is transport specific, so this is a passthrough
+}
+
 /*
        Enable low latency things in the transport (when supported).
 */