Correct bug identified in static analysis
[ric-plt/lib/rmr.git] / src / rmr / si / src / rmr_si.c
index 047401d..54a1bba 100644 (file)
@@ -1,8 +1,8 @@
 // vim: ts=4 sw=4 noet :
 /*
 ==================================================================================
 // vim: ts=4 sw=4 noet :
 /*
 ==================================================================================
-       Copyright (c) 2019-2020 Nokia
-       Copyright (c) 2018-2020 AT&T Intellectual Property.
+       Copyright (c) 2019-2021 Nokia
+       Copyright (c) 2018-2021 AT&T Intellectual Property.
 
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
 
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
 #include "si95/socket_if.h"
 #include "si95/siproto.h"
 
 #include "si95/socket_if.h"
 #include "si95/siproto.h"
 
+
 #define SI95_BUILD     1                       // we drop some common functions for si
 
 #include "rmr.h"                               // things the users see
 #include "rmr_agnostic.h"              // agnostic things (must be included before private)
 #include "rmr_si_private.h"            // things that we need too
 #define SI95_BUILD     1                       // we drop some common functions for si
 
 #include "rmr.h"                               // things the users see
 #include "rmr_agnostic.h"              // agnostic things (must be included before private)
 #include "rmr_si_private.h"            // things that we need too
+
 #include "rmr_symtab.h"
 #include "rmr_logging.h"
 
 #include "ring_static.c"                       // message ring support
 #include "rt_generic_static.c"         // route table things not transport specific
 #include "rtable_si_static.c"          // route table things -- transport specific
 #include "rmr_symtab.h"
 #include "rmr_logging.h"
 
 #include "ring_static.c"                       // message ring support
 #include "rt_generic_static.c"         // route table things not transport specific
 #include "rtable_si_static.c"          // route table things -- transport specific
+#include "alarm.c"
 #include "rtc_static.c"                                // route table collector (thread code)
 #include "tools_static.c"
 #include "sr_si_static.c"                      // send/receive static functions
 #include "rtc_static.c"                                // route table collector (thread code)
 #include "tools_static.c"
 #include "sr_si_static.c"                      // send/receive static functions
 
 //------------------------------------------------------------------------------
 
 
 //------------------------------------------------------------------------------
 
+/*
+       If we have an EP, up the counters based on state.
+       This isn't needed, but it makes driving the code under unit test easier so we
+       induldge in the bean counter's desire for coverage numbers.
+*/
+static inline void incr_ep_counts( int state, endpoint_t* ep ) {
+       if( ep != NULL ) {
+               switch( state ) {
+                       case RMR_OK:
+                               ep->scounts[EPSC_GOOD]++;
+                               break;
+
+                       case RMR_ERR_RETRY:
+                               ep->scounts[EPSC_TRANS]++;
+                               break;
+
+                       default:
+                               ep->scounts[EPSC_FAIL]++;
+                               break;
+               }
+       }
+}
 
 /*
        Clean up a context.
 */
 static void free_ctx( uta_ctx_t* ctx ) {
        if( ctx ) {
 
 /*
        Clean up a context.
 */
 static void free_ctx( uta_ctx_t* ctx ) {
        if( ctx ) {
-               if( ctx->rtg_addr ) {
+               if( ctx->rtg_addr ){
                        free( ctx->rtg_addr );
                }
                        free( ctx->rtg_addr );
                }
+               uta_ring_free( ctx->mring );
+               uta_ring_free( ctx->zcb_mring );
+               if( ctx->chutes ){
+                       free( ctx->chutes );
+               }
+               if( ctx->fd2ep ){
+                       rmr_sym_free( ctx->fd2ep );
+               }
+               if( ctx->my_name ){
+                       free( ctx->my_name );
+               }
+               if( ctx->my_ip ){
+                       free( ctx->my_ip );
+               }
+               if( ctx->rtable ){
+                       rmr_sym_free( ctx->rtable->hash );
+                       free( ctx->rtable );
+               }
+               if ( ctx->ephash ){
+                       free( ctx->ephash );
+               }
+               free( ctx );
        }
 }
 
        }
 }
 
@@ -165,6 +212,10 @@ extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned
        This provides an external path to the realloc static function as it's called by an
        outward facing mbuf api function. Used to reallocate a message with a different
        trace data size.
        This provides an external path to the realloc static function as it's called by an
        outward facing mbuf api function. Used to reallocate a message with a different
        trace data size.
+
+       User programmes must use this with CAUTION!  The mbuf passed in is NOT freed and
+       is still valid following this call. The caller is reponsible for maintainting
+       a pointer to both old and new messages and invoking rmr_free_msg() on both!
 */
 extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
        return realloc_msg( msg, new_tr_size );
 */
 extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
        return realloc_msg( msg, new_tr_size );
