Add ability to save route table updates to disk
[ric-plt/lib/rmr.git] / src / rmr / si / src / rmr_si.c
index 8606241..b8bb2a0 100644 (file)
@@ -67,6 +67,7 @@
 #include "ring_static.c"                       // message ring support
 #include "rt_generic_static.c"         // route table things not transport specific
 #include "rtable_si_static.c"          // route table things -- transport specific
 #include "ring_static.c"                       // message ring support
 #include "rt_generic_static.c"         // route table things not transport specific
 #include "rtable_si_static.c"          // route table things -- transport specific
+#include "alarm.c"
 #include "rtc_static.c"                                // route table collector (thread code)
 #include "tools_static.c"
 #include "sr_si_static.c"                      // send/receive static functions
 #include "rtc_static.c"                                // route table collector (thread code)
 #include "tools_static.c"
 #include "sr_si_static.c"                      // send/receive static functions
 */
 static void free_ctx( uta_ctx_t* ctx ) {
        if( ctx ) {
 */
 static void free_ctx( uta_ctx_t* ctx ) {
        if( ctx ) {
-               if( ctx->rtg_addr ) {
+               if( ctx->rtg_addr ){
                        free( ctx->rtg_addr );
                }
                        free( ctx->rtg_addr );
                }
+               uta_ring_free( ctx->mring );
+               uta_ring_free( ctx->zcb_mring );
+               if( ctx->chutes ){
+                       free( ctx->chutes );
+               }
+               if( ctx->fd2ep ){
+                       rmr_sym_free( ctx->fd2ep );
+               }
+               if( ctx->my_name ){
+                       free( ctx->my_name );
+               }
+               if( ctx->my_ip ){
+                       free( ctx->my_ip );
+               }
+               if( ctx->rtable ){
+                       rmr_sym_free( ctx->rtable->hash );
+                       free( ctx->rtable );
+               }
+               if ( ctx->ephash ){
+                       free( ctx->ephash );
+               }
+               free( ctx );
        }
 }
 
        }
 }
 
@@ -165,6 +188,10 @@ extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned
        This provides an external path to the realloc static function as it's called by an
        outward facing mbuf api function. Used to reallocate a message with a different
        trace data size.
        This provides an external path to the realloc static function as it's called by an
        outward facing mbuf api function. Used to reallocate a message with a different
        trace data size.
+
+       User programmes must use this with CAUTION!  The mbuf passed in is NOT freed and
+       is still valid following this call. The caller is reponsible for maintainting
+       a pointer to both old and new messages and invoking rmr_free_msg() on both!
 */
 extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
        return realloc_msg( msg, new_tr_size );
 */
 extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
        return realloc_msg( msg, new_tr_size );
@@ -175,14 +202,16 @@ extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
        Return the message to the available pool, or free it outright.
 */
 extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
        Return the message to the available pool, or free it outright.
 */
 extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
-       //fprintf( stderr, "SKIPPING FREE: %p\n", mbuf );
-       //return;
-
        if( mbuf == NULL ) {
        if( mbuf == NULL ) {
+               fprintf( stderr, ">>>FREE  nil buffer\n" );
                return;
        }
 
                return;
        }
 
