Fix binding to IPv6 interfaces
[ric-plt/lib/rmr.git] / src / rmr / common / src / rt_generic_static.c
index 5a2b1fa..0de69c8 100644 (file)
@@ -1,10 +1,10 @@
-// :vi sw=4 ts=4 noet:
+ // :vi sw=4 ts=4 noet2
 /*
 ==================================================================================
-       Copyright (c) 2019 Nokia
-       Copyright (c) 2018-2019 AT&T Intellectual Property.
+       Copyright (c) 2019-2020 Nokia
+       Copyright (c) 2018-2020 AT&T Intellectual Property.
 
-   Licensed under the Apache License, Version 2.0 (the "License");
+   Licensed under the Apache License, Version 2.0 (the "License") ;
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at
 
 #include <sys/stat.h>
 #include <unistd.h>
 #include <netdb.h>
+#include <pthread.h>
+#include <immintrin.h>
+#include <stdbool.h>
 
+#include <RIC_message_types.h>         // needed for route manager messages
+
+#define ALL 1
+#define SOME 0
 
 /*
        Passed to a symtab foreach callback to construct a list of pointers from
        a current symtab.
 */
 typedef struct thing_list {
+       int error;                                      // if a realloc failed, this will be set
        int nalloc;
        int nused;
        void** things;
@@ -63,6 +71,10 @@ typedef struct thing_list {
 /*
        Dump some stats for an endpoint in the RT. This is generally called to
        verify endpoints after a table load/change.
+
+       This is called by the for-each mechanism of the symtab and the prototype is
+       fixe; we don't really use some of the parms, but have dummy references to
+       keep sonar from complaining.
 */
 static void ep_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
        int*    counter;
@@ -74,14 +86,19 @@ static void ep_stats( void* st, void* entry, char const* name, void* thing, void
 
        if( (counter = (int *) vcounter) != NULL ) {
                (*counter)++;
+       } else {
+               rmr_vlog( RMR_VL_DEBUG, "ep_stas: nil counter %p %p %p", st, entry, name );     // dummy refs
+               return;
        }
 
-       fprintf( stderr, "[DBUG] RMR rt endpoint: target=%s open=%d\n", ep->name, ep->open );
+       rmr_vlog_force( RMR_VL_DEBUG, "rt endpoint: target=%s open=%d\n", ep->name, ep->open );
 }
 
 /*
        Called to count meid entries in the table. The meid points to an 'owning' endpoint
        so we can list what we find
+
+       See note in ep_stats about dummy refs.
 */
 static void meid_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
        int*    counter;
@@ -93,20 +110,25 @@ static void meid_stats( void* st, void* entry, char const* name, void* thing, vo
 
        if( (counter = (int *) vcounter) != NULL ) {
                (*counter)++;
+       } else {
+               rmr_vlog( RMR_VL_DEBUG, "meid_stas: nil counter %p %p %p", st, entry, name );   // dummy refs
        }
 
-       fprintf( stderr, "[DBUG] RMR meid=%s owner=%s open=%d\n", name, ep->name, ep->open );
+       rmr_vlog_force( RMR_VL_DEBUG, "meid=%s owner=%s open=%d\n", name, ep->name, ep->open );
 }
 
 /*
        Dump counts for an endpoint in the RT. The vid parm is assumed to point to
        the 'source' information and is added to each message.
+
+       See note above about dummy references.
 */
 static void ep_counts( void* st, void* entry, char const* name, void* thing, void* vid ) {
        endpoint_t* ep;
        char*   id;
 
        if( (ep = (endpoint_t *) thing) == NULL ) {
+               rmr_vlog( RMR_VL_DEBUG, "ep_counts: nil thing %p %p %p", st, entry, name );     // dummy refs
                return;
        }
 
@@ -114,7 +136,7 @@ static void ep_counts( void* st, void* entry, char const* name, void* thing, voi
                id = "missing";
        }
 
-       fprintf( stderr, "[INFO] RMR sends: ts=%lld src=%s target=%s open=%d succ=%lld fail=%lld (hard=%lld soft=%lld)\n",
+       rmr_vlog_force( RMR_VL_INFO, "sends: ts=%lld src=%s target=%s open=%d succ=%lld fail=%lld (hard=%lld soft=%lld)\n",
                (long long) time( NULL ),
                id,
                ep->name,
@@ -130,11 +152,12 @@ static void ep_counts( void* st, void* entry, char const* name, void* thing, voi
 */
 static void rte_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
        int*    counter;
-       rtable_ent_t* rte;                      // thing is really an rte
+       rtable_ent_t const* rte;                // thing is really an rte
        int             mtype;
        int             sid;
 
        if( (rte = (rtable_ent_t *) thing) == NULL ) {
+               rmr_vlog( RMR_VL_DEBUG, "rte_stats: nil thing %p %p %p", st, entry, name );     // dummy refs
                return;
        }
 
@@ -145,7 +168,7 @@ static void rte_stats( void* st, void* entry, char const* name, void* thing, voi
        mtype = rte->key & 0xffff;
        sid = (int) (rte->key >> 32);
 
-       fprintf( stderr, "[DBUG] RMR rte: key=%016lx mtype=%4d sid=%4d nrrg=%2d refs=%d\n", rte->key, mtype, sid, rte->nrrgroups, rte->refs );
+       rmr_vlog_force( RMR_VL_DEBUG, "rte: key=%016lx mtype=%4d sid=%4d nrrg=%2d refs=%d\n", rte->key, mtype, sid, rte->nrrgroups, rte->refs );
 }
 
 /*
@@ -155,26 +178,26 @@ static void  rt_stats( route_table_t* rt ) {
        int* counter;
 
        if( rt == NULL ) {
-               fprintf( stderr, "[DBUG] rtstats: nil table\n" );
+               rmr_vlog_force( RMR_VL_DEBUG, "rtstats: nil table\n" );
                return;
        }
 
        counter = (int *) malloc( sizeof( int ) );
        *counter = 0;
-       fprintf( stderr, "[DBUG] RMR route table stats:\n" );
-       fprintf( stderr, "[DBUG] RMR route table endpoints:\n" );
-       rmr_sym_foreach_class( rt->hash, RT_NAME_SPACE, ep_stats, counter );            // run endpoints (names) in the active table
-       fprintf( stderr, "[DBUG] RMR rtable: %d known endpoints\n", *counter );
+       rmr_vlog_force( RMR_VL_DEBUG, "route table stats:\n" );
+       rmr_vlog_force( RMR_VL_DEBUG, "route table endpoints:\n" );
+       rmr_sym_foreach_class( rt->ephash, RT_NAME_SPACE, ep_stats, counter );          // run endpoints (names) in the active table
+       rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d known endpoints\n", *counter );
 
-       fprintf( stderr, "[DBUG] RMR route table entries:\n" );
+       rmr_vlog_force( RMR_VL_DEBUG, "route table entries:\n" );
        *counter = 0;
        rmr_sym_foreach_class( rt->hash, RT_MT_SPACE, rte_stats, counter );                     // run message type entries
-       fprintf( stderr, "[DBUG] RMR rtable: %d mt entries in table\n", *counter );
+       rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d mt entries in table\n", *counter );
 
-       fprintf( stderr, "[DBUG] RMR route table meid map:\n" );
+       rmr_vlog_force( RMR_VL_DEBUG, "route table meid map:\n" );
        *counter = 0;
        rmr_sym_foreach_class( rt->hash, RT_ME_SPACE, meid_stats, counter );            // run meid space
-       fprintf( stderr, "[DBUG] RMR rtable: %d meids in map\n", *counter );
+       rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d meids in map\n", *counter );
 
        free( counter );
 }
@@ -185,21 +208,181 @@ static void  rt_stats( route_table_t* rt ) {
 */
 static void  rt_epcounts( route_table_t* rt, char* id ) {
        if( rt == NULL ) {
-               fprintf( stderr, "[INFO] RMR endpoint: no counts: empty table\n" );
+               rmr_vlog_force( RMR_VL_INFO, "endpoint: no counts: empty table\n" );
+               return;
+       }
+
+       rmr_sym_foreach_class( rt->ephash, 1, ep_counts, id );          // run endpoints in the active table
+}
+
+
+static void dump_tables( uta_ctx_t *ctx ) {
+       if( ctx->old_rtable != NULL ) {
+               rmr_vlog_force( RMR_VL_DEBUG, "old route table: (ref_count=%d)\n", ctx->old_rtable->ref_count );
+               rt_stats( ctx->old_rtable );
+       } else {
+               rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" );
+       }
+       rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
+       rt_stats( ctx->rtable );
+}
+
+// ------------ route manager communication -------------------------------------------------
+/*
+       Send a request for a table update to the route manager. Updates come in
+       async, so send and go.
+
+       pctx is the private context for the thread; ctx is the application context
+       that we need to be able to send the application ID in case rt mgr needs to
+       use it to idenfity us.
+
+       Returns 0 if we were not able to send a request.
+*/
+static int send_update_req( uta_ctx_t* pctx, uta_ctx_t* ctx ) {
+       rmr_mbuf_t*     smsg;
+       int     state = 0;
+
+       if( ctx->rtg_whid < 0 ) {
+               return state;
+       }
+
+       smsg = rmr_alloc_msg( pctx, 1024 );
+       if( smsg != NULL ) {
+               smsg->mtype = RMRRM_REQ_TABLE;
+               smsg->sub_id = 0;
+               snprintf( smsg->payload, 1024, "%s ts=%ld\n", ctx->my_name, time( NULL ) );
+               rmr_vlog( RMR_VL_INFO, "rmr_rtc: requesting table: (%s) whid=%d\n", smsg->payload, ctx->rtg_whid );
+               smsg->len = strlen( smsg->payload ) + 1;
+
+               smsg = rmr_wh_send_msg( pctx, ctx->rtg_whid, smsg );
+               if( (state = smsg->state) != RMR_OK ) {
+                       rmr_vlog( RMR_VL_INFO, "rmr_rtc: send failed: %d whid=%d\n", smsg->state, ctx->rtg_whid );
+                       rmr_wh_close( ctx, ctx->rtg_whid );                                     // send failed, assume connection lost
+                       ctx->rtg_whid = -1;
+               }
+
+               rmr_free_msg( smsg );
+       }
+
+       return state;
+}
+
+/*
+       Send an ack to the route table manager for a table ID that we are
+       processing.      State is 1 for OK, and 0 for failed. Reason might
+       be populated if we know why there was a failure.
+
+       Context should be the PRIVATE context that we use for messages
+       to route manger and NOT the user's context.
+
+       If a message buffere is passed we use that and use return to sender
+       assuming that this might be a response to a call and that is needed
+       to send back to the proper calling thread. If msg is nil, we allocate
+       and use it.
+*/
+static void send_rt_ack( uta_ctx_t* ctx, rmr_mbuf_t* smsg, char* table_id, int state, char* reason ) {
+       int             use_rts = 1;
+       int             payload_size = 1024;
+
+       if( ctx == NULL || ctx->rtg_whid < 0 ) {
                return;
        }
 
-       rmr_sym_foreach_class( rt->hash, 1, ep_counts, id );            // run endpoints in the active table
+       if( ctx->flags & CFL_NO_RTACK ) {               // don't ack if reading from file etc
+               return;
+       }
+
+       if( smsg != NULL ) {
+               smsg = rmr_realloc_payload( smsg, payload_size, FALSE, FALSE );         // ensure it's large enough to send a response
+       } else {
+               use_rts = 0;
+               smsg = rmr_alloc_msg( ctx, payload_size );
+       }
+
+       if( smsg != NULL ) {
+               smsg->mtype = RMRRM_TABLE_STATE;
+               smsg->sub_id = -1;
+               snprintf( smsg->payload, payload_size-1, "%s %s %s\n", state == RMR_OK ? "OK" : "ERR",
+                       table_id == NULL ? "<id-missing>" : table_id, reason == NULL ? "" : reason );
+
+               smsg->len = strlen( smsg->payload ) + 1;
+
+               rmr_vlog( RMR_VL_INFO, "rmr_rtc: sending table state: (%s) state=%d whid=%d table=%s\n", smsg->payload, state, ctx->rtg_whid, table_id );
+               if( use_rts ) {
+                       smsg = rmr_rts_msg( ctx, smsg );
+               } else {
+                       smsg = rmr_wh_send_msg( ctx, ctx->rtg_whid, smsg );
+               }
+               if( (state = smsg->state) != RMR_OK ) {
+                       rmr_vlog( RMR_VL_WARN, "unable to send table state: %d\n", smsg->state );
+                       rmr_wh_close( ctx, ctx->rtg_whid );                                     // send failed, assume connection lost
+                       ctx->rtg_whid = -1;
+               }
+
+               if( ! use_rts ) {
+                       rmr_free_msg( smsg );                   // if not our message we must free the leftovers
+               }
+       }
 }
 
+// ---- alarm generation --------------------------------------------------------------------------
+
+/*
+       Given the user's context (not the thread private context) look to see if the application isn't
+       working fast enough and we're dropping messages. If the drop counter has changed since the last
+       peeked, and we have not raised an alarm, then we will alarm. If the counter hasn't changed, then we
+       set a timer and if the counter still hasn't changed when it expires we will clear the alarm.
+
+       The private context is what we use to send so as not to interfere with the user flow.
+*/
+static void alarm_if_drops( uta_ctx_t* uctx, uta_ctx_t* pctx ) {
+       static  int alarm_raised = 0;
+       static  int ok2clear = 0;                                       // time that we can clear
+       static  int lastd = 0;                                          // the last counter value so we can compute delta
+       static  int prob_id = 0;                                        // problem ID we assume alarm manager handles dups between processes
+
+       rmr_vlog( RMR_VL_DEBUG, "checking for drops... raised=%d 0k2clear=%d lastd=%d probid=%d\n", alarm_raised, ok2clear, lastd, prob_id );
+       if( ! alarm_raised ) {
+               if( uctx->dcount - lastd == 0 ) {                       // not actively dropping, ok to do nothing
+                       return;
+               }
+
+               alarm_raised = 1;
+               uta_alarm( pctx, ALARM_DROPS | ALARM_RAISE, prob_id, "application running slow; RMR is dropping messages" );
+               rmr_vlog( RMR_VL_INFO, "drop alarm raised" );
+       } else {
+               if( uctx->dcount - lastd != 0 ) {                       // still dropping or dropping again; we've alarmed so nothing to do
+                       lastd = uctx->dcount;
+                       ok2clear = 0;                                                   // reset the timer
+                       return;
+               }
+
+               if( ok2clear == 0 ) {                                           // first round where not dropping
+                       ok2clear = time( NULL ) + 60;                   // we'll clear the alarm in 60s
+               } else {
+                       if( time( NULL ) > ok2clear ) {                 // things still stable after expiry
+                               rmr_vlog( RMR_VL_INFO, "drop alarm cleared\n" );
+                               alarm_raised = 0;
+                               uta_alarm( pctx, ALARM_DROPS | ALARM_CLEAR, prob_id, "RMR message dropping has stopped" );
+                               prob_id++;
+                       }
+               }
+       }
+}
+
+// ---- utility -----------------------------------------------------------------------------------
+
+int isspace_with_fence(int c) {
+       _mm_lfence();
+       return isspace( c );
+}
 
-// ------------------------------------------------------------------------------------------------
 /*
        Little diddy to trim whitespace and trailing comments. Like shell, trailing comments
        must be at the start of a word (i.e. must be immediatly preceeded by whitespace).
 */
 static char* clip( char* buf ) {
-       char*   tok;
+       char*   tok=NULL;
 
        while( *buf && isspace( *buf ) ) {                                                      // skip leading whitespace
                buf++;
@@ -215,7 +398,7 @@ static char* clip( char* buf ) {
                }
        }
 
-       for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- );    // trim trailing spaces too
+       for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace_with_fence( *tok ); tok-- ); // trim trailing spaces too
        *(tok+1) = 0;
 
        return buf;
@@ -230,30 +413,107 @@ static char* clip( char* buf ) {
 */
 static char* ensure_nlterm( char* buf ) {
        char*   nb = NULL;
-       int             len = 1;
+       int             len = 0;
 
-       nb = buf;
-       if( buf == NULL || (len = strlen( buf )) < 2 ) {
-               if( (nb = (char *) malloc( sizeof( char ) * 2 )) != NULL ) {
-                       *nb = '\n';
-                       *(nb+1) = 0;
-               }
-       } else {
-               if( buf[len-1] != '\n' ) {
-                       fprintf( stderr, "[WRN] rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
-                       if( (nb = (char *) malloc( sizeof( char ) * (len + 2) )) != NULL ) {
-                               memcpy( nb, buf, len );
-                               *(nb+len) = '\n';                       // insert \n and nil into the two extra bytes we allocated
-                               *(nb+len+1) = 0;
-                       }
+       if( buf != NULL ) {
+               len = strlen( buf );
+       }
 
-                       free( buf );
-               }
+       nb = buf;                                                       // default to returning original as is
+       switch( len ) {
+               case 0:
+                       nb = strdup( "\n" );
+                       break;
+
+               case 1:
+                       if( *buf != '\n' ) {            // not a newline; realloc
+                               rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
+                               nb = strdup( " \n" );
+                               *nb = *buf;
+                               free( buf );
+                       }
+                       break;
+
+               default:
+                       if( buf[len-1] != '\n' ) {              // not newline terminated, realloc
+                               rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
+                               if( (nb = (char *) malloc( sizeof( char ) * (len + 2) )) != NULL ) {
+                                       memcpy( nb, buf, len );
+                                       *(nb+len) = '\n';                       // insert \n and nil into the two extra bytes we allocated
+                                       *(nb+len+1) = 0;
+                                       free( buf );
+                               }
+                       }
+                       break;
        }
 
        return nb;
 }
 
+/*
+       Roll the new table into the active and the active into the old table. We
+       must have the lock on the active table to do this. It's possible that there
+       is no active table (first load), so we have to account for that (no locking).
+*/
+static void roll_tables( uta_ctx_t* ctx ) {
+
+       if( ctx->new_rtable == NULL || ctx->new_rtable->error ) {
+               rmr_vlog( RMR_VL_WARN, "new route table NOT rolled in: nil pointer or error indicated\n" );
+               ctx->old_rtable = ctx->new_rtable;
+               ctx->new_rtable = NULL;
+               return;
+       }
+
+       if( ctx->rtable != NULL ) {                                                     // initially there isn't one, so must check!
+               pthread_mutex_lock( ctx->rtgate );                              // must hold lock to move to active
+               ctx->old_rtable = ctx->rtable;                                  // currently active becomes old and allowed to 'drain'
+               ctx->rtable = ctx->new_rtable;                                  // one we've been adding to becomes active
+               pthread_mutex_unlock( ctx->rtgate );
+       } else {
+               ctx->old_rtable = NULL;                                         // ensure there isn't an old reference
+               ctx->rtable = ctx->new_rtable;                          // make new the active one
+       }
+
+       ctx->new_rtable = NULL;
+}
+
+/*
+       Given a thing list, extend the array of pointers by 1/2 of the current
+       number allocated. If we cannot realloc an array, then we set the error
+       flag.  Unlikely, but will prevent a crash, AND will prevent us from
+       trying to use the table since we couldn't capture everything.
+*/
+static void extend_things( thing_list_t* tl ) {
+       int old_alloc;
+       void*   old_things;
+       void*   old_names;
+
+       if( tl == NULL ) {
+               return;
+       }
+
+       old_alloc = tl->nalloc;                         // capture current things
+       old_things = tl->things;
+       old_names = tl->names;
+
+       tl->nalloc += tl->nalloc/2;                     // new allocation size
+
+       tl->things = (void **) malloc( sizeof( void * ) * tl->nalloc );                 // allocate larger arrays
+       tl->names = (const char **) malloc( sizeof( char * ) * tl->nalloc );
+
+       if( tl->things == NULL || tl->names == NULL ){                          // if either failed, then set error
+               tl->error = 1;
+               return;
+       }
+
+       memcpy( tl->things, old_things, sizeof( void * ) * old_alloc );
+       memcpy( tl->names, old_names, sizeof( void * ) * old_alloc );
+
+       free( old_things );
+       free( old_names );
+}
+
+// ------------ entry update functions ---------------------------------------------------------------
 /*
        Given a message type create a route table entry and add to the hash keyed on the
        message type.  Once in the hash, endpoints can be added with uta_add_ep. Size
@@ -268,7 +528,7 @@ static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups
        }
 
        if( (rte = (rtable_ent_t *) malloc( sizeof( *rte ) )) == NULL ) {
-               fprintf( stderr, "[ERR] rmr_add_rte: malloc failed for entry\n" );
+               rmr_vlog( RMR_VL_ERR, "rmr_add_rte: malloc failed for entry\n" );
                return NULL;
        }
        memset( rte, 0, sizeof( *rte ) );
@@ -281,7 +541,6 @@ static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups
 
        if( nrrgroups ) {
                if( (rte->rrgroups = (rrgroup_t **) malloc( sizeof( rrgroup_t * ) * nrrgroups )) == NULL ) {
-                       fprintf( stderr, "rmr_add_rte: malloc failed for rrgroup array\n" );
                        free( rte );
                        return NULL;
                }
@@ -298,7 +557,7 @@ static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups
 
        rmr_sym_map( rt->hash, key, rte );                                                      // add to hash using numeric mtype as key
 
-       if( DEBUG ) fprintf( stderr, "[DBUG] route table entry created: k=%llx groups=%d\n", (long long) key, nrrgroups );
+       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "route table entry created: k=%llx groups=%d\n", (long long) key, nrrgroups );
        return rte;
 }
 
@@ -319,46 +578,55 @@ static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups
 */
 static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* rr_field, int vlevel ) {
        rtable_ent_t*   rte;            // route table entry added
-       char*   tok;
+       char const*     tok;
        int             ntoks;
        uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
-       char*   tokens[128];
-       char*   gtokens[64];
-       int             i;
+       char*   tokens[128];
+       char*   gtokens[64];
        int             ngtoks;                         // number of tokens in the group list
        int             grp;                            // index into group list
+       int             cgidx;                          // contiguous group index (prevents the addition of a contiguous group without ep)
+       int             has_ep = FALSE;         // indicates if an endpoint was added in a given round robin group
 
        ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
        rr_field = clip( rr_field );
 
-       if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
+       if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
                (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
                has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
 
                key = build_rt_key( subid, atoi( ts_field ) );
 
-               if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] create rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
+               if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "create rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
 
                if( (ngtoks = uta_tokenise( rr_field, gtokens, 64, ';' )) > 0 ) {                                       // split round robin groups
                        if( strcmp( gtokens[0], "%meid" ) == 0 ) {
                                ngtoks = 0;                                                                                                                                     // special indicator that uses meid to find endpoint, no rrobin
                        }
                        rte = uta_add_rte( ctx->new_rtable, key, ngtoks );                                                              // get/create entry for this key
+                       rte->mtype = atoi( ts_field );                                                                                                  // capture mtype for debugging
+
+                       for( grp = 0, cgidx = 0; grp < ngtoks; grp++ ) {
+                               int             i;                                      // avoid sonar grumbling by defining this here
 
-                       for( grp = 0; grp < ngtoks; grp++ ) {
-                               if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) {          // remove any referneces to our ip addrs
+                               if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) {          // remove any references to our ip addrs
                                        for( i = 0; i < ntoks; i++ ) {
                                                if( strcmp( tokens[i], ctx->my_name ) != 0 ) {                                  // don't add if it is us -- cannot send to ourself
-                                                       if( DEBUG > 1  || (vlevel > 1)) fprintf( stderr, "[DBUG] add endpoint  ts=%s %s\n", ts_field, tokens[i] );
-                                                       uta_add_ep( ctx->new_rtable, rte, tokens[i], grp );
+                                                       if( DEBUG > 1  || (vlevel > 1)) rmr_vlog_force( RMR_VL_DEBUG, "add endpoint  ts=%s %s\n", ts_field, tokens[i] );
+                                                       uta_add_ep( ctx->new_rtable, rte, tokens[i], cgidx );
+                                                       has_ep = TRUE;
                                                }
                                        }
+                                       if( has_ep ) {
+                                               cgidx++;        // only increment to the next contiguous group if the current one has at least one endpoint
+                                               has_ep = FALSE;
+                                       }
                                }
                        }
                }
        } else {
                if( DEBUG || (vlevel > 2) ) {
-                       fprintf( stderr, "entry not included, sender not matched: %s\n", tokens[1] );
+                       rmr_vlog_force( RMR_VL_DEBUG, "build entry: ts_entry not of form msg-type,sender: %s\n", ts_field );
                }
        }
 }
@@ -372,10 +640,10 @@ static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* r
 */
 static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vlevel ) {
        rtable_ent_t*   rte;            // route table entry to be 'deleted'
-       char*   tok;
+       char const*     tok;
        int             ntoks;
        uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
-       char*   tokens[128];
+       char*   tokens[128];
 
        if( ctx == NULL || ctx->new_rtable == NULL || ctx->new_rtable->hash == NULL ) {
                return;
@@ -383,7 +651,7 @@ static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vle
 
        ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
 
-       if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
+       if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
                (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
                has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
 
@@ -391,20 +659,22 @@ static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vle
                rte = rmr_sym_pull( ctx->new_rtable->hash, key );                       // get it
                if( rte != NULL ) {
                        if( DEBUG || (vlevel > 1) ) {
-                                fprintf( stderr, "[DBUG] delete rte for mtype=%s subid=%d key=%08lx\n", ts_field, subid, key );
+                                rmr_vlog_force( RMR_VL_DEBUG, "delete rte for mtype=%s subid=%d key=%08lx\n", ts_field, subid, key );
                        }
                        rmr_sym_ndel( ctx->new_rtable->hash, key );                     // clear from the new table
                        del_rte( NULL, NULL, NULL, rte, NULL );                         // clean up the memory: reduce ref and free if ref == 0
                } else {
                        if( DEBUG || (vlevel > 1) ) {
-                               fprintf( stderr, "[DBUG] delete could not find rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
+                               rmr_vlog_force( RMR_VL_DEBUG, "delete could not find rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
                        }
                }
        } else {
-               if( DEBUG ) fprintf( stderr, "[DBUG] delete rte skipped: %s\n", ts_field );
+               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "delete rte skipped: %s\n", ts_field );
        }
 }
 
+// -------------------------- parse functions --------------------------------------------------
+
 /*
        Given the tokens from an mme_ar (meid add/replace) entry, add the entries.
        the 'owner' which should be the dns name or IP address of an enpoint
@@ -417,9 +687,9 @@ static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vle
        to send messages to.
 */
 static void parse_meid_ar( route_table_t* rtab, char* owner, char* meid_list, int vlevel ) {
-       char*   tok;
+       char const*     tok;
        int             ntoks;
-       char*   tokens[128];
+       char*   tokens[128];
        int             i;
        int             state;
        endpoint_t*     ep;                                             // endpoint struct for the owner
@@ -431,10 +701,9 @@ static void parse_meid_ar( route_table_t* rtab, char* owner, char* meid_list, in
        for( i = 0; i < ntoks; i++ ) {
                if( (ep = rt_ensure_ep( rtab, owner )) != NULL ) {
                        state = rmr_sym_put( rtab->hash, tokens[i], RT_ME_SPACE, ep );                                          // slam this one in if new; replace if there
-                       if( DEBUG || (vlevel > 1) ) fprintf( stderr, "[DBUG] parse_meid_ar: add/replace meid: %s owned by: %s state=%d\n", tokens[i], owner, state );
-fprintf( stderr, "[DBUG] parse_meid_ar: add/replace meid: %s owned by: %s state=%d\n", tokens[i], owner, state );
+                       if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "parse_meid_ar: add/replace meid: %s owned by: %s state=%d\n", tokens[i], owner, state );
                } else {
-                       fprintf( stderr, "[WRN] rmr parse_meid_ar: unable to create an endpoint for owner: %s", owner );
+                       rmr_vlog( RMR_VL_WARN, "rmr parse_meid_ar: unable to create an endpoint for owner: %s", owner );
                }
        }
 }
@@ -449,9 +718,9 @@ fprintf( stderr, "[DBUG] parse_meid_ar: add/replace meid: %s owned by: %s state=
        This function assumes the caller has vetted the pointers as needed.
 */
 static void parse_meid_del( route_table_t* rtab, char* meid_list, int vlevel ) {
-       char*   tok;
+       char const*     tok;
        int             ntoks;
-       char*   tokens[128];
+       char*   tokens[128];
        int             i;
 
        if( rtab->hash == NULL ) {
@@ -463,21 +732,26 @@ static void parse_meid_del( route_table_t* rtab, char* meid_list, int vlevel ) {
        ntoks = uta_tokenise( meid_list, tokens, 128, ' ' );
        for( i = 0; i < ntoks; i++ ) {
                rmr_sym_del( rtab->hash, tokens[i], RT_ME_SPACE );                                              // and it only took my little finger to blow it away!
-               if( DEBUG || (vlevel > 1) ) fprintf( stderr, "[DBUG] parse_meid_del: meid deleted: %s\n", tokens[i] );
+               if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "parse_meid_del: meid deleted: %s\n", tokens[i] );
        }
 }
 
 /*
        Parse a partially parsed meid record. Tokens[0] should be one of:
                meid_map, mme_ar, mme_del.
+
+       pctx is the private context needed to return an ack/nack using the provided
+       message buffer with the route managers address info.
 */
-static void meid_parser( uta_ctx_t* ctx, char** tokens, int ntoks, int vlevel ) {
+static void meid_parser( uta_ctx_t* ctx, uta_ctx_t* pctx, rmr_mbuf_t* mbuf, char** tokens, int ntoks, int vlevel ) {
+       char wbuf[1024];
+
        if( tokens == NULL || ntoks < 1 ) {
                return;                                                 // silent but should never happen
        }
 
        if( ntoks < 2 ) {                                       // must have at least two for any valid request record
-               fprintf( stderr, "[ERR] meid_parse: not enough tokens on %s record\n", tokens[0] );
+               rmr_vlog( RMR_VL_ERR, "meid_parse: not enough tokens on %s record\n", tokens[0] );
                return;
        }
 
@@ -485,70 +759,150 @@ static void meid_parser( uta_ctx_t* ctx, char** tokens, int ntoks, int vlevel )
                tokens[1] = clip( tokens[1] );
                if( *(tokens[1]) == 's' ) {
                        if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
-                               if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] meid map start: dropping incomplete table\n" );
+                               if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "meid map start: dropping incomplete table\n" );
                                uta_rt_drop( ctx->new_rtable );
+                               ctx->new_rtable = NULL;
+                               send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" );        // nack the one that was pending as and never made it
+                       }
+
+                       if( ctx->table_id != NULL ) {
+                               free( ctx->table_id );
+                       }
+                       if( ntoks > 2 ) {
+                               ctx->table_id = strdup( clip( tokens[2] ) );
+                       } else {
+                               ctx->table_id = NULL;
                        }
 
-                       ctx->new_rtable = uta_rt_clone_all( ctx->rtable );              // start with a clone of everything (mtype, endpoint refs and meid)
+                       ctx->new_rtable = prep_new_rt( ctx, ALL );                                      // start with a clone of everything (mtype, endpoint refs and meid)
                        ctx->new_rtable->mupdates = 0;
-                       if( DEBUG || (vlevel > 1)  ) fprintf( stderr, "[DBUG] meid_parse: meid map start found\n" );
+
+                       if( DEBUG || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "meid_parse: meid map start found\n" );
                } else {
                        if( strcmp( tokens[1], "end" ) == 0 ) {                                                         // wrap up the table we were building
                                if( ntoks > 2 ) {                                                                                               // meid_map | end | <count> |??? given
                                        if( ctx->new_rtable->mupdates != atoi( tokens[2] ) ) {          // count they added didn't match what we received
-                                               fprintf( stderr, "[ERR] meid_parse: meid map update had wrong number of records: received %d expected %s\n", ctx->new_rtable->mupdates, tokens[2] );
+                                               rmr_vlog( RMR_VL_ERR, "meid_parse: meid map update had wrong number of records: received %d expected %s\n",
+                                                               ctx->new_rtable->mupdates, tokens[2] );
+                                               snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates );
+                                               send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
                                                uta_rt_drop( ctx->new_rtable );
                                                ctx->new_rtable = NULL;
                                                return;
                                        }
 
-                                       if( DEBUG ) fprintf( stderr, "[DBUG] meid_parse: meid map update ended; found expected number of entries: %s\n", tokens[2] );
+                                       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid_parse: meid map update ended; found expected number of entries: %s\n", tokens[2] );
                                }
 
                                if( ctx->new_rtable ) {
-                                       uta_rt_drop( ctx->old_rtable );                         // time to drop one that was previously replaced
-                                       ctx->old_rtable = ctx->rtable;                          // currently active becomes old and allowed to 'drain'
-                                       ctx->rtable = ctx->new_rtable;                          // one we've been adding to becomes active
-                                       ctx->new_rtable = NULL;
-                                       if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] end of meid map noticed\n" );
+                                       roll_tables( ctx );                                             // roll active to old, and new to active with proper locking
+                                       if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "end of meid map noticed\n" );
+                                       send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
 
                                        if( vlevel > 0 ) {
-                                               fprintf( stderr, "[DBUG] old route table:\n" );
-                                               rt_stats( ctx->old_rtable );
-                                               fprintf( stderr, "[DBUG] new route table:\n" );
+                                               if( ctx->old_rtable != NULL ) {
+                                                       rmr_vlog_force( RMR_VL_DEBUG, "old route table: (ref_count=%d)\n", ctx->old_rtable->ref_count );
+                                                       rt_stats( ctx->old_rtable );
+                                               } else {
+                                                       rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" );
+                                               }
+                                               rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
                                                rt_stats( ctx->rtable );
                                        }
                                } else {
-                                       if( DEBUG ) fprintf( stderr, "[DBUG] end of meid map noticed, but one was not started!\n" );
+                                       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "end of meid map noticed, but one was not started!\n" );
                                        ctx->new_rtable = NULL;
                                }
                        }
                }
 
                return;
-       }       
+       }
 
-       if( ! ctx->new_rtable ) {                       // for any other mmap entries, there must be a table in progress or we punt
-               if( DEBUG ) fprintf( stderr, "[DBUG] meid update/delte (%s) encountered, but table update not started\n", tokens[0] );
+       if( ! ctx->new_rtable ) {                       // for any other mmap entries, there must be a table in progress or we punt
+               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid update/delte (%s) encountered, but table update not started\n", tokens[0] );
                return;
        }
 
        if( strcmp( tokens[0], "mme_ar" ) == 0 ) {
                if( ntoks < 3  || tokens[1] == NULL || tokens[2] == NULL ) {
-                       fprintf( stderr, "[ERR] meid_parse: mme_ar record didn't have enough tokens found %d\n", ntoks );
+                       rmr_vlog( RMR_VL_ERR, "meid_parse: mme_ar record didn't have enough tokens found %d\n", ntoks );
                        return;
                }
                parse_meid_ar( ctx->new_rtable,  tokens[1], tokens[2], vlevel );
                ctx->new_rtable->mupdates++;
+               return;
        }
 
-       if( strcmp( tokens[0], "mme_del" ) == 0 ) {
-               if( ntoks < 2 ) {
-                       fprintf( stderr, "[ERR] meid_parse: mme_del record didn't have enough tokens\n" );
-                       return;
-               }
+       if( strcmp( tokens[0], "mme_del" ) == 0 ) {                                             // ntoks < 2 already validated
                parse_meid_del( ctx->new_rtable,  tokens[1], vlevel );
                ctx->new_rtable->mupdates++;
+               return;
+       }
+}
+
+/*
+       This will close the current table snarf file (in *.inc) and open a new one.
+       The curent one is renamed. The final file name is determined by the setting of
+       RMR_SNARF_RT, and if not set then the variable RMR_SEED_RT is used and given
+       an additional extension of .snarf.  If neither seed or snarf environment vars are
+       set then this does nothing.
+
+       If this is called before the tmp snarf file is opened, then this just opens the file.
+*/
+static void cycle_snarfed_rt( uta_ctx_t* ctx ) {
+       static int              ok2warn = 0;    // some warnings squelched on first call
+
+       char*   seed_fname;                             // the filename from env
+       char    tfname[512];                    // temp fname
+       char    wfname[512];                    // working buffer for filename
+       char*   snarf_fname = NULL;             // prevent overlay of the static table if snarf_rt not given
+
+       if( ctx == NULL ) {
+               return;
+       }
+
+       if( (snarf_fname = getenv(  ENV_STASH_RT )) == NULL ) {                         // specific place to stash the rt not given
+               if( (seed_fname = getenv( ENV_SEED_RT )) != NULL ) {                    // no seed, we leave in the default file
+                       memset( wfname, 0, sizeof( wfname ) );
+                       snprintf( wfname, sizeof( wfname ) - 1, "%s.stash", seed_fname );
+                       snarf_fname = wfname;
+               }
+       }
+
+       if( snarf_fname == NULL ) {
+               rmr_vlog( RMR_VL_DEBUG, "cycle_snarf: no file to save in" );
+               return;
+       }
+
+       memset( tfname, 0, sizeof( tfname ) );
+       snprintf( tfname, sizeof( tfname ) -1, "%s.inc", snarf_fname );         // must ensure tmp file is moveable
+
+       if( ctx->snarf_rt_fd >= 0 ) {
+               char* msg= "### captured from route manager\n";
+               write( ctx->snarf_rt_fd, msg, strlen( msg ) );
+               if( close( ctx->snarf_rt_fd ) < 0 ) {
+                       rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to close working rt snarf file: %s\n", strerror( errno ) );
+                       return;
+               }
+
+               if( unlink( snarf_fname ) < 0  && ok2warn ) {                                   // first time through this can fail and we ignore it
+                       rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to unlink old static table: %s: %s\n", snarf_fname, strerror( errno ) );
+               }
+
+               if( rename( tfname, snarf_fname ) ) {
+                       rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to move new route table to seed aname : %s -> %s: %s\n", tfname, snarf_fname, strerror( errno ) );
+               } else {
+                       rmr_vlog( RMR_VL_INFO, "latest route table info saved in: %s\n", snarf_fname );
+               }
+       }
+       ok2warn = 1;
+
+       ctx->snarf_rt_fd = open( tfname, O_WRONLY | O_CREAT | O_TRUNC, 0660 );
+       if( ctx->snarf_rt_fd < 0 ) {
+               rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to open trt file: %s: %s\n", tfname, strerror( errno ) );
+       } else {
+               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: rt snarf file opened: %s\n", tfname );
        }
 }
 
@@ -574,38 +928,57 @@ static void meid_parser( uta_ctx_t* ctx, char** tokens, int ntoks, int vlevel )
 
 
        For a RT update, we expect:
-               newrt|{start|end [hold]}
+               newrt | start | <table-id>
+               newrt | end | <count>
                rte|<mtype>[,sender]|<endpoint-grp>[;<endpoint-grp>,...]
                mse|<mtype>[,sender]|<sub-id>|<endpoint-grp>[;<endpoint-grp>,...]
                mse| <mtype>[,sender] | <sub-id> | %meid
 
 
        For a meid map update we expect:
-               meid_map | start
+               meid_map | start | <table-id>
                meid_map | end | <count> | <md5-hash>
                mme_ar | <e2term-id> | <meid0> <meid1>...<meidn>
                mme_del | <meid0> <meid1>...<meidn>
 
+
+       The pctx is our private context that must be used to send acks/status
+       messages back to the route manager.  The regular ctx is the ctx that
+       the user has been given and thus that's where we have to hang the route
+       table we're working with.
+
+       If mbuf is given, and we need to ack, then we ack using the mbuf and a
+       return to sender call (allows route manager to use wh_call() to send
+       an update and rts is required to get that back to the right thread).
+       If mbuf is nil, then one will be allocated (in ack) and a normal wh_send
+       will be used.
 */
-static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) {
+static void parse_rt_rec( uta_ctx_t* ctx,  uta_ctx_t* pctx, char* buf, int vlevel, rmr_mbuf_t* mbuf ) {
        int i;
        int ntoks;                                                      // number of tokens found in something
        int ngtoks;
        int     grp;                                                    // group number
-       rtable_ent_t*   rte;                            // route table entry added
+       rtable_ent_t const*     rte;                    // route table entry added
        char*   tokens[128];
-       char*   tok;                                            // pointer into a token or string
+       char*   tok=NULL;                                               // pointer into a token or string
+       char    wbuf[1024];
 
        if( ! buf ) {
                return;
        }
 
+       if( ctx && ctx->snarf_rt_fd  >= 0 ) {                                                           // if snarfing table as it arrives, write this puppy
+               write( ctx->snarf_rt_fd, buf, strlen( buf ) );
+               write( ctx->snarf_rt_fd, "\n", 1 );
+       }
+
        while( *buf && isspace( *buf ) ) {                                                      // skip leading whitespace
                buf++;
        }
-       for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- );    // trim trailing spaces too
+       for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace_with_fence( *tok ); tok-- ); // trim trailing spaces too
        *(tok+1) = 0;
 
+       memset( tokens, 0, sizeof( tokens ) );
        if( (ntoks = uta_tokenise( buf, tokens, 128, '|' )) > 0 ) {
                tokens[0] = clip( tokens[0] );
                switch( *(tokens[0]) ) {
@@ -620,7 +993,7 @@ static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) {
                                }
 
                                if( ntoks < 3 ) {
-                                       if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: del record had too few fields: %d instead of 3\n", ntoks );
+                                       if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: del record had too few fields: %d instead of 3\n", ntoks );
                                        break;
                                }
 
@@ -631,50 +1004,76 @@ static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) {
                        case 'n':                                                                                               // newrt|{start|end}
                                tokens[1] = clip( tokens[1] );
                                if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
-                                       if( ctx->new_rtable ) {
-                                               uta_rt_drop( ctx->old_rtable );                         // time to drop one that was previously replaced
-                                               ctx->old_rtable = ctx->rtable;                          // currently active becomes old and allowed to 'drain'
-                                               ctx->rtable = ctx->new_rtable;                          // one we've been adding to becomes active
-                                               ctx->new_rtable = NULL;
-                                               if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] end of route table noticed\n" );
+                                       if( ctx && ctx->snarf_rt_fd >= 0 ) {
+                                               cycle_snarfed_rt( ctx );                                        // make it available and open a new one
+                                       }
 
-                                               if( vlevel > 0 ) {
-                                                       fprintf( stderr, "[DBUG] old route table:\n" );
-                                                       rt_stats( ctx->old_rtable );
-                                                       fprintf( stderr, "[DBUG] new route table:\n" );
-                                                       rt_stats( ctx->rtable );
+                                       if( ntoks >2 ) {
+                                               if( ctx->new_rtable->updates != atoi( tokens[2] ) ) {   // count they added didn't match what we received
+                                                       rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
+                                                               ctx->new_rtable->updates, tokens[2] );
+                                                       snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates );
+                                                       send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
+                                                       uta_rt_drop( ctx->new_rtable );
+                                                       ctx->new_rtable = NULL;
+                                                       break;
                                                }
+                                       }
+
+                                       if( ctx->new_rtable ) {
+                                               roll_tables( ctx );                                             // roll active to old, and new to active with proper locking
+                                               if( DEBUG > 1 || (vlevel > 1) ) {
+                                                       rmr_vlog( RMR_VL_DEBUG, "end of route table noticed\n" );
+                                                       dump_tables( ctx );
+                                               }
+
+                                               send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
+                                               ctx->rtable_ready = 1;                                                  // route based sends can now happen
+                                               ctx->flags |= CFL_FULLRT;                                               // indicate we have seen a complete route table
                                        } else {
-                                               if( DEBUG > 1 ) fprintf( stderr, "[DBUG] end of route table noticed, but one was not started!\n" );
+                                               if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of route table noticed, but one was not started!\n" );
                                                ctx->new_rtable = NULL;
                                        }
-                               } else {                                                                                        // start a new table.
-                                       if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
-                                               if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] new table; dropping incomplete table\n" );
+                               } else {                                                                                                                        // start a new table.
+                                       if( ctx->new_rtable != NULL ) {                                                                 // one in progress?  this forces it out
+                                               send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" );                        // nack the one that was pending as end never made it
+
+                                               if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
                                                uta_rt_drop( ctx->new_rtable );
+                                               ctx->new_rtable = NULL;
                                        }
 
-                                       ctx->new_rtable = NULL;
-                                       ctx->new_rtable = uta_rt_clone( ctx->rtable );  // create by cloning endpoint and meidtentries from active table
-                                       if( DEBUG > 1 || (vlevel > 1)  ) fprintf( stderr, "[DBUG] start of route table noticed\n" );
+                                       if( ctx->table_id != NULL ) {
+                                               free( ctx->table_id );
+                                       }
+                                       if( ntoks >2 ) {
+                                               ctx->table_id = strdup( clip( tokens[2] ) );
+                                       } else {
+                                               ctx->table_id = NULL;
+                                       }
+
+                                       ctx->new_rtable = prep_new_rt( ctx, SOME );                     // wait for old table to drain and shift it back to new
+                                       ctx->new_rtable->updates = 0;                                           // init count of entries received
+
+                                       if( DEBUG > 1 || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "start of route table noticed\n" );
                                }
                                break;
 