@@ -175,14 +226,16 @@ extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
        Return the message to the available pool, or free it outright.
 */
 extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
        Return the message to the available pool, or free it outright.
 */
 extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
-       //fprintf( stderr, "SKIPPING FREE: %p\n", mbuf );
-       //return;
-
        if( mbuf == NULL ) {
        if( mbuf == NULL ) {
+               fprintf( stderr, ">>>FREE  nil buffer\n" );
                return;
        }
 
                return;
        }
 
-       if( !mbuf->ring || ! uta_ring_insert( mbuf->ring, mbuf ) ) {                    // just queue, free if ring is full
+#ifdef KEEP
+       if( mbuf->flags & MFL_HUGE ||                                                   // don't cache oversized messages
+               ! mbuf->ring ||                                                                         // cant cache if no ring
+               ! uta_ring_insert( mbuf->ring, mbuf ) ) {                       // or ring is full
+
                if( mbuf->tp_buf ) {
                        free( mbuf->tp_buf );
                        mbuf->tp_buf = NULL;            // just in case user tries to reuse this mbuf; this will be an NPE
                if( mbuf->tp_buf ) {
                        free( mbuf->tp_buf );
                        mbuf->tp_buf = NULL;            // just in case user tries to reuse this mbuf; this will be an NPE
@@ -191,6 +244,16 @@ extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
                mbuf->cookie = 0;                       // should signal a bad mbuf (if not reallocated)
                free( mbuf );
        }
                mbuf->cookie = 0;                       // should signal a bad mbuf (if not reallocated)
                free( mbuf );
        }
+#else
+       // always free, never manage a pool
+       if( mbuf->tp_buf ) {
+               free( mbuf->tp_buf );
+               mbuf->tp_buf = NULL;            // just in case user tries to reuse this mbuf; this will be an NPE
+       }
+
+       mbuf->cookie = 0;                       // should signal a bad mbuf (if not reallocated)
+       free( mbuf );
+#endif
 }
 
 /*
 }
 
 /*
@@ -204,7 +267,7 @@ extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
                ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
 
                d1 = DATA1_ADDR( msg->header );
                ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
 
                d1 = DATA1_ADDR( msg->header );
-               d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                         // must blot out so it doesn't queue on a chute at the other end
+               d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                 // must blot out so it doesn't queue on a chute at the other end
        }
 
        return mtosend_msg( vctx, msg, max_to );
        }
 
        return mtosend_msg( vctx, msg, max_to );
@@ -263,7 +326,6 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
 extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
        int                     nn_sock;                        // endpoint socket for send
        uta_ctx_t*      ctx;
 extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
        int                     nn_sock;                        // endpoint socket for send
        uta_ctx_t*      ctx;
-       int                     state;
        char*           hold_src;                       // we need the original source if send fails
        char*           hold_ip;                        // also must hold original ip
        int                     sock_ok = 0;            // true if we found a valid endpoint socket
        char*           hold_src;                       // we need the original source if send fails
        char*           hold_ip;                        // also must hold original ip
        int                     sock_ok = 0;            // true if we found a valid endpoint socket
@@ -301,31 +363,16 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
                }
        }
 
                }
        }
 
-
        msg->state = RMR_OK;                                                                                                                            // ensure it is clear before send
        hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src );                                         // the dest where we're returning the message to
        hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip );                                        // both the src host and src ip
        msg->state = RMR_OK;                                                                                                                            // ensure it is clear before send
        hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src );                                         // the dest where we're returning the message to
        hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip );                                        // both the src host and src ip
-       strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );        // must overlay the source to be ours
+       zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );    // must overlay the source to be ours
        msg = send_msg( ctx, msg, nn_sock, -1 );
        if( msg ) {
        msg = send_msg( ctx, msg, nn_sock, -1 );
        if( msg ) {
-               if( ep != NULL ) {
-                       switch( msg->state ) {
-                               case RMR_OK:
-                                       ep->scounts[EPSC_GOOD]++;
-                                       break;
-
-                               case RMR_ERR_RETRY:
-                                       ep->scounts[EPSC_TRANS]++;
-                                       break;
-
-                               default:
-                                       // FIX ME uta_fd_failed( nn_sock );                     // we don't have an ep so this requires a look up/search to mark it failed
-                                       ep->scounts[EPSC_FAIL]++;
-                                       break;
-                       }
-               }
-               strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC );    // always return original source so rts can be called again
-               strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC );   // always return original source so rts can be called again
+               incr_ep_counts(  msg->state, ep );                              // update counts
+
+               zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC );        // always replace original source & ip so rts can be called again
+               zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC );
                msg->flags |= MFL_ADDSRC;                                                                                                               // if msg given to send() it must add source
        }
 
                msg->flags |= MFL_ADDSRC;                                                                                                               // if msg given to send() it must add source
        }
 
@@ -373,7 +420,7 @@ extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
                return msg;
        }
 
                return msg;
        }
 
-       return rmr_mt_call( vctx, msg, 1, 1000 );               // use the reserved call-id of 1 and wait up to 1 sec
+    return mt_call( vctx, msg, 1, 1000, NULL );                // use the reserved call-id of 1 and wait up to 1 sec
 }
 
 /*
 }
 
 /*
@@ -422,6 +469,10 @@ extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
 }
 
 /*
 }
 
 /*
+       DEPRECATED -- this function is not needed in the SI world, and when NNG goes away this will
+               too.  This function likely will not behave as expected in SI, and we are pretty sure it
+               isn't being used as there was an abort triggering reference to rmr_rcv() until now.
+
        This blocks until the message with the 'expect' ID is received. Messages which are received
        before the expected message are queued onto the message ring.  The function will return
        a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
        This blocks until the message with the 'expect' ID is received. Messages which are received
        before the expected message are queued onto the message ring.  The function will return
        a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
@@ -458,22 +509,25 @@ extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect,
        if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific waiting for id=%s\n",  expect );
 
        while( queued < allow2queue ) {
        if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific waiting for id=%s\n",  expect );
 
        while( queued < allow2queue ) {
-               msg = rcv_msg( ctx, msg );                                      // hard wait for next
-               if( msg->state == RMR_OK ) {
-                       if( memcmp( msg->xaction, expect, exp_len ) == 0 ) {                    // got it -- return it
-                               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued );
-                               return msg;
-                       }
-
-                       if( ! uta_ring_insert( ctx->mring, msg ) ) {                                    // just queue, error if ring is full
-                               if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific ring is full\n" );
-                               errno = ENOBUFS;
-                               return NULL;
+               msg = rmr_rcv_msg( ctx, msg );                                  // hard wait for next
+               if( msg != NULL ) {
+                       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific checking message; queued=%d allowed=%d state=%d\n",  queued, allow2queue, msg->state );
+                       if( msg->state == RMR_OK ) {
+                               if( memcmp( msg->xaction, expect, exp_len ) == 0 ) {                    // got it -- return it
+                                       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific matched (%s); %d messages were queued\n", msg->xaction, queued );
+                                       return msg;
+                               }
+
+                               if( ! uta_ring_insert( ctx->mring, msg ) ) {                                    // just queue, error if ring is full
+                                       if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific ring is full\n" );
+                                       errno = ENOBUFS;
+                                       return NULL;
+                               }
+
+                               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific queued message type=%d\n", msg->mtype );
+                               queued++;
+                               msg = NULL;
                        }
                        }
-
-                       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific queued message type=%d\n", msg->mtype );
-                       queued++;
-                       msg = NULL;
                }
        }
 
                }
        }
 
@@ -523,6 +577,35 @@ extern int rmr_set_rtimeout( void* vctx, int time ) {
        return 0;
 }
 
        return 0;
 }
 
+/*
+       Common cleanup on initialisation error. These are hard to force, and this helps to ensure
+       all code is tested by providing a callable rather than a block of "goto" code.
+
+       There is a return value so that where we need this we get dinked only for one
+       uncovered line rather than two:
+               init_err(...);
+               return NULL;
+
+       That's a hack, and is yet another example of the testing tail wagging the dog.
+*/
+static inline void* init_err( char* msg, void* ctx, void* port, int errval ) {
+       if( errval != 0 ) {                     // if not letting it be what a sysllib set it to...
+               errno = errval;
+       }
+
+       if( port ) {                            // free things if allocated
+               free( port );
+       }
+       if( ctx ) {
+               free_ctx( ctx );
+       }
+
+       if( msg ) {                                                                     // crit message if supplied
+               rmr_vlog( RMR_VL_CRIT, "rmr_init: %s: %s", msg, strerror( errno ) );
+       }
+
+       return NULL;
+}
 
 /*
        This is the actual init workhorse. The user visible function meerly ensures that the
 
 /*
        This is the actual init workhorse. The user visible function meerly ensures that the
@@ -535,12 +618,12 @@ extern int rmr_set_rtimeout( void* vctx, int time ) {
                                that we know about. The _user_ should ensure that the supplied length also
                                includes the trace data length maximum as they are in control of that.
 */
                                that we know about. The _user_ should ensure that the supplied length also
                                includes the trace data length maximum as they are in control of that.
 */
-static void* init(  char* uproto_port, int max_msg_size, int flags ) {
+static void* init( char* uproto_port, int def_msg_size, int flags ) {
        static  int announced = 0;
        uta_ctx_t*      ctx = NULL;
        char    bind_info[256];                         // bind info
        char*   proto = "tcp";                          // pointer into the proto/port string user supplied
        static  int announced = 0;
        uta_ctx_t*      ctx = NULL;
        char    bind_info[256];                         // bind info
        char*   proto = "tcp";                          // pointer into the proto/port string user supplied
-       char*   port;
+       char*   port;                                           // pointer into the proto_port buffer at the port value
        char*   interface = NULL;                       // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined)
        char*   proto_port;
        char    wbuf[1024];                                     // work buffer
        char*   interface = NULL;                       // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined)
        char*   proto_port;
        char    wbuf[1024];                                     // work buffer
@@ -552,31 +635,39 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        int             old_vlevel;
 
        old_vlevel = rmr_vlog_init();                   // initialise and get the current level
        int             old_vlevel;
 
        old_vlevel = rmr_vlog_init();                   // initialise and get the current level
-       rmr_set_vlevel( RMR_VL_INFO );          // we WILL announce our version etc
 
        if( ! announced ) {
 
        if( ! announced ) {
-               rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95/g mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
-                       RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
+               rmr_set_vlevel( RMR_VL_INFO );          // we WILL announce our version
+               rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95 p=%s mv=%d flg=%02x id=a (%s %s.%s.%s built: %s)\n",
+                       uproto_port, RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
                announced = 1;
                announced = 1;
+
+               rmr_set_vlevel( old_vlevel );           // return logging to the desired state
+               uta_dump_env();                                                 // spit out environment settings meaningful to us if in info mode
        }
        }
-       rmr_set_vlevel( old_vlevel );           // return logging to the desired state
 
        errno = 0;
        if( uproto_port == NULL ) {
                proto_port = strdup( DEF_COMM_PORT );
 
        errno = 0;
        if( uproto_port == NULL ) {
                proto_port = strdup( DEF_COMM_PORT );
+               rmr_vlog( RMR_VL_WARN, "user passed nil as the listen port, using default: %s\n", proto_port );
        } else {
                proto_port = strdup( uproto_port );             // so we can modify it
        }
 
        } else {
                proto_port = strdup( uproto_port );             // so we can modify it
        }
 
+       if ( proto_port == NULL ){
+               return init_err( "unable to alloc proto port string", NULL, NULL, ENOMEM );
+       }
+
        if( (ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) )) == NULL ) {
        if( (ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) )) == NULL ) {
-               errno = ENOMEM;
-               return NULL;
+               return init_err( "unable to allocate context", ctx, proto_port, ENOMEM );
        }
        memset( ctx, 0, sizeof( uta_ctx_t ) );
 
        if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rmr_init: allocating 266 rivers\n" );
        }
        memset( ctx, 0, sizeof( uta_ctx_t ) );
 
        if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rmr_init: allocating 266 rivers\n" );