-       if( !mbuf->ring || ! uta_ring_insert( mbuf->ring, mbuf ) ) {                    // just queue, free if ring is full
+#ifdef KEEP
+       if( mbuf->flags & MFL_HUGE ||                                                   // don't cache oversized messages
+               ! mbuf->ring ||                                                                         // cant cache if no ring
+               ! uta_ring_insert( mbuf->ring, mbuf ) ) {                       // or ring is full
+
                if( mbuf->tp_buf ) {
                        free( mbuf->tp_buf );
                        mbuf->tp_buf = NULL;            // just in case user tries to reuse this mbuf; this will be an NPE
                if( mbuf->tp_buf ) {
                        free( mbuf->tp_buf );
                        mbuf->tp_buf = NULL;            // just in case user tries to reuse this mbuf; this will be an NPE
@@ -191,6 +220,16 @@ extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
                mbuf->cookie = 0;                       // should signal a bad mbuf (if not reallocated)
                free( mbuf );
        }
                mbuf->cookie = 0;                       // should signal a bad mbuf (if not reallocated)
                free( mbuf );
        }
+#else
+       // always free, never manage a pool
+       if( mbuf->tp_buf ) {
+               free( mbuf->tp_buf );
+               mbuf->tp_buf = NULL;            // just in case user tries to reuse this mbuf; this will be an NPE
+       }
+
+       mbuf->cookie = 0;                       // should signal a bad mbuf (if not reallocated)
+       free( mbuf );
+#endif
 }
 
 /*
 }
 
 /*
@@ -263,7 +302,6 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
 extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
        int                     nn_sock;                        // endpoint socket for send
        uta_ctx_t*      ctx;
 extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
        int                     nn_sock;                        // endpoint socket for send
        uta_ctx_t*      ctx;
-       int                     state;
        char*           hold_src;                       // we need the original source if send fails
        char*           hold_ip;                        // also must hold original ip
        int                     sock_ok = 0;            // true if we found a valid endpoint socket
        char*           hold_src;                       // we need the original source if send fails
        char*           hold_ip;                        // also must hold original ip
        int                     sock_ok = 0;            // true if we found a valid endpoint socket
@@ -305,7 +343,7 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
        msg->state = RMR_OK;                                                                                                                            // ensure it is clear before send
        hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src );                                         // the dest where we're returning the message to
        hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip );                                        // both the src host and src ip
        msg->state = RMR_OK;                                                                                                                            // ensure it is clear before send
        hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src );                                         // the dest where we're returning the message to
        hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip );                                        // both the src host and src ip
-       strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );        // must overlay the source to be ours
+       zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );    // must overlay the source to be ours
        msg = send_msg( ctx, msg, nn_sock, -1 );
        if( msg ) {
                if( ep != NULL ) {
        msg = send_msg( ctx, msg, nn_sock, -1 );
        if( msg ) {
                if( ep != NULL ) {
@@ -324,8 +362,8 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
                                        break;
                        }
                }
                                        break;
                        }
                }
-               strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC );    // always return original source so rts can be called again
-               strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC );   // always return original source so rts can be called again
+               zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC );        // always replace original source & ip so rts can be called again
+               zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC );
                msg->flags |= MFL_ADDSRC;                                                                                                               // if msg given to send() it must add source
        }
 
                msg->flags |= MFL_ADDSRC;                                                                                                               // if msg given to send() it must add source
        }
 
@@ -373,7 +411,7 @@ extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
                return msg;
        }
 
                return msg;
        }
 
-       return rmr_mt_call( vctx, msg, 1, 1000 );               // use the reserved call-id of 1 and wait up to 1 sec
+    return mt_call( vctx, msg, 1, 1000, NULL );                // use the reserved call-id of 1 and wait up to 1 sec
 }
 
 /*
 }
 
 /*
@@ -422,6 +460,10 @@ extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
 }
 
 /*
 }
 
 /*
+       DEPRECATED -- this function is not needed in the SI world, and when NNG goes away this will
+               too.  This function likely will not behave as expected in SI, and we are pretty sure it
+               isn't being used as there was an abort triggering reference to rmr_rcv() until now.
+
        This blocks until the message with the 'expect' ID is received. Messages which are received
        before the expected message are queued onto the message ring.  The function will return
        a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
        This blocks until the message with the 'expect' ID is received. Messages which are received
        before the expected message are queued onto the message ring.  The function will return
        a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
@@ -458,22 +500,25 @@ extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect,
        if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific waiting for id=%s\n",  expect );
 
        while( queued < allow2queue ) {
        if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific waiting for id=%s\n",  expect );
 
        while( queued < allow2queue ) {
-               msg = rcv_msg( ctx, msg );                                      // hard wait for next
-               if( msg->state == RMR_OK ) {
-                       if( memcmp( msg->xaction, expect, exp_len ) == 0 ) {                    // got it -- return it
-                               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued );
-                               return msg;
-                       }
-
-                       if( ! uta_ring_insert( ctx->mring, msg ) ) {                                    // just queue, error if ring is full
-                               if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific ring is full\n" );
-                               errno = ENOBUFS;
-                               return NULL;
+               msg = rmr_rcv_msg( ctx, msg );                                  // hard wait for next
+               if( msg != NULL ) {
+                       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific checking message; queued=%d allowed=%d state=%d\n",  queued, allow2queue, msg->state );
+                       if( msg->state == RMR_OK ) {
+                               if( memcmp( msg->xaction, expect, exp_len ) == 0 ) {                    // got it -- return it
+                                       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific matched (%s); %d messages were queued\n", msg->xaction, queued );
+                                       return msg;
+                               }
+
+                               if( ! uta_ring_insert( ctx->mring, msg ) ) {                                    // just queue, error if ring is full
+                                       if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific ring is full\n" );
+                                       errno = ENOBUFS;
+                                       return NULL;
+                               }
+
+                               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific queued message type=%d\n", msg->mtype );
+                               queued++;
+                               msg = NULL;
                        }
                        }
-
-                       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific queued message type=%d\n", msg->mtype );
-                       queued++;
-                       msg = NULL;
                }
        }
 
                }
        }
 
@@ -535,12 +580,12 @@ extern int rmr_set_rtimeout( void* vctx, int time ) {
                                that we know about. The _user_ should ensure that the supplied length also
                                includes the trace data length maximum as they are in control of that.
 */
                                that we know about. The _user_ should ensure that the supplied length also
                                includes the trace data length maximum as they are in control of that.
 */
-static void* init(  char* uproto_port, int max_msg_size, int flags ) {
+static void* init( char* uproto_port, int def_msg_size, int flags ) {
        static  int announced = 0;
        uta_ctx_t*      ctx = NULL;
        char    bind_info[256];                         // bind info
        char*   proto = "tcp";                          // pointer into the proto/port string user supplied
        static  int announced = 0;
        uta_ctx_t*      ctx = NULL;
        char    bind_info[256];                         // bind info
        char*   proto = "tcp";                          // pointer into the proto/port string user supplied
-       char*   port;
+       char*   port;                                           // pointer into the proto_port buffer at the port value
        char*   interface = NULL;                       // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined)
        char*   proto_port;
        char    wbuf[1024];                                     // work buffer
        char*   interface = NULL;                       // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined)
        char*   proto_port;
        char    wbuf[1024];                                     // work buffer
@@ -552,31 +597,41 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        int             old_vlevel;
 
        old_vlevel = rmr_vlog_init();                   // initialise and get the current level
        int             old_vlevel;
 
        old_vlevel = rmr_vlog_init();                   // initialise and get the current level
-       rmr_set_vlevel( RMR_VL_INFO );          // we WILL announce our version etc
 
        if( ! announced ) {
 
        if( ! announced ) {
-               rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95/g mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
-                       RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
+               rmr_set_vlevel( RMR_VL_INFO );          // we WILL announce our version
+               rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95 p=%s mv=%d flg=%02x id=a (%s %s.%s.%s built: %s)\n",
+                       uproto_port, RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
                announced = 1;
                announced = 1;
+
+               rmr_set_vlevel( old_vlevel );           // return logging to the desired state
+               uta_dump_env();                                                 // spit out environment settings meaningful to us if in info mode
        }
        }
-       rmr_set_vlevel( old_vlevel );           // return logging to the desired state
 
        errno = 0;
        if( uproto_port == NULL ) {
                proto_port = strdup( DEF_COMM_PORT );
 
        errno = 0;
        if( uproto_port == NULL ) {
                proto_port = strdup( DEF_COMM_PORT );
+               rmr_vlog( RMR_VL_WARN, "user passed nil as the listen port, using default: %s\n", proto_port );
        } else {
                proto_port = strdup( uproto_port );             // so we can modify it
        }
 
        } else {
                proto_port = strdup( uproto_port );             // so we can modify it
        }
 
-       if( (ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) )) == NULL ) {
+       if ( proto_port == NULL ){
                errno = ENOMEM;
                return NULL;
        }
                errno = ENOMEM;
                return NULL;
        }
+
+       if( (ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) )) == NULL ) {
+               errno = ENOMEM;
+               goto err;
+       }
        memset( ctx, 0, sizeof( uta_ctx_t ) );
 
        if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rmr_init: allocating 266 rivers\n" );
        memset( ctx, 0, sizeof( uta_ctx_t ) );
 
        if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rmr_init: allocating 266 rivers\n" );