-                       case 'm':                                                               // mse entry or one of the meid_ records
+                       case 'm':                                                                       // mse entry or one of the meid_ records
                                if( strcmp( tokens[0], "mse" ) == 0 ) {
                                        if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
                                                break;
                                        }
 
                                        if( ntoks < 4 ) {
-                                               if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: mse record had too few fields: %d instead of 4\n", ntoks );
+                                               if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: mse record had too few fields: %d instead of 4\n", ntoks );
                                                break;
                                        }
 
                                        build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel );
                                        ctx->new_rtable->updates++;
                                } else {
-                                       meid_parser( ctx, tokens, ntoks, vlevel );
+                                       meid_parser( ctx, pctx, mbuf, tokens, ntoks, vlevel );
                                }
                                break;
 
@@ -692,16 +1091,25 @@ static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) {
                                break;
 
                        case 'u':                                                                                               // update current table, not a total replacement
+                               if( ! (ctx->flags & CFL_FULLRT) ) {                                     // we cannot update until we have a full table from route generator
+                                       rmr_vlog( RMR_VL_WARN, "route table update ignored: full table not previously recevied" );
+                                       break;
+                               }
+
                                tokens[1] = clip( tokens[1] );
                                if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
                                        if( ctx->new_rtable == NULL ) {                                 // update table not in progress
                                                break;
                                        }
+                                       if( ctx->snarf_rt_fd >= 0 ) {
+                                               cycle_snarfed_rt( ctx );                                        // make it available and open a new one
+                                       }
 
                                        if( ntoks >2 ) {
                                                if( ctx->new_rtable->updates != atoi( tokens[2] ) ) {   // count they added didn't match what we received
-                                                       fprintf( stderr, "[ERR] rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
+                                                       rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
                                                                ctx->new_rtable->updates, tokens[2] );
+                                                       send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
                                                        uta_rt_drop( ctx->new_rtable );
                                                        ctx->new_rtable = NULL;
                                                        break;
@@ -709,36 +1117,42 @@ static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) {
                                        }
 
                                        if( ctx->new_rtable ) {
-                                               uta_rt_drop( ctx->old_rtable );                         // time to drop one that was previously replaced
-                                               ctx->old_rtable = ctx->rtable;                          // currently active becomes old and allowed to 'drain'
-                                               ctx->rtable = ctx->new_rtable;                          // one we've been adding to becomes active
-                                               ctx->new_rtable = NULL;
-                                               if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] end of rt update noticed\n" );
-
-                                               if( vlevel > 0 ) {
-                                                       fprintf( stderr, "[DBUG] old route table:\n" );
-                                                       rt_stats( ctx->old_rtable );
-                                                       fprintf( stderr, "[DBUG] updated route table:\n" );
-                                                       rt_stats( ctx->rtable );
+                                               roll_tables( ctx );                                             // roll active to old, and new to active with proper locking
+                                               if( DEBUG > 1 || (vlevel > 1) )  {
+                                                       rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed\n" );
+                                                       dump_tables( ctx );
                                                }
+
+                                               send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
+                                               ctx->rtable_ready = 1;                                                  // route based sends can now happen
                                        } else {
-                                               if( DEBUG > 1 ) fprintf( stderr, "[DBUG] end of rt update noticed, but one was not started!\n" );
+                                               if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed, but one was not started!\n" );
                                                ctx->new_rtable = NULL;
                                        }
                                } else {                                                                                        // start a new table.
                                        if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
-                                               if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] new table; dropping incomplete table\n" );
+                                               if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
+                                               send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" );                        // nack the one that was pending as end never made it
                                                uta_rt_drop( ctx->new_rtable );
+                                               ctx->new_rtable = NULL;
                                        }
 