-       ctx->nrivers = 256;                                                             // number of input flows we'll manage
+       ctx->snarf_rt_fd = -1;
+       ctx->nrivers = MAX_RIVERS;                                              // the array allows for fast index mapping for fd values < max
        ctx->rivers = (river_t *) malloc( sizeof( river_t ) * ctx->nrivers );
        ctx->rivers = (river_t *) malloc( sizeof( river_t ) * ctx->nrivers );
+       ctx->river_hash = rmr_sym_alloc( 129 );                         // connections with fd values > FD_MAX have to e hashed
        memset( ctx->rivers, 0, sizeof( river_t ) * ctx->nrivers );
        for( i = 0; i < ctx->nrivers; i++ ) {
                ctx->rivers[i].state = RS_NEW;                          // force allocation of accumulator on first received packet
        memset( ctx->rivers, 0, sizeof( river_t ) * ctx->nrivers );
        for( i = 0; i < ctx->nrivers; i++ ) {
                ctx->rivers[i].state = RS_NEW;                          // force allocation of accumulator on first received packet
@@ -584,7 +675,7 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
 
        ctx->send_retries = 1;                                                  // default is not to sleep at all; RMr will retry about 10K times before returning
        ctx->d1_len = 4;                                                                // data1 space in header -- 4 bytes for now
 
        ctx->send_retries = 1;                                                  // default is not to sleep at all; RMr will retry about 10K times before returning
        ctx->d1_len = 4;                                                                // data1 space in header -- 4 bytes for now
-       ctx->max_ibm = max_msg_size < 1024 ? 1024 : max_msg_size;                                       // larger than their request doesn't hurt
+       ctx->max_ibm = def_msg_size < 1024 ? 1024 : def_msg_size;                                       // larger than their request doesn't hurt
        ctx->max_ibm += sizeof( uta_mhdr_t ) + ctx->d1_len + ctx->d2_len + TP_HDR_LEN + 64;             // add in header size, transport hdr, and a bit of fudge
 
        ctx->mring = uta_mk_ring( 4096 );                               // message ring is always on for si
        ctx->max_ibm += sizeof( uta_mhdr_t ) + ctx->d1_len + ctx->d2_len + TP_HDR_LEN + 64;             // add in header size, transport hdr, and a bit of fudge
 
        ctx->mring = uta_mk_ring( 4096 );                               // message ring is always on for si
@@ -593,6 +684,7 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        if( ! (flags & RMRFL_NOLOCK) ) {                                // user did not specifically ask that it be off; turn it on
                uta_ring_config( ctx->mring, RING_RLOCK );                      // concurrent rcv calls require read lock
                uta_ring_config( ctx->zcb_mring, RING_WLOCK );          // concurrent free calls from userland require write lock
        if( ! (flags & RMRFL_NOLOCK) ) {                                // user did not specifically ask that it be off; turn it on
                uta_ring_config( ctx->mring, RING_RLOCK );                      // concurrent rcv calls require read lock
                uta_ring_config( ctx->zcb_mring, RING_WLOCK );          // concurrent free calls from userland require write lock
+               uta_ring_config( ctx->zcb_mring, RING_FRLOCK );         // concurrent message allocatieon calls from userland require read lock, but can be fast
        } else {
                rmr_vlog( RMR_VL_INFO, "receive ring locking disabled by user application\n" );
        }
        } else {
                rmr_vlog( RMR_VL_INFO, "receive ring locking disabled by user application\n" );
        }
@@ -601,18 +693,13 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
 
 
        ctx->max_plen = RMR_MAX_RCV_BYTES;                              // max user payload lengh
 
 
        ctx->max_plen = RMR_MAX_RCV_BYTES;                              // max user payload lengh
-       if( max_msg_size > 0 ) {
-               ctx->max_plen = max_msg_size;
+       if( def_msg_size > 0 ) {
+               ctx->max_plen = def_msg_size;
        }
 
        }
 
-       // we're using a listener to get rtg updates, so we do NOT need this.
-       //uta_lookup_rtg( ctx );                                                        // attempt to fill in rtg info; rtc will handle missing values/errors
-
        ctx->si_ctx = SIinitialise( SI_OPT_FG );                // FIX ME: si needs to streamline and drop fork/bg stuff
        if( ctx->si_ctx == NULL ) {
        ctx->si_ctx = SIinitialise( SI_OPT_FG );                // FIX ME: si needs to streamline and drop fork/bg stuff
        if( ctx->si_ctx == NULL ) {
-               rmr_vlog( RMR_VL_CRIT, "unable to initialise SI95 interface\n" );
-               free_ctx( ctx );
-               return NULL;
+               return init_err( "unable to initialise SI95 interface\n", ctx, proto_port, 0 );
        }
 
        if( (port = strchr( proto_port, ':' )) != NULL ) {
        }
 
        if( (port = strchr( proto_port, ':' )) != NULL ) {
@@ -625,11 +712,10 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        } else {
                port = proto_port;                      // assume something like "1234" was passed
        }
        } else {
                port = proto_port;                      // assume something like "1234" was passed
        }
+       rmr_vlog( RMR_VL_INFO, "listen port = %s\n", port );
 
 
-       if( (tok = getenv( "ENV_RTG_PORT" )) != NULL ) {                                // must check port here -- if < 1 then we just start static file 'listener'
-               if( atoi( tok ) < 1 ) {
-                       static_rtc = 1;
-               }
+       if( (tok = getenv( ENV_RTG_PORT )) != NULL && atoi( tok ) < 0 ) {       // must check here -- if < 0 then we just start static file 'listener'
+               static_rtc = 1;
        }
 
        if( (tok = getenv( ENV_SRC_ID )) != NULL ) {                                                    // env var overrides what we dig from system
        }
 
        if( (tok = getenv( ENV_SRC_ID )) != NULL ) {                                                    // env var overrides what we dig from system
@@ -647,8 +733,7 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
                free( tok );
        } else {
                if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
                free( tok );
        } else {
                if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
-                       rmr_vlog( RMR_VL_CRIT, "rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
-                       return NULL;
+                       return init_err( "cannot determine localhost name\n", ctx, proto_port, 0 );
                }
                if( (tok = strchr( wbuf, '.' )) != NULL ) {
                        *tok = 0;                                                                       // we don't keep domain portion
                }
                if( (tok = strchr( wbuf, '.' )) != NULL ) {
                        *tok = 0;                                                                       // we don't keep domain portion
@@ -657,8 +742,7 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
 
        ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC );
        if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) {                       // our registered name is host:port
 
        ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC );
        if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) {                       // our registered name is host:port
-               rmr_vlog( RMR_VL_CRIT, "rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port );
-               return NULL;
+               return init_err( "hostname + port is too long", ctx, proto_port, EINVAL );
        }
 
        if( (tok = getenv( ENV_NAME_ONLY )) != NULL ) {
        }
 
        if( (tok = getenv( ENV_NAME_ONLY )) != NULL ) {
@@ -674,40 +758,56 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
                ctx->my_ip = get_default_ip( ctx->ip_list );    // and (guess) at what should be the default to put into messages as src
                if( ctx->my_ip == NULL ) {
                        rmr_vlog( RMR_VL_WARN, "rmr_init: default ip address could not be sussed out, using name\n" );
                ctx->my_ip = get_default_ip( ctx->ip_list );    // and (guess) at what should be the default to put into messages as src
                if( ctx->my_ip == NULL ) {
                        rmr_vlog( RMR_VL_WARN, "rmr_init: default ip address could not be sussed out, using name\n" );
-                       strcpy( ctx->my_ip, ctx->my_name );                     // if we cannot suss it out, use the name rather than a nil pointer
+                       ctx->my_ip = strdup( ctx->my_name );            // if we cannot suss it out, use the name rather than a nil pointer
                }
        }
        if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " default ip address: %s\n", ctx->my_ip );
 
        if( (tok = getenv( ENV_WARNINGS )) != NULL ) {
                if( *tok == '1' ) {
                }
        }
        if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " default ip address: %s\n", ctx->my_ip );
 
        if( (tok = getenv( ENV_WARNINGS )) != NULL ) {
                if( *tok == '1' ) {
-                       ctx->flags |= CTXFL_WARN;                                       // turn on some warnings (not all, just ones that shouldn't impact performance)
+                       ctx->flags |= CFL_WARN;                                 // turn on some warnings (not all, just ones that shouldn't impact performance)
                }
        }
 
 
                }
        }
 
 