-       ctx->nrivers = 256;                                                             // number of input flows we'll manage
+       ctx->snarf_rt_fd = -1;
+       ctx->nrivers = MAX_RIVERS;                                              // the array allows for fast index mapping for fd values < max
        ctx->rivers = (river_t *) malloc( sizeof( river_t ) * ctx->nrivers );
        ctx->rivers = (river_t *) malloc( sizeof( river_t ) * ctx->nrivers );
+       ctx->river_hash = rmr_sym_alloc( 129 );                         // connections with fd values > FD_MAX have to e hashed
        memset( ctx->rivers, 0, sizeof( river_t ) * ctx->nrivers );
        for( i = 0; i < ctx->nrivers; i++ ) {
                ctx->rivers[i].state = RS_NEW;                          // force allocation of accumulator on first received packet
        memset( ctx->rivers, 0, sizeof( river_t ) * ctx->nrivers );
        for( i = 0; i < ctx->nrivers; i++ ) {
                ctx->rivers[i].state = RS_NEW;                          // force allocation of accumulator on first received packet
@@ -584,7 +639,7 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
 
        ctx->send_retries = 1;                                                  // default is not to sleep at all; RMr will retry about 10K times before returning
        ctx->d1_len = 4;                                                                // data1 space in header -- 4 bytes for now
 
        ctx->send_retries = 1;                                                  // default is not to sleep at all; RMr will retry about 10K times before returning
        ctx->d1_len = 4;                                                                // data1 space in header -- 4 bytes for now
-       ctx->max_ibm = max_msg_size < 1024 ? 1024 : max_msg_size;                                       // larger than their request doesn't hurt
+       ctx->max_ibm = def_msg_size < 1024 ? 1024 : def_msg_size;                                       // larger than their request doesn't hurt
        ctx->max_ibm += sizeof( uta_mhdr_t ) + ctx->d1_len + ctx->d2_len + TP_HDR_LEN + 64;             // add in header size, transport hdr, and a bit of fudge
 
        ctx->mring = uta_mk_ring( 4096 );                               // message ring is always on for si
        ctx->max_ibm += sizeof( uta_mhdr_t ) + ctx->d1_len + ctx->d2_len + TP_HDR_LEN + 64;             // add in header size, transport hdr, and a bit of fudge
 
        ctx->mring = uta_mk_ring( 4096 );                               // message ring is always on for si
@@ -593,6 +648,7 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        if( ! (flags & RMRFL_NOLOCK) ) {                                // user did not specifically ask that it be off; turn it on
                uta_ring_config( ctx->mring, RING_RLOCK );                      // concurrent rcv calls require read lock
                uta_ring_config( ctx->zcb_mring, RING_WLOCK );          // concurrent free calls from userland require write lock
        if( ! (flags & RMRFL_NOLOCK) ) {                                // user did not specifically ask that it be off; turn it on
                uta_ring_config( ctx->mring, RING_RLOCK );                      // concurrent rcv calls require read lock
                uta_ring_config( ctx->zcb_mring, RING_WLOCK );          // concurrent free calls from userland require write lock
+               uta_ring_config( ctx->zcb_mring, RING_FRLOCK );         // concurrent message allocatieon calls from userland require read lock, but can be fast
        } else {
                rmr_vlog( RMR_VL_INFO, "receive ring locking disabled by user application\n" );
        }
        } else {
                rmr_vlog( RMR_VL_INFO, "receive ring locking disabled by user application\n" );
        }
@@ -601,18 +657,14 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
 
 
        ctx->max_plen = RMR_MAX_RCV_BYTES;                              // max user payload lengh
 
 
        ctx->max_plen = RMR_MAX_RCV_BYTES;                              // max user payload lengh
-       if( max_msg_size > 0 ) {
-               ctx->max_plen = max_msg_size;
+       if( def_msg_size > 0 ) {
+               ctx->max_plen = def_msg_size;
        }
 
        }
 
-       // we're using a listener to get rtg updates, so we do NOT need this.
-       //uta_lookup_rtg( ctx );                                                        // attempt to fill in rtg info; rtc will handle missing values/errors
-
        ctx->si_ctx = SIinitialise( SI_OPT_FG );                // FIX ME: si needs to streamline and drop fork/bg stuff
        if( ctx->si_ctx == NULL ) {
                rmr_vlog( RMR_VL_CRIT, "unable to initialise SI95 interface\n" );
        ctx->si_ctx = SIinitialise( SI_OPT_FG );                // FIX ME: si needs to streamline and drop fork/bg stuff
        if( ctx->si_ctx == NULL ) {
                rmr_vlog( RMR_VL_CRIT, "unable to initialise SI95 interface\n" );
-               free_ctx( ctx );
-               return NULL;
+               goto err;
        }
 
        if( (port = strchr( proto_port, ':' )) != NULL ) {
        }
 
        if( (port = strchr( proto_port, ':' )) != NULL ) {
@@ -625,11 +677,10 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        } else {
                port = proto_port;                      // assume something like "1234" was passed
        }
        } else {
                port = proto_port;                      // assume something like "1234" was passed
        }
+       rmr_vlog( RMR_VL_INFO, "listen port = %s\n", port );
 
 
-       if( (tok = getenv( "ENV_RTG_PORT" )) != NULL ) {                                // must check port here -- if < 1 then we just start static file 'listener'
-               if( atoi( tok ) < 1 ) {
-                       static_rtc = 1;
-               }
+       if( (tok = getenv( ENV_RTG_PORT )) != NULL && atoi( tok ) < 0 ) {       // must check here -- if < 0 then we just start static file 'listener'
+               static_rtc = 1;
        }
 
        if( (tok = getenv( ENV_SRC_ID )) != NULL ) {                                                    // env var overrides what we dig from system
        }
 
        if( (tok = getenv( ENV_SRC_ID )) != NULL ) {                                                    // env var overrides what we dig from system
@@ -648,7 +699,7 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        } else {
                if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
                        rmr_vlog( RMR_VL_CRIT, "rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
        } else {
                if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
                        rmr_vlog( RMR_VL_CRIT, "rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
-                       return NULL;
+                       goto err;
                }
                if( (tok = strchr( wbuf, '.' )) != NULL ) {
                        *tok = 0;                                                                       // we don't keep domain portion
                }
                if( (tok = strchr( wbuf, '.' )) != NULL ) {
                        *tok = 0;                                                                       // we don't keep domain portion
@@ -658,7 +709,8 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC );
        if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) {                       // our registered name is host:port
                rmr_vlog( RMR_VL_CRIT, "rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port );
        ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC );
        if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) {                       // our registered name is host:port
                rmr_vlog( RMR_VL_CRIT, "rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port );
-               return NULL;
+               errno = EINVAL;
+               goto err;
        }
 
        if( (tok = getenv( ENV_NAME_ONLY )) != NULL ) {
        }
 
        if( (tok = getenv( ENV_NAME_ONLY )) != NULL ) {
@@ -674,7 +726,7 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
                ctx->my_ip = get_default_ip( ctx->ip_list );    // and (guess) at what should be the default to put into messages as src
                if( ctx->my_ip == NULL ) {
                        rmr_vlog( RMR_VL_WARN, "rmr_init: default ip address could not be sussed out, using name\n" );
                ctx->my_ip = get_default_ip( ctx->ip_list );    // and (guess) at what should be the default to put into messages as src
                if( ctx->my_ip == NULL ) {
                        rmr_vlog( RMR_VL_WARN, "rmr_init: default ip address could not be sussed out, using name\n" );
-                       strcpy( ctx->my_ip, ctx->my_name );                     // if we cannot suss it out, use the name rather than a nil pointer
+                       ctx->my_ip = strdup( ctx->my_name );            // if we cannot suss it out, use the name rather than a nil pointer
                }
        }
        if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " default ip address: %s\n", ctx->my_ip );
                }
        }
        if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " default ip address: %s\n", ctx->my_ip );
@@ -686,28 +738,46 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        }
 
 
        }
 
 