-                                       ctx->new_rtable = uta_rt_clone_all( ctx->rtable );      // start with a clone of everything (endpts and entries)
-                                       ctx->new_rtable->updates = 0;                                           // init count of updates received
-                                       if( DEBUG > 1 || (vlevel > 1)  ) fprintf( stderr, "[DBUG] start of rt update noticed\n" );
+                                       if( ntoks > 2 ) {
+                                               if( ctx->table_id != NULL ) {
+                                                       free( ctx->table_id );
+                                               }
+                                               ctx->table_id = strdup( clip( tokens[2] ) );
+                                       }
+
+                                       ctx->new_rtable = prep_new_rt( ctx, ALL );                              // start with a copy of everything in the live table
+                                       ctx->new_rtable->updates = 0;                                                   // init count of updates received
+
+                                       if( DEBUG > 1 || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "start of rt update noticed\n" );
                                }
                                break;
 
                        default:
-                               if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: unrecognised request: %s\n", tokens[0] );
+                               if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: unrecognised request: %s\n", tokens[0] );
                                break;
                }
        }
@@ -761,42 +1175,50 @@ static void read_static_rt( uta_ctx_t* ctx, int vlevel ) {
        char*   eor;                            // end of the record
        int             rcount = 0;                     // record count for debug
 
-       if( (fname = getenv( ENV_SEED_RT )) == NULL ) {
-               return;
+       if( (fname = ctx->seed_rt_fname) == NULL ) {
+               if( (fname = getenv( ENV_SEED_RT )) == NULL ) {
+                       return;
+               }
+
+               ctx->seed_rt_fname = strdup( fname );
+               fname = ctx->seed_rt_fname;
        }
 
        if( (fbuf = ensure_nlterm( uta_fib( fname ) ) ) == NULL ) {                     // read file into a single buffer (nil terminated string)
-               fprintf( stderr, "[WRN] rmr read_static: seed route table could not be opened: %s: %s\n", fname, strerror( errno ) );
+               rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table could not be opened: %s: %s\n", fname, strerror( errno ) );
                return;
        }
 
-       if( DEBUG ) fprintf( stderr, "[DBUG] rmr: seed route table successfully opened: %s\n", fname );
+       if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "seed route table successfully opened: %s\n", fname );
        for( eor = fbuf; *eor; eor++ ) {                                        // fix broken systems that use \r or \r\n to terminate records
                if( *eor == '\r' ) {
                        *eor = '\n';                                                            // will look like a blank line which is ok
                }
        }
 
-       for( rec = fbuf; rec && *rec; rec = eor+1 ) {
+       rec = fbuf;
+       while( rec && *rec ) {
                rcount++;
                if( (eor = strchr( rec, '\n' )) != NULL ) {
                        *eor = 0;
                } else {
-                       fprintf( stderr, "[WRN] rmr read_static: seed route table had malformed records (missing newline): %s\n", fname );
-                       fprintf( stderr, "[WRN] rmr read_static: seed route table not used: %s\n", fname );
+                       rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table had malformed records (missing newline): %s\n", fname );
+                       rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table not used: %s\n", fname );
                        free( fbuf );
                        return;
                }
 
-               parse_rt_rec( ctx, rec, vlevel );
+               parse_rt_rec( ctx, NULL, rec, vlevel, NULL );           // no pvt context as we can't ack
+
+               rec = eor+1;
        }
 
-       if( DEBUG ) fprintf( stderr, "[DBUG] rmr_read_static:  seed route table successfully parsed: %d records\n", rcount );
+       if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "rmr_read_static:  seed route table successfully parsed: %d records\n", rcount );
        free( fbuf );
 }
 
 /*
-       Callback driven for each named thing in a symtab. We collect the pointers to those
+       Callback driven for each thing in a symtab. We collect the pointers to those
        things for later use (cloning).
 */
 static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list ) {
@@ -807,11 +1229,16 @@ static void collect_things( void* st, void* entry, char const* name, void* thing
        }
 
        if( thing == NULL ) {
+               rmr_vlog_force( RMR_VL_DEBUG, "collect things given nil thing: %p %p %p\n", st, entry, name );  // dummy ref for sonar
                return;
        }
 
-       tl->names[tl->nused] = name;                    // the name/key
+       tl->names[tl->nused] = name;                    // the name/key (space 0 uses int keys, so name can be nil and that is OK)
        tl->things[tl->nused++] = thing;                // save a reference to the thing
+
+       if( tl->nused >= tl->nalloc ) {
+               extend_things( tl );                            // not enough allocated
+       }
 }
 
 /*
@@ -836,6 +1263,7 @@ static void del_rte( void* st, void* entry, char const* name, void* thing, void*
        int i;
 
        if( (rte = (rtable_ent_t *) thing) == NULL ) {
+               rmr_vlog_force( RMR_VL_DEBUG, "delrte given nil table: %p %p %p\n", st, entry, name );  // dummy ref for sonar
                return;
        }
 
@@ -848,7 +1276,9 @@ static void del_rte( void* st, void* entry, char const* name, void* thing, void*
                for( i = 0; i < rte->nrrgroups; i++ ) {
                        if( rte->rrgroups[i] ) {
                                free( rte->rrgroups[i]->epts );                 // ditch list of endpoint pointers (end points are reused; don't trash them)
+                               free( rte->rrgroups[i] );                               // but must free the rrg itself too
                        }
+
                }
 
                free( rte->rrgroups );
@@ -867,7 +1297,7 @@ static void del_rte( void* st, void* entry, char const* name, void* thing, void*
        an empty buffer, as opposed to a nil, so the caller can generate defaults
        or error if an empty/missing file isn't tolerated.
 */