-       if( (interface = getenv( ENV_BIND_IF )) == NULL ) {
+       if( (interface = getenv( ENV_BIND_IF )) == NULL ) {             // if specific interface not defined, listen on all
                interface = "0.0.0.0";
        }
 
                interface = "0.0.0.0";
        }
 
-       snprintf( bind_info, sizeof( bind_info ), "%s:%s", interface, port );           // FIXME -- si only supports 0.0.0.0 by default
+       snprintf( bind_info, sizeof( bind_info ), "%s:%s", interface, port );
        if( (state = SIlistener( ctx->si_ctx, TCP_DEVICE, bind_info )) < 0 ) {
                rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to start si listener for %s: %s\n", bind_info, strerror( errno ) );
        if( (state = SIlistener( ctx->si_ctx, TCP_DEVICE, bind_info )) < 0 ) {
                rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to start si listener for %s: %s\n", bind_info, strerror( errno ) );
-               free_ctx( ctx );
-               return NULL;
+               return init_err( NULL, ctx, proto_port, 0 );
        }
 
                                                                                                // finish all flag setting before threads to keep helgrind quiet
        ctx->flags |= CFL_MTC_ENABLED;                          // for SI threaded receiver is the only way
 
        }
 
                                                                                                // finish all flag setting before threads to keep helgrind quiet
        ctx->flags |= CFL_MTC_ENABLED;                          // for SI threaded receiver is the only way
 