-       if( (interface = getenv( ENV_BIND_IF )) == NULL ) {
+       if( (interface = getenv( ENV_BIND_IF )) == NULL ) {             // if specific interface not defined, listen on all
                interface = "0.0.0.0";
        }
 
                interface = "0.0.0.0";
        }
 
-       snprintf( bind_info, sizeof( bind_info ), "%s:%s", interface, port );           // FIXME -- si only supports 0.0.0.0 by default
+       snprintf( bind_info, sizeof( bind_info ), "%s:%s", interface, port );
        if( (state = SIlistener( ctx->si_ctx, TCP_DEVICE, bind_info )) < 0 ) {
                rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to start si listener for %s: %s\n", bind_info, strerror( errno ) );
        if( (state = SIlistener( ctx->si_ctx, TCP_DEVICE, bind_info )) < 0 ) {
                rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to start si listener for %s: %s\n", bind_info, strerror( errno ) );
-               free_ctx( ctx );
-               return NULL;
+               goto err;
        }
 
                                                                                                // finish all flag setting before threads to keep helgrind quiet
        ctx->flags |= CFL_MTC_ENABLED;                          // for SI threaded receiver is the only way
 
        }
 
                                                                                                // finish all flag setting before threads to keep helgrind quiet
        ctx->flags |= CFL_MTC_ENABLED;                          // for SI threaded receiver is the only way
 