-static char* uta_fib( char* fname ) {
+static char* uta_fib( char const* fname ) {
        struct stat     stats;
        off_t           fsize = 8192;   // size of the file
        off_t           nread;                  // number of bytes read
@@ -883,7 +1313,7 @@ static char* uta_fib( char* fname ) {
                                fsize = stats.st_size;                                          // stat ok, save the file size
                        }
                } else {
-                       fsize = 8192;                                                           // stat failed, we'll leave the file open and try to read a default max of 8k
+                       fsize = 8192;                                                           // stat failed, we'll leave the file open and try to read a default max of 8k
                }
        }
 
@@ -918,21 +1348,31 @@ static char* uta_fib( char* fname ) {
        return buf;
 }
 
+// --------------------- initialisation/creation ---------------------------------------------
 /*
        Create and initialise a route table; Returns a pointer to the table struct.
 */
-static route_table_t* uta_rt_init( ) {
+static route_table_t* uta_rt_init( uta_ctx_t* ctx ) {
        route_table_t*  rt;
 
+       if( ctx == NULL ) {
+               return NULL;
+       }
        if( (rt = (route_table_t *) malloc( sizeof( route_table_t ) )) == NULL ) {
                return NULL;
        }
 
+       memset( rt, 0, sizeof( *rt ) );
+
        if( (rt->hash = rmr_sym_alloc( RT_SIZE )) == NULL ) {
                free( rt );
                return NULL;
        }
 
+       rt->gate = ctx->rtgate;                                         // single mutex needed for all route tables
+       rt->ephash = ctx->ephash;                                       // all route tables share a common endpoint hash
+       pthread_mutex_init( rt->gate, NULL );
+
        return rt;
 }
 
@@ -942,18 +1382,24 @@ static route_table_t* uta_rt_init( ) {
        Space is the space in the old table to copy. Space 0 uses an integer key and
        references rte structs. All other spaces use a string key and reference endpoints.
 */
-static route_table_t* rt_clone_space( route_table_t* srt, route_table_t* nrt, int space ) {
-       endpoint_t*             ep;             // an endpoint
-       rtable_ent_t*   rte;    // a route table entry
+static route_table_t* rt_clone_space( uta_ctx_t* ctx, route_table_t* srt, route_table_t* nrt, int space ) {
+       endpoint_t*     ep;                     // an endpoint (ignore sonar complaint about const*)
+       rtable_ent_t*   rte;    // a route table entry  (ignore sonar complaint about const*)
        void*   sst;                    // source symtab
        void*   nst;                    // new symtab
        thing_list_t things;    // things from the space to copy
        int             i;
        int             free_on_err = 0;
 
+       if( ctx == NULL ) {
+               return NULL;
+       }
        if( nrt == NULL ) {                             // make a new table if needed
                free_on_err = 1;
-               nrt = uta_rt_init();
+               nrt = uta_rt_init( ctx );
+               if( nrt == NULL ) {
+                       return NULL;
+               }
        }
 
        if( srt == NULL ) {             // source was nil, just give back the new table
@@ -962,24 +1408,49 @@ static route_table_t* rt_clone_space( route_table_t* srt, route_table_t* nrt, in
 
        things.nalloc = 2048;
        things.nused = 0;
+       things.error = 0;
        things.things = (void **) malloc( sizeof( void * ) * things.nalloc );
        things.names = (const char **) malloc( sizeof( char * ) * things.nalloc );
-       if( things.things == NULL ) {
+       if( things.things == NULL || things.names == NULL ){
+               if( things.things != NULL ) {
+                       free( things.things );
+               }
+               if( things.names != NULL ) {
+                       free( things.names );
+               }
+
                if( free_on_err ) {
-                       free( nrt->hash );
+                       rmr_sym_free( nrt->hash );
                        free( nrt );
                        nrt = NULL;
+               } else {
+                       nrt->error = 1;
                }
 
                return nrt;
        }
+       memset( things.things, 0, sizeof( sizeof( void * ) * things.nalloc ) );
+       memset( things.names, 0, sizeof( char * ) * things.nalloc );
 
        sst = srt->hash;                                                                                        // convenience pointers (src symtab)
        nst = nrt->hash;
 
        rmr_sym_foreach_class( sst, space, collect_things, &things );           // collect things from this space
+       if( things.error ) {                            // something happened and capture failed
+               rmr_vlog( RMR_VL_ERR, "unable to clone route table: unable to capture old contents\n" );
+               free( things.things );
+               free( things.names );
+               if( free_on_err ) {
+                       rmr_sym_free( nrt->hash );
+                       free( nrt );
+                       nrt = NULL;
+               } else {
+                       nrt->error = 1;
+               }
+               return nrt;
+       }
 
-       if( DEBUG ) fprintf( stderr, "[DBUG] clone space cloned %d things in space %d\n",  things.nused, space );
+       if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "clone space cloned %d things in space %d\n",  things.nused, space );
        for( i = 0; i < things.nused; i++ ) {
                if( space ) {                                                                                           // string key, epoint reference
                        ep = (endpoint_t *) things.things[i];
@@ -997,62 +1468,114 @@ static route_table_t* rt_clone_space( route_table_t* srt, route_table_t* nrt, in
 }
 
 /*
-       Creates a new route table and then clones the parts of the table which we must keep with each newrt|start.
-       The endpoint and meid entries in the hash must be preserved.
+       Given a destination route table (drt), clone from the source (srt) into it.
+       If drt is nil, alloc a new one. If srt is nil, then nothing is done (except to
+       allocate the drt if that was nil too). If all is true (1), then we will clone both
+       the MT and the ME spaces; otherwise only the ME space is cloned.
 */
-static route_table_t* uta_rt_clone( route_table_t* srt ) {
+static route_table_t* uta_rt_clone( uta_ctx_t* ctx, route_table_t* srt, route_table_t* drt, int all ) {
        endpoint_t*             ep;                             // an endpoint
        rtable_ent_t*   rte;                    // a route table entry
-       route_table_t*  nrt = NULL;             // new route table
        int i;
 
+       if( ctx == NULL ) {
+               return NULL;
+       }
+       if( drt == NULL ) {
+               drt = uta_rt_init( ctx );
+       }
        if( srt == NULL ) {
-               return uta_rt_init();           // no source to clone, just return an empty table
+               return drt;
        }
 
-       nrt = rt_clone_space( srt, nrt, RT_NAME_SPACE );                // allocate a new one, add endpoint refs
-       rt_clone_space( srt, nrt, RT_ME_SPACE );                                // add meid refs to new
+       drt->ephash = ctx->ephash;                                              // all rts reference the same EP symtab
+       rt_clone_space( ctx, srt, drt, RT_ME_SPACE );
+       if( all ) {
+               rt_clone_space( ctx, srt, drt, RT_MT_SPACE );
+       }
 
-       return nrt;
+       return drt;
 }
 
 /*
-       Creates a new route table and then clones  _all_ of the given route table (references 
-       both endpoints AND the route table entries. Needed to support a partial update where 
-       some route table entries will not be deleted if not explicitly in the update and when 
-       we are adding/replacing meid references.
+       Prepares the "new" route table for populating. If the old_rtable is not nil, then
+       we wait for it's use count to reach 0. Then the table is cleared, and moved on the
+       context to be referenced by the new pointer; the old pointer is set to nil.
+
+       If the old table doesn't exist, then a new table is created and the new pointer is
+       set to reference it.
+
+       The ME namespace references endpoints which do not need to be released, therefore we
+       do not need to run that portion of the table to deref like we do for the RTEs.
 */
-static route_table_t* uta_rt_clone_all( route_table_t* srt ) {
-       endpoint_t*             ep;                             // an endpoint
-       rtable_ent_t*   rte;                    // a route table entry
-       route_table_t*  nrt = NULL;             // new route table
-       int i;
+static route_table_t* prep_new_rt( uta_ctx_t* ctx, int all ) {
+       int counter = 0;
+       int ref_count;
+       route_table_t*  rt;
 
-       if( srt == NULL ) {
-               return uta_rt_init();           // no source to clone, just return an empty table
+       if( ctx == NULL ) {
+               return NULL;
        }
 
-       nrt = rt_clone_space( srt, nrt, RT_MT_SPACE );                  // create new, clone all spaces to it
-       rt_clone_space( srt, nrt, RT_NAME_SPACE );
-       rt_clone_space( srt, nrt, RT_ME_SPACE );
+       if( (rt = ctx->old_rtable) != NULL ) {
+               ctx->old_rtable = NULL;
 
-       return nrt;
+               pthread_mutex_lock( ctx->rtgate );
+               ref_count = rt->ref_count;
+               pthread_mutex_unlock( ctx->rtgate );
+
+               while( ref_count > 0 ) {                                // wait for all who are using to stop
+                       if( counter++ > 1000 ) {
+                               rmr_vlog( RMR_VL_WARN, "rt_prep_newrt:  internal mishap, ref count on table seems wedged" );
+                               break;
+                       }
+
+                       usleep( 1000 );                                         // small sleep to yield the processer if that is needed
+
+                       pthread_mutex_lock( ctx->rtgate );
+                       ref_count = rt->ref_count;
+                       pthread_mutex_unlock( ctx->rtgate );
+               }
+
+               if( rt->hash != NULL ) {
+                       rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL );            // deref and drop if needed
+                       rmr_sym_clear( rt->hash );                                                                      // clear all entries from the old table
+               }
+
+               rt->error = 0;                                                                  // table with errors can be here, so endure clear before attempt to load
+       } else {
+               rt = NULL;
+       }
+
+       pthread_mutex_destroy(ctx->rtgate);
+       rt = uta_rt_clone( ctx, ctx->rtable, rt, all );         // also sets the ephash pointer
+       if( rt != NULL ) {                                                                      // very small chance for nil, but not zero, so test
+               rt->ref_count = 0;                                                              // take no chances; ensure it's 0!
+       } else {
+               rmr_vlog( RMR_VL_ERR, "route table clone returned nil; marking dummy table as error\n" );
+               rt = uta_rt_init( ctx );                                                // must hav something, but mark it in error state
+               rt->error = 1;
+       }
+
+       return rt;
 }
 
+
 /*
        Given a name, find the endpoint struct in the provided route table.
 */
 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name ) {
 
-       if( rt == NULL || rt->hash == NULL || ep_name == NULL || *ep_name == 0 ) {
+       if( rt == NULL || rt->ephash == NULL || ep_name == NULL || *ep_name == 0 ) {
                return NULL;
        }
 
-       return rmr_sym_get( rt->hash, ep_name, 1 );
+       return rmr_sym_get( rt->ephash, ep_name, 1 );
 }
 
 /*
        Drop the given route table. Purge all type 0 entries, then drop the symtab itself.
+       Does NOT destroy the gate as it's a common gate for ALL route tables.
 */
 static void uta_rt_drop( route_table_t* rt ) {
        if( rt == NULL ) {
@@ -1073,25 +1596,26 @@ static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) {
        endpoint_t*     ep;
 
        if( !rt || !ep_name || ! *ep_name ) {
-               fprintf( stderr, "[WRN] rmr: rt_ensure:  internal mishap, something undefined rt=%p ep_name=%p\n", rt, ep_name );
+               rmr_vlog( RMR_VL_WARN, "rt_ensure:  internal mishap, something undefined rt=%p ep_name=%p\n", rt, ep_name );
                errno = EINVAL;
                return NULL;
        }
 
-       if( (ep = uta_get_ep( rt, ep_name )) == NULL ) {                                        // not there yet, make
+       if( (ep = uta_get_ep( rt, ep_name )) == NULL ) {                                        // not there yet, make
                if( (ep = (endpoint_t *) malloc( sizeof( *ep ) )) == NULL ) {
-                       fprintf( stderr, "[WRN] rmr: rt_ensure:  malloc failed for endpoint creation: %s\n", ep_name );
+                       rmr_vlog( RMR_VL_WARN, "rt_ensure:  malloc failed for endpoint creation: %s\n", ep_name );
                        errno = ENOMEM;
                        return NULL;
                }
 
+               ep->notify = 1;                                                         // show notification on first connection failure
                ep->open = 0;                                                           // not connected
                ep->addr = uta_h2ip( ep_name );
                ep->name = strdup( ep_name );
                pthread_mutex_init( &ep->gate, NULL );          // init with default attrs
                memset( &ep->scounts[0], 0, sizeof( ep->scounts ) );
 
-               rmr_sym_put( rt->hash, ep_name, 1, ep );
+               rmr_sym_put( rt->ephash, ep_name, 1, ep );
        }
 
        return ep;
@@ -1114,5 +1638,65 @@ static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) {
        return key;
 }
 
+/*
+       Given a route table and meid string, find the owner (if known). Returns a pointer to
+       the endpoint struct or nil.
+*/
+static inline endpoint_t*  get_meid_owner( route_table_t *rt, char const* meid ) {
+       endpoint_t const* ep;           // the ep we found in the hash
+
+       if( rt == NULL || rt->hash == NULL || meid == NULL || *meid == 0 ) {
+               return NULL;
+       }
+
+       return (endpoint_t *) rmr_sym_get( rt->hash, meid, RT_ME_SPACE );
+}
+
+/*
+       This returns a pointer to the currently active route table and ups
+       the reference count so that the route table is not freed while it
+       is being used. The caller MUST call release_rt() when finished
+       with the pointer.
+
+       Care must be taken: the ctx->rtable pointer _could_ change during the time
+       between the release of the lock and the return. Therefore we MUST grab
+       the current pointer when we have the lock so that if it does we don't
+       return a pointer to the wrong table.
+
+       This will return NULL if there is no active table.
+*/
+static inline route_table_t* get_rt( uta_ctx_t* ctx ) {
+       route_table_t*  rrt;                    // return value
+
+       if( ctx == NULL || ctx->rtable == NULL ) {
+               return NULL;
+       }
+
+       pthread_mutex_lock( ctx->rtgate );                              // must hold lock to bump use
+       rrt = ctx->rtable;                                                              // must stash the pointer while we hold lock
+       rrt->ref_count++;
+       pthread_mutex_unlock( ctx->rtgate );
+
+       return rrt;                                                                             // pointer we upped the count with
+}
+
+/*
+       This will "release" the route table by reducing the use counter
+       in the table. The table may not be freed until the counter reaches
+       0, so it's imparative that the pointer be "released" when it is
+       fetched by get_rt().  Once the caller has released the table it
+       may not safely use the pointer that it had.
+*/
+static inline void release_rt( uta_ctx_t* ctx, route_table_t* rt ) {
+       if( ctx == NULL || rt == NULL ) {
+               return;
+       }
+
+       pthread_mutex_lock( ctx->rtgate );                              // must hold lock
+       if( rt->ref_count > 0 ) {                                               // something smells if it's already 0, don't do antyhing if it is
+               rt->ref_count--;
+       }
+       pthread_mutex_unlock( ctx->rtgate );
+}
 
 #endif