-       if( flags & RMRFL_NOTHREAD ) {                          // thread set to off; no route table collector started (could be called by the rtc thread itself)
-               ctx->rtable = rt_clone_space( NULL, NULL, 0 );          // creates an empty route table so that wormholes still can be used
+
+       // ---------------- setup for route table collector before invoking ----------------------------------
+       ctx->rtgate = (pthread_mutex_t *) malloc( sizeof( *ctx->rtgate ) );             // single mutex required to gate access to moving rtables
+       if( ctx->rtgate != NULL ) {
+               pthread_mutex_init( ctx->rtgate, NULL );
+       }
+
+       ctx->ephash = rmr_sym_alloc( 129 );                                     // host:port to ep symtab exists outside of any route table
+       if( ctx->ephash == NULL ) {
+               return init_err( "unable to allocate ep hash\n", ctx, proto_port, ENOMEM );
+       }
+
+       ctx->rtable = rt_clone_space( ctx, NULL, NULL, 0 );     // create an empty route table so that wormhole/rts calls can be used
+       if( flags & RMRFL_NOTHREAD ) {                                          // no thread prevents the collector start for very special cases
+               ctx->rtable_ready = 1;                                                  // route based sends will always fail, but rmr is ready for the non thread case
        } else {
        } else {
+               ctx->rtable_ready = 0;                                                  // no sends until a real route table is loaded in the rtc thread
+
                if( static_rtc ) {
                if( static_rtc ) {
+                       rmr_vlog( RMR_VL_INFO, "rmr_init: file based route table only for context on port %s\n", uproto_port );
                        if( pthread_create( &ctx->rtc_th,  NULL, rtc_file, (void *) ctx ) ) {   // kick the rt collector thread as just file reader
                                rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start static route table collector thread: %s", strerror( errno ) );
                        }
                } else {
                        if( pthread_create( &ctx->rtc_th,  NULL, rtc_file, (void *) ctx ) ) {   // kick the rt collector thread as just file reader
                                rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start static route table collector thread: %s", strerror( errno ) );
                        }
                } else {
+                       rmr_vlog( RMR_VL_INFO, "rmr_init: dynamic route table for context on port %s\n", uproto_port );
                        if( pthread_create( &ctx->rtc_th,  NULL, rtc, (void *) ctx ) ) {        // kick the real rt collector thread
                                rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start dynamic route table collector thread: %s", strerror( errno ) );
                        }
                        if( pthread_create( &ctx->rtc_th,  NULL, rtc, (void *) ctx ) ) {        // kick the real rt collector thread
                                rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start dynamic route table collector thread: %s", strerror( errno ) );
                        }
@@ -738,8 +838,8 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
                without drastically changing anything. The user should invoke with RMRFL_NONE to
                avoid any misbehavour as there are internal flags which are suported
 */
                without drastically changing anything. The user should invoke with RMRFL_NONE to
                avoid any misbehavour as there are internal flags which are suported
 */
-extern void* rmr_init( char* uproto_port, int max_msg_size, int flags ) {
-       return init( uproto_port, max_msg_size, flags & UFL_MASK  );            // ensure any internal flags are off
+extern void* rmr_init( char* uproto_port, int def_msg_size, int flags ) {
+       return init( uproto_port, def_msg_size, flags & UFL_MASK  );            // ensure any internal flags are off
 }
 
 /*
 }
 
 /*
@@ -773,11 +873,7 @@ extern int rmr_ready( void* vctx ) {
                return FALSE;
        }
 
                return FALSE;
        }
 
-       if( ctx->rtable != NULL ) {
-               return TRUE;
-       }
-
-       return FALSE;
+       return ctx->rtable_ready;
 }
 
 /*
 }
 
 /*
@@ -794,13 +890,6 @@ extern int rmr_get_rcvfd( void* vctx ) {
                return -1;
        }
 
                return -1;
        }
 
-/*
-       if( (state = nng_getopt_int( ctx->nn_sock, NNG_OPT_RECVFD, &fd )) != 0 ) {
-               rmr_vlog( RMR_VL_WARN, "rmr cannot get recv fd: %s\n", nng_strerror( state ) );
-               return -1;
-       }
-*/
-
        return uta_ring_getpfd( ctx->mring );
 }
 
        return uta_ring_getpfd( ctx->mring );
 }
 
@@ -820,6 +909,10 @@ extern void rmr_close( void* vctx ) {
                return;
        }
 
                return;
        }
 
+       if( ctx->seed_rt_fname != NULL ) {
+               free( ctx->seed_rt_fname );
+       }
+
        ctx->shutdown = 1;
 
        SItp_stats( ctx->si_ctx );                      // dump some interesting stats
        ctx->shutdown = 1;
 
        SItp_stats( ctx->si_ctx );                      // dump some interesting stats
@@ -840,7 +933,6 @@ extern void rmr_close( void* vctx ) {
 */
 extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
        uta_ctx_t*      ctx;
 */
 extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
        uta_ctx_t*      ctx;
-       uta_mhdr_t*     hdr;                    // header in the transport buffer
        chute_t*        chute;
        struct timespec ts;                     // time info if we have a timeout
        long    new_ms;                         // adjusted mu-sec
        chute_t*        chute;
        struct timespec ts;                     // time info if we have a timeout
        long    new_ms;                         // adjusted mu-sec
@@ -864,6 +956,8 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
 
        if( max_wait == 0 ) {                                           // one shot poll; handle wihtout sem check as that is SLOW!
                if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
 
        if( max_wait == 0 ) {                                           // one shot poll; handle wihtout sem check as that is SLOW!
                if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
+                       clock_gettime( CLOCK_REALTIME, &ts );                   // pass current time as expriry time
+                       sem_timedwait( &chute->barrier, &ts );                  // must pop the count (ring is locking so if we got a message we can pop)
                        if( ombuf ) {
                                rmr_free_msg( ombuf );                          // can't reuse, caller's must be trashed now
                        }
                        if( ombuf ) {
                                rmr_free_msg( ombuf );                          // can't reuse, caller's must be trashed now
                        }
@@ -878,6 +972,7 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
                if( mbuf != NULL ) {
                        mbuf->flags |= MFL_ADDSRC;               // turn on so if user app tries to send this buffer we reset src
                }
                if( mbuf != NULL ) {
                        mbuf->flags |= MFL_ADDSRC;               // turn on so if user app tries to send this buffer we reset src
                }
+
                return mbuf;
        }
 
                return mbuf;
        }
 
@@ -939,23 +1034,19 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
        return mbuf;
 }
 
        return mbuf;
 }
 
-/*
-       Accept a message buffer and caller ID, send the message and then wait
-       for the receiver to tickle the semaphore letting us know that a message
-       has been received. The call_id is a value between 2 and 255, inclusive; if
-       it's not in this range an error will be returned. Max wait is the amount
-       of time in millaseconds that the call should block for. If 0 is given
-       then no timeout is set.
 
 
-       If the mt_call feature has not been initialised, then the attempt to use this
-       funciton will fail with RMR_ERR_NOTSUPP
 
 
-       If no matching message is received before the max_wait period expires, a
-       nil pointer is returned, and errno is set to ETIMEOUT. If any other error
-       occurs after the message has been sent, then a nil pointer is returned
-       with errno set to some other value.
+
+/*
+       This is the work horse for the multi-threaded call() function. It supports
+       both the rmr_mt_call() and the rmr_wormhole wh_call() functions. See the description
+       for for rmr_mt_call() modulo the caveat below.
+
+       If endpoint is given, then we assume that we're not doing normal route table
+       routing and that we should send directly to that endpoint (probably worm
+       hole).
 */
 */
-extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
+static rmr_mbuf_t* mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait, endpoint_t* ep ) {
        rmr_mbuf_t* ombuf;                      // original mbuf passed in
        uta_ctx_t*      ctx;
        uta_mhdr_t*     hdr;                    // header in the transport buffer
        rmr_mbuf_t* ombuf;                      // original mbuf passed in
        uta_ctx_t*      ctx;
        uta_mhdr_t*     hdr;                    // header in the transport buffer
@@ -982,12 +1073,6 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m
                return mbuf;
        }
 
                return mbuf;
        }
 