-       if( flags & RMRFL_NOTHREAD ) {                          // thread set to off; no route table collector started (could be called by the rtc thread itself)
-               ctx->rtable = rt_clone_space( NULL, NULL, 0 );          // creates an empty route table so that wormholes still can be used
+
+       // ---------------- setup for route table collector before invoking ----------------------------------
+       ctx->rtgate = (pthread_mutex_t *) malloc( sizeof( *ctx->rtgate ) );             // single mutex required to gate access to moving rtables
+       if( ctx->rtgate != NULL ) {
+               pthread_mutex_init( ctx->rtgate, NULL );
+       }
+
+       ctx->ephash = rmr_sym_alloc( 129 );                                     // host:port to ep symtab exists outside of any route table
+       if( ctx->ephash == NULL ) {
+               rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to allocate ep hash\n" );
+               errno = ENOMEM;
+               goto err;
+       }
+
+       ctx->rtable = rt_clone_space( ctx, NULL, NULL, 0 );     // create an empty route table so that wormhole/rts calls can be used
+       if( flags & RMRFL_NOTHREAD ) {                                          // no thread prevents the collector start for very special cases
+               ctx->rtable_ready = 1;                                                  // route based sends will always fail, but rmr is ready for the non thread case
        } else {
        } else {
+               ctx->rtable_ready = 0;                                                  // no sends until a real route table is loaded in the rtc thread
+
                if( static_rtc ) {
                if( static_rtc ) {
+                       rmr_vlog( RMR_VL_INFO, "rmr_init: file based route table only for context on port %s\n", uproto_port );
                        if( pthread_create( &ctx->rtc_th,  NULL, rtc_file, (void *) ctx ) ) {   // kick the rt collector thread as just file reader
                                rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start static route table collector thread: %s", strerror( errno ) );
                        }
                } else {
                        if( pthread_create( &ctx->rtc_th,  NULL, rtc_file, (void *) ctx ) ) {   // kick the rt collector thread as just file reader
                                rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start static route table collector thread: %s", strerror( errno ) );
                        }
                } else {
+                       rmr_vlog( RMR_VL_INFO, "rmr_init: dynamic route table for context on port %s\n", uproto_port );
                        if( pthread_create( &ctx->rtc_th,  NULL, rtc, (void *) ctx ) ) {        // kick the real rt collector thread
                                rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start dynamic route table collector thread: %s", strerror( errno ) );
                        }
                        if( pthread_create( &ctx->rtc_th,  NULL, rtc, (void *) ctx ) ) {        // kick the real rt collector thread
                                rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start dynamic route table collector thread: %s", strerror( errno ) );
                        }
@@ -720,6 +790,11 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
 
        free( proto_port );
        return (void *) ctx;
 
        free( proto_port );
        return (void *) ctx;
+
+err:
+       free( proto_port );
+       free_ctx( ctx );
+       return NULL;
 }
 
 /*
 }
 
 /*
@@ -738,8 +813,8 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
                without drastically changing anything. The user should invoke with RMRFL_NONE to
                avoid any misbehavour as there are internal flags which are suported
 */
                without drastically changing anything. The user should invoke with RMRFL_NONE to
                avoid any misbehavour as there are internal flags which are suported
 */
-extern void* rmr_init( char* uproto_port, int max_msg_size, int flags ) {
-       return init( uproto_port, max_msg_size, flags & UFL_MASK  );            // ensure any internal flags are off
+extern void* rmr_init( char* uproto_port, int def_msg_size, int flags ) {
+       return init( uproto_port, def_msg_size, flags & UFL_MASK  );            // ensure any internal flags are off
 }
 
 /*
 }
 
 /*
@@ -773,11 +848,7 @@ extern int rmr_ready( void* vctx ) {
                return FALSE;
        }
 
                return FALSE;
        }
 
-       if( ctx->rtable != NULL ) {
-               return TRUE;
-       }
-
-       return FALSE;
+       return ctx->rtable_ready;
 }
 
 /*
 }
 
 /*
@@ -794,13 +865,6 @@ extern int rmr_get_rcvfd( void* vctx ) {
                return -1;
        }
 
                return -1;
        }
 
-/*
-       if( (state = nng_getopt_int( ctx->nn_sock, NNG_OPT_RECVFD, &fd )) != 0 ) {
-               rmr_vlog( RMR_VL_WARN, "rmr cannot get recv fd: %s\n", nng_strerror( state ) );
-               return -1;
-       }
-*/
-
        return uta_ring_getpfd( ctx->mring );
 }
 
        return uta_ring_getpfd( ctx->mring );
 }
 
@@ -840,7 +904,6 @@ extern void rmr_close( void* vctx ) {
 */
 extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
        uta_ctx_t*      ctx;
 */
 extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
        uta_ctx_t*      ctx;
-       uta_mhdr_t*     hdr;                    // header in the transport buffer
        chute_t*        chute;
        struct timespec ts;                     // time info if we have a timeout
        long    new_ms;                         // adjusted mu-sec
        chute_t*        chute;
        struct timespec ts;                     // time info if we have a timeout
        long    new_ms;                         // adjusted mu-sec
@@ -864,6 +927,8 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
 
        if( max_wait == 0 ) {                                           // one shot poll; handle wihtout sem check as that is SLOW!
                if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
 
        if( max_wait == 0 ) {                                           // one shot poll; handle wihtout sem check as that is SLOW!
                if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
+                       clock_gettime( CLOCK_REALTIME, &ts );                   // pass current time as expriry time
+                       sem_timedwait( &chute->barrier, &ts );                  // must pop the count (ring is locking so if we got a message we can pop)
                        if( ombuf ) {
                                rmr_free_msg( ombuf );                          // can't reuse, caller's must be trashed now
                        }
                        if( ombuf ) {
                                rmr_free_msg( ombuf );                          // can't reuse, caller's must be trashed now
                        }
@@ -878,6 +943,7 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
                if( mbuf != NULL ) {
                        mbuf->flags |= MFL_ADDSRC;               // turn on so if user app tries to send this buffer we reset src
                }
                if( mbuf != NULL ) {
                        mbuf->flags |= MFL_ADDSRC;               // turn on so if user app tries to send this buffer we reset src
                }
+
                return mbuf;
        }
 
                return mbuf;
        }
 
@@ -978,12 +1044,6 @@ static rmr_mbuf_t* mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_w
                return mbuf;
        }
 
                return mbuf;
        }
 
-       if( call_id > MAX_CALL_ID || call_id < 2 ) {                                    // 0 and 1 are reserved; user app cannot supply them
-               mbuf->state = RMR_ERR_BADARG;
-               mbuf->tp_state = errno;
-               return mbuf;
-       }
-
        ombuf = mbuf;                                                                                                   // save to return timeout status with
 
        chute = &ctx->chutes[call_id];
        ombuf = mbuf;                                                                                                   // save to return timeout status with
 
        chute = &ctx->chutes[call_id];
@@ -1085,6 +1145,16 @@ static rmr_mbuf_t* mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_w
        This is now just an outward facing wrapper so we can support wormhole calls.
 */
 extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
        This is now just an outward facing wrapper so we can support wormhole calls.
 */
 extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
+
+       // must vet call_id here, all others vetted by workhorse mt_call() function
+       if( call_id > MAX_CALL_ID || call_id < 2 ) {            // 0 and 1 are reserved; user app cannot supply them
+               if( mbuf != NULL ) {
+                       mbuf->state = RMR_ERR_BADARG;
+                       mbuf->tp_state = EINVAL;
+               }
+               return mbuf;
+       }
+
        return mt_call( vctx, mbuf, call_id, max_wait, NULL );
 }
 
        return mt_call( vctx, mbuf, call_id, max_wait, NULL );
 }