-       if( call_id > MAX_CALL_ID || call_id < 2 ) {                                    // 0 and 1 are reserved; user app cannot supply them
-               mbuf->state = RMR_ERR_BADARG;
-               mbuf->tp_state = errno;
-               return mbuf;
-       }
-
        ombuf = mbuf;                                                                                                   // save to return timeout status with
 
        chute = &ctx->chutes[call_id];
        ombuf = mbuf;                                                                                                   // save to return timeout status with
 
        chute = &ctx->chutes[call_id];
@@ -1023,7 +1108,11 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m
                seconds = 1;                                                                            // use as flag later to invoked timed wait
        }
 
                seconds = 1;                                                                            // use as flag later to invoked timed wait
        }
 
-       mbuf = mtosend_msg( ctx, mbuf, 0 );                                             // use internal function so as not to strip call-id; should be nil on success!
+       if( ep == NULL ) {                                                                              // normal routing
+               mbuf = mtosend_msg( ctx, mbuf, 0 );                                     // use internal function so as not to strip call-id; should be nil on success!
+       } else {
+               mbuf = send_msg( ctx, mbuf, ep->nn_sock, -1 );
+       }
        if( mbuf ) {
                if( mbuf->state != RMR_OK ) {
                        mbuf->tp_state = errno;
        if( mbuf ) {
                if( mbuf->state != RMR_OK ) {
                        mbuf->tp_state = errno;
@@ -1066,6 +1155,39 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m
        return mbuf;
 }
 
        return mbuf;
 }
 
+/*
+       Accept a message buffer and caller ID, send the message and then wait
+       for the receiver to tickle the semaphore letting us know that a message
+       has been received. The call_id is a value between 2 and 255, inclusive; if
+       it's not in this range an error will be returned. Max wait is the amount
+       of time in millaseconds that the call should block for. If 0 is given
+       then no timeout is set.
+
+       If the mt_call feature has not been initialised, then the attempt to use this
+       funciton will fail with RMR_ERR_NOTSUPP
+
+       If no matching message is received before the max_wait period expires, a
+       nil pointer is returned, and errno is set to ETIMEOUT. If any other error
+       occurs after the message has been sent, then a nil pointer is returned
+       with errno set to some other value.
+
+       This is now just an outward facing wrapper so we can support wormhole calls.
+*/
+extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
+
+       // must vet call_id here, all others vetted by workhorse mt_call() function
+       if( call_id > MAX_CALL_ID || call_id < 2 ) {            // 0 and 1 are reserved; user app cannot supply them
+               if( mbuf != NULL ) {
+                       mbuf->state = RMR_ERR_BADARG;
+                       mbuf->tp_state = EINVAL;
+               }
+               return mbuf;
+       }
+
+       return mt_call( vctx, mbuf, call_id, max_wait, NULL );
+}
+
+
 /*
        Given an existing message buffer, reallocate the payload portion to
        be at least new_len bytes.  The message header will remain such that
 /*
        Given an existing message buffer, reallocate the payload portion to
        be at least new_len bytes.  The message header will remain such that