Fix RMR routing statistic data printout crash
[ric-plt/lib/rmr.git] / src / rmr / common / src / rt_generic_static.c
index 3fc7ded..9a8d1fd 100644 (file)
@@ -1,10 +1,10 @@
-// :vi sw=4 ts=4 noet:
+ // :vi sw=4 ts=4 noet2
 /*
 ==================================================================================
-       Copyright (c) 2019 Nokia
-       Copyright (c) 2018-2019 AT&T Intellectual Property.
+       Copyright (c) 2019-2020 Nokia
+       Copyright (c) 2018-2020 AT&T Intellectual Property.
 
-   Licensed under the Apache License, Version 2.0 (the "License");
+   Licensed under the Apache License, Version 2.0 (the "License") ;
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at
 
 #include <sys/stat.h>
 #include <unistd.h>
 #include <netdb.h>
+#include <pthread.h>
 
 #include <RIC_message_types.h>         // needed for route manager messages
 
+#define ALL 1
+#define SOME 0
 
 /*
        Passed to a symtab foreach callback to construct a list of pointers from
        a current symtab.
 */
 typedef struct thing_list {
+       int error;                                      // if a realloc failed, this will be set
        int nalloc;
        int nused;
        void** things;
@@ -65,6 +69,10 @@ typedef struct thing_list {
 /*
        Dump some stats for an endpoint in the RT. This is generally called to
        verify endpoints after a table load/change.
+
+       This is called by the for-each mechanism of the symtab and the prototype is
+       fixe; we don't really use some of the parms, but have dummy references to
+       keep sonar from complaining.
 */
 static void ep_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
        int*    counter;
@@ -76,6 +84,9 @@ static void ep_stats( void* st, void* entry, char const* name, void* thing, void
 
        if( (counter = (int *) vcounter) != NULL ) {
                (*counter)++;
+       } else {
+               rmr_vlog( RMR_VL_DEBUG, "ep_stas: nil counter %p %p %p", st, entry, name );     // dummy refs
+               return;
        }
 
        rmr_vlog_force( RMR_VL_DEBUG, "rt endpoint: target=%s open=%d\n", ep->name, ep->open );
@@ -84,6 +95,8 @@ static void ep_stats( void* st, void* entry, char const* name, void* thing, void
 /*
        Called to count meid entries in the table. The meid points to an 'owning' endpoint
        so we can list what we find
+
+       See note in ep_stats about dummy refs.
 */
 static void meid_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
        int*    counter;
@@ -95,6 +108,8 @@ static void meid_stats( void* st, void* entry, char const* name, void* thing, vo
 
        if( (counter = (int *) vcounter) != NULL ) {
                (*counter)++;
+       } else {
+               rmr_vlog( RMR_VL_DEBUG, "meid_stas: nil counter %p %p %p", st, entry, name );   // dummy refs
        }
 
        rmr_vlog_force( RMR_VL_DEBUG, "meid=%s owner=%s open=%d\n", name, ep->name, ep->open );
@@ -103,12 +118,15 @@ static void meid_stats( void* st, void* entry, char const* name, void* thing, vo
 /*
        Dump counts for an endpoint in the RT. The vid parm is assumed to point to
        the 'source' information and is added to each message.
+
+       See note above about dummy references.
 */
 static void ep_counts( void* st, void* entry, char const* name, void* thing, void* vid ) {
        endpoint_t* ep;
        char*   id;
 
        if( (ep = (endpoint_t *) thing) == NULL ) {
+               rmr_vlog( RMR_VL_DEBUG, "ep_counts: nil thing %p %p %p", st, entry, name );     // dummy refs
                return;
        }
 
@@ -132,11 +150,12 @@ static void ep_counts( void* st, void* entry, char const* name, void* thing, voi
 */
 static void rte_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
        int*    counter;
-       rtable_ent_t* rte;                      // thing is really an rte
+       rtable_ent_t const* rte;                // thing is really an rte
        int             mtype;
        int             sid;
 
        if( (rte = (rtable_ent_t *) thing) == NULL ) {
+               rmr_vlog( RMR_VL_DEBUG, "rte_stats: nil thing %p %p %p", st, entry, name );     // dummy refs
                return;
        }
 
@@ -165,7 +184,7 @@ static void  rt_stats( route_table_t* rt ) {
        *counter = 0;
        rmr_vlog_force( RMR_VL_DEBUG, "route table stats:\n" );
        rmr_vlog_force( RMR_VL_DEBUG, "route table endpoints:\n" );
-       rmr_sym_foreach_class( rt->hash, RT_NAME_SPACE, ep_stats, counter );            // run endpoints (names) in the active table
+       rmr_sym_foreach_class( rt->ephash, RT_NAME_SPACE, ep_stats, counter );          // run endpoints (names) in the active table
        rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d known endpoints\n", *counter );
 
        rmr_vlog_force( RMR_VL_DEBUG, "route table entries:\n" );
@@ -191,7 +210,19 @@ static void  rt_epcounts( route_table_t* rt, char* id ) {
                return;
        }
 
-       rmr_sym_foreach_class( rt->hash, 1, ep_counts, id );            // run endpoints in the active table
+       rmr_sym_foreach_class( rt->ephash, 1, ep_counts, id );          // run endpoints in the active table
+}
+
+
+static void dump_tables( uta_ctx_t *ctx ) {
+       if( ctx->old_rtable != NULL ) {
+               rmr_vlog_force( RMR_VL_DEBUG, "old route table: (ref_count=%d)\n", ctx->old_rtable->ref_count );
+               rt_stats( ctx->old_rtable );
+       } else {
+               rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" );
+       }
+       rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
+       rt_stats( ctx->rtable );
 }
 
 // ------------ route manager communication -------------------------------------------------
@@ -217,10 +248,10 @@ static int send_update_req( uta_ctx_t* pctx, uta_ctx_t* ctx ) {
        if( smsg != NULL ) {
                smsg->mtype = RMRRM_REQ_TABLE;
                smsg->sub_id = 0;
-               snprintf( smsg->payload, 1024, "%s ts=%ld\n", ctx->my_name, (long) time( NULL ) );
+               snprintf( smsg->payload, 1024, "%s ts=%ld\n", ctx->my_name, time( NULL ) );
                rmr_vlog( RMR_VL_INFO, "rmr_rtc: requesting table: (%s) whid=%d\n", smsg->payload, ctx->rtg_whid );
                smsg->len = strlen( smsg->payload ) + 1;
-       
+
                smsg = rmr_wh_send_msg( pctx, ctx->rtg_whid, smsg );
                if( (state = smsg->state) != RMR_OK ) {
                        rmr_vlog( RMR_VL_INFO, "rmr_rtc: send failed: %d whid=%d\n", smsg->state, ctx->rtg_whid );
@@ -236,15 +267,21 @@ static int send_update_req( uta_ctx_t* pctx, uta_ctx_t* ctx ) {
 
 /*
        Send an ack to the route table manager for a table ID that we are
-       processing.      State is 1 for OK, and 0 for failed. Reason might 
+       processing.      State is 1 for OK, and 0 for failed. Reason might
        be populated if we know why there was a failure.
 
        Context should be the PRIVATE context that we use for messages
        to route manger and NOT the user's context.
+
+       If a message buffere is passed we use that and use return to sender
+       assuming that this might be a response to a call and that is needed
+       to send back to the proper calling thread. If msg is nil, we allocate
+       and use it.
 */
-static void send_rt_ack( uta_ctx_t* ctx, char* table_id, int state, char* reason ) {
-       rmr_mbuf_t*     smsg;
-       
+static void send_rt_ack( uta_ctx_t* ctx, rmr_mbuf_t* smsg, char* table_id, int state, char* reason ) {
+       int             use_rts = 1;
+       int             payload_size = 1024;
+
        if( ctx == NULL || ctx->rtg_whid < 0 ) {
                return;
        }
@@ -253,34 +290,91 @@ static void send_rt_ack( uta_ctx_t* ctx, char* table_id, int state, char* reason
                return;
        }
 
-       smsg = rmr_alloc_msg( ctx, 1024 );
+       if( smsg != NULL ) {
+               smsg = rmr_realloc_payload( smsg, payload_size, FALSE, FALSE );         // ensure it's large enough to send a response
+       } else {
+               use_rts = 0;
+               smsg = rmr_alloc_msg( ctx, payload_size );
+       }
+
        if( smsg != NULL ) {
                smsg->mtype = RMRRM_TABLE_STATE;
-               smsg->sub_id = 0;
-               snprintf( smsg->payload, 1024, "%s %s %s\n", state == RMR_OK ? "OK" : "ERR", 
+               smsg->sub_id = -1;
+               snprintf( smsg->payload, payload_size-1, "%s %s %s\n", state == RMR_OK ? "OK" : "ERR",
                        table_id == NULL ? "<id-missing>" : table_id, reason == NULL ? "" : reason );
 
                smsg->len = strlen( smsg->payload ) + 1;
-       
-               rmr_vlog( RMR_VL_INFO, "rmr_rtc: sending table state: (%s) state=%d whid=%d\n", smsg->payload, smsg->state, ctx->rtg_whid );
-               smsg = rmr_wh_send_msg( ctx, ctx->rtg_whid, smsg );
+
+               rmr_vlog( RMR_VL_INFO, "rmr_rtc: sending table state: (%s) state=%d whid=%d table=%s\n", smsg->payload, state, ctx->rtg_whid, table_id );
+               if( use_rts ) {
+                       smsg = rmr_rts_msg( ctx, smsg );
+               } else {
+                       smsg = rmr_wh_send_msg( ctx, ctx->rtg_whid, smsg );
+               }
                if( (state = smsg->state) != RMR_OK ) {
                        rmr_vlog( RMR_VL_WARN, "unable to send table state: %d\n", smsg->state );
                        rmr_wh_close( ctx, ctx->rtg_whid );                                     // send failed, assume connection lost
                        ctx->rtg_whid = -1;
                }
 
-               rmr_free_msg( smsg );
+               if( ! use_rts ) {
+                       rmr_free_msg( smsg );                   // if not our message we must free the leftovers
+               }
+       }
+}
+
+// ---- alarm generation --------------------------------------------------------------------------
+
+/*
+       Given the user's context (not the thread private context) look to see if the application isn't
+       working fast enough and we're dropping messages. If the drop counter has changed since the last
+       peeked, and we have not raised an alarm, then we will alarm. If the counter hasn't changed, then we
+       set a timer and if the counter still hasn't changed when it expires we will clear the alarm.
+
+       The private context is what we use to send so as not to interfere with the user flow.
+*/
+static void alarm_if_drops( uta_ctx_t* uctx, uta_ctx_t* pctx ) {
+       static  int alarm_raised = 0;
+       static  int ok2clear = 0;                                       // time that we can clear
+       static  int lastd = 0;                                          // the last counter value so we can compute delta
+       static  int prob_id = 0;                                        // problem ID we assume alarm manager handles dups between processes
+
+       rmr_vlog( RMR_VL_DEBUG, "checking for drops... raised=%d 0k2clear=%d lastd=%d probid=%d\n", alarm_raised, ok2clear, lastd, prob_id );
+       if( ! alarm_raised ) {
+               if( uctx->dcount - lastd == 0 ) {                       // not actively dropping, ok to do nothing
+                       return;
+               }
+
+               alarm_raised = 1;
+               uta_alarm( pctx, ALARM_DROPS | ALARM_RAISE, prob_id, "application running slow; RMR is dropping messages" );
+               rmr_vlog( RMR_VL_INFO, "drop alarm raised" );
+       } else {
+               if( uctx->dcount - lastd != 0 ) {                       // still dropping or dropping again; we've alarmed so nothing to do
+                       lastd = uctx->dcount;
+                       ok2clear = 0;                                                   // reset the timer
+                       return;
+               }
+
+               if( ok2clear == 0 ) {                                           // first round where not dropping
+                       ok2clear = time( NULL ) + 60;                   // we'll clear the alarm in 60s
+               } else {
+                       if( time( NULL ) > ok2clear ) {                 // things still stable after expiry
+                               rmr_vlog( RMR_VL_INFO, "drop alarm cleared\n" );
+                               alarm_raised = 0;
+                               uta_alarm( pctx, ALARM_DROPS | ALARM_CLEAR, prob_id, "RMR message dropping has stopped" );
+                               prob_id++;
+                       }
+               }
        }
 }
 
-// ------------------------------------------------------------------------------------------------
+// ---- utility -----------------------------------------------------------------------------------
 /*
        Little diddy to trim whitespace and trailing comments. Like shell, trailing comments
        must be at the start of a word (i.e. must be immediatly preceeded by whitespace).
 */
 static char* clip( char* buf ) {
-       char*   tok;
+       char*   tok;
 
        while( *buf && isspace( *buf ) ) {                                                      // skip leading whitespace
                buf++;
@@ -311,30 +405,107 @@ static char* clip( char* buf ) {
 */
 static char* ensure_nlterm( char* buf ) {
        char*   nb = NULL;
-       int             len = 1;
+       int             len = 0;
 
-       nb = buf;
-       if( buf == NULL || (len = strlen( buf )) < 2 ) {
-               if( (nb = (char *) malloc( sizeof( char ) * 2 )) != NULL ) {
-                       *nb = '\n';
-                       *(nb+1) = 0;
-               }
-       } else {
-               if( buf[len-1] != '\n' ) {
-                       rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
-                       if( (nb = (char *) malloc( sizeof( char ) * (len + 2) )) != NULL ) {
-                               memcpy( nb, buf, len );
-                               *(nb+len) = '\n';                       // insert \n and nil into the two extra bytes we allocated
-                               *(nb+len+1) = 0;
-                       }
+       if( buf != NULL ) {
+               len = strlen( buf );
+       }
 
-                       free( buf );
-               }
+       nb = buf;                                                       // default to returning original as is
+       switch( len ) {
+               case 0:
+                       nb = strdup( "\n" );
+                       break;
+
+               case 1:
+                       if( *buf != '\n' ) {            // not a newline; realloc
+                               rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
+                               nb = strdup( " \n" );
+                               *nb = *buf;
+                               free( buf );
+                       }
+                       break;
+
+               default:
+                       if( buf[len-1] != '\n' ) {              // not newline terminated, realloc
+                               rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
+                               if( (nb = (char *) malloc( sizeof( char ) * (len + 2) )) != NULL ) {
+                                       memcpy( nb, buf, len );
+                                       *(nb+len) = '\n';                       // insert \n and nil into the two extra bytes we allocated
+                                       *(nb+len+1) = 0;
+                                       free( buf );
+                               }
+                       }
+                       break;
        }
 
        return nb;
 }
 
+/*
+       Roll the new table into the active and the active into the old table. We
+       must have the lock on the active table to do this. It's possible that there
+       is no active table (first load), so we have to account for that (no locking).
+*/
+static void roll_tables( uta_ctx_t* ctx ) {
+
+       if( ctx->new_rtable == NULL || ctx->new_rtable->error ) {
+               rmr_vlog( RMR_VL_WARN, "new route table NOT rolled in: nil pointer or error indicated\n" );
+               ctx->old_rtable = ctx->new_rtable;
+               ctx->new_rtable = NULL;
+               return;
+       }
+
+       if( ctx->rtable != NULL ) {                                                     // initially there isn't one, so must check!
+               pthread_mutex_lock( ctx->rtgate );                              // must hold lock to move to active
+               ctx->old_rtable = ctx->rtable;                                  // currently active becomes old and allowed to 'drain'
+               ctx->rtable = ctx->new_rtable;                                  // one we've been adding to becomes active
+               pthread_mutex_unlock( ctx->rtgate );
+       } else {
+               ctx->old_rtable = NULL;                                         // ensure there isn't an old reference
+               ctx->rtable = ctx->new_rtable;                          // make new the active one
+       }
+
+       ctx->new_rtable = NULL;
+}
+
+/*
+       Given a thing list, extend the array of pointers by 1/2 of the current
+       number allocated. If we cannot realloc an array, then we set the error
+       flag.  Unlikely, but will prevent a crash, AND will prevent us from
+       trying to use the table since we couldn't capture everything.
+*/
+static void extend_things( thing_list_t* tl ) {
+       int old_alloc;
+       void*   old_things;
+       void*   old_names;
+
+       if( tl == NULL ) {
+               return;
+       }
+
+       old_alloc = tl->nalloc;                         // capture current things
+       old_things = tl->things;
+       old_names = tl->names;
+
+       tl->nalloc += tl->nalloc/2;                     // new allocation size
+
+       tl->things = (void **) malloc( sizeof( void * ) * tl->nalloc );                 // allocate larger arrays
+       tl->names = (const char **) malloc( sizeof( char * ) * tl->nalloc );
+
+       if( tl->things == NULL || tl->names == NULL ){                          // if either failed, then set error
+               tl->error = 1;
+               return;
+       }
+
+       memcpy( tl->things, old_things, sizeof( void * ) * old_alloc );
+       memcpy( tl->names, old_names, sizeof( void * ) * old_alloc );
+
+       free( old_things );
+       free( old_names );
+}
+
+// ------------ entry update functions ---------------------------------------------------------------
 /*
        Given a message type create a route table entry and add to the hash keyed on the
        message type.  Once in the hash, endpoints can be added with uta_add_ep. Size
@@ -399,19 +570,20 @@ static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups
 */
 static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* rr_field, int vlevel ) {
        rtable_ent_t*   rte;            // route table entry added
-       char*   tok;
+       char const*     tok;
        int             ntoks;
        uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
-       char*   tokens[128];
-       char*   gtokens[64];
-       int             i;
+       char*   tokens[128];
+       char*   gtokens[64];
        int             ngtoks;                         // number of tokens in the group list
        int             grp;                            // index into group list
+       int             cgidx;                          // contiguous group index (prevents the addition of a contiguous group without ep)
+       int             has_ep = FALSE;         // indicates if an endpoint was added in a given round robin group
 
        ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
        rr_field = clip( rr_field );
 
-       if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
+       if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
                (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
                has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
 
@@ -426,20 +598,27 @@ static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* r
                        rte = uta_add_rte( ctx->new_rtable, key, ngtoks );                                                              // get/create entry for this key
                        rte->mtype = atoi( ts_field );                                                                                                  // capture mtype for debugging
 
-                       for( grp = 0; grp < ngtoks; grp++ ) {
-                               if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) {          // remove any referneces to our ip addrs
+                       for( grp = 0, cgidx = 0; grp < ngtoks; grp++ ) {
+                               int             i;                                      // avoid sonar grumbling by defining this here
+
+                               if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) {          // remove any references to our ip addrs
                                        for( i = 0; i < ntoks; i++ ) {
                                                if( strcmp( tokens[i], ctx->my_name ) != 0 ) {                                  // don't add if it is us -- cannot send to ourself
                                                        if( DEBUG > 1  || (vlevel > 1)) rmr_vlog_force( RMR_VL_DEBUG, "add endpoint  ts=%s %s\n", ts_field, tokens[i] );
-                                                       uta_add_ep( ctx->new_rtable, rte, tokens[i], grp );
+                                                       uta_add_ep( ctx->new_rtable, rte, tokens[i], cgidx );
+                                                       has_ep = TRUE;
                                                }
                                        }
+                                       if( has_ep ) {
+                                               cgidx++;        // only increment to the next contiguous group if the current one has at least one endpoint
+                                               has_ep = FALSE;
+                                       }
                                }
                        }
                }
        } else {
                if( DEBUG || (vlevel > 2) ) {
-                       rmr_vlog_force( RMR_VL_DEBUG, "entry not included, sender not matched: %s\n", tokens[1] );
+                       rmr_vlog_force( RMR_VL_DEBUG, "build entry: ts_entry not of form msg-type,sender: %s\n", ts_field );
                }
        }
 }
@@ -453,10 +632,10 @@ static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* r
 */
 static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vlevel ) {
        rtable_ent_t*   rte;            // route table entry to be 'deleted'
-       char*   tok;
+       char const*     tok;
        int             ntoks;
        uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
-       char*   tokens[128];
+       char*   tokens[128];
 
        if( ctx == NULL || ctx->new_rtable == NULL || ctx->new_rtable->hash == NULL ) {
                return;
@@ -464,7 +643,7 @@ static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vle
 
        ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
 
-       if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
+       if( ((tok = strchr( ts_field, ',' )) == NULL ) ||                                       // no sender names (generic entry for all)
                (uta_has_str( ts_field,  ctx->my_name, ',', 127) >= 0) ||               // our name is in the list
                has_myip( ts_field, ctx->ip_list, ',', 127 ) ) {                                // the list has one of our IP addresses
 
@@ -486,6 +665,8 @@ static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vle
        }
 }
 
+// -------------------------- parse functions --------------------------------------------------
+
 /*
        Given the tokens from an mme_ar (meid add/replace) entry, add the entries.
        the 'owner' which should be the dns name or IP address of an enpoint
@@ -498,9 +679,9 @@ static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vle
        to send messages to.
 */
 static void parse_meid_ar( route_table_t* rtab, char* owner, char* meid_list, int vlevel ) {
-       char*   tok;
+       char const*     tok;
        int             ntoks;
-       char*   tokens[128];
+       char*   tokens[128];
        int             i;
        int             state;
        endpoint_t*     ep;                                             // endpoint struct for the owner
@@ -529,9 +710,9 @@ static void parse_meid_ar( route_table_t* rtab, char* owner, char* meid_list, in
        This function assumes the caller has vetted the pointers as needed.
 */
 static void parse_meid_del( route_table_t* rtab, char* meid_list, int vlevel ) {
-       char*   tok;
+       char const*     tok;
        int             ntoks;
-       char*   tokens[128];
+       char*   tokens[128];
        int             i;
 
        if( rtab->hash == NULL ) {
@@ -550,8 +731,13 @@ static void parse_meid_del( route_table_t* rtab, char* meid_list, int vlevel ) {
 /*
        Parse a partially parsed meid record. Tokens[0] should be one of:
                meid_map, mme_ar, mme_del.
+
+       pctx is the private context needed to return an ack/nack using the provided
+       message buffer with the route managers address info.
 */
-static void meid_parser( uta_ctx_t* ctx, char** tokens, int ntoks, int vlevel ) {
+static void meid_parser( uta_ctx_t* ctx, uta_ctx_t* pctx, rmr_mbuf_t* mbuf, char** tokens, int ntoks, int vlevel ) {
+       char wbuf[1024];
+
        if( tokens == NULL || ntoks < 1 ) {
                return;                                                 // silent but should never happen
        }
@@ -567,16 +753,31 @@ static void meid_parser( uta_ctx_t* ctx, char** tokens, int ntoks, int vlevel )
                        if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
                                if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "meid map start: dropping incomplete table\n" );
                                uta_rt_drop( ctx->new_rtable );
+                               ctx->new_rtable = NULL;
+                               send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" );        // nack the one that was pending as and never made it
+                       }
+
+                       if( ctx->table_id != NULL ) {
+                               free( ctx->table_id );
+                       }
+                       if( ntoks > 2 ) {
+                               ctx->table_id = strdup( clip( tokens[2] ) );
+                       } else {
+                               ctx->table_id = NULL;
                        }
 
-                       ctx->new_rtable = uta_rt_clone_all( ctx->rtable );              // start with a clone of everything (mtype, endpoint refs and meid)
+                       ctx->new_rtable = prep_new_rt( ctx, ALL );                                      // start with a clone of everything (mtype, endpoint refs and meid)
                        ctx->new_rtable->mupdates = 0;
+
                        if( DEBUG || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "meid_parse: meid map start found\n" );
                } else {
                        if( strcmp( tokens[1], "end" ) == 0 ) {                                                         // wrap up the table we were building
                                if( ntoks > 2 ) {                                                                                               // meid_map | end | <count> |??? given
                                        if( ctx->new_rtable->mupdates != atoi( tokens[2] ) ) {          // count they added didn't match what we received
-                                               rmr_vlog( RMR_VL_ERR, "meid_parse: meid map update had wrong number of records: received %d expected %s\n", ctx->new_rtable->mupdates, tokens[2] );
+                                               rmr_vlog( RMR_VL_ERR, "meid_parse: meid map update had wrong number of records: received %d expected %s\n",
+                                                               ctx->new_rtable->mupdates, tokens[2] );
+                                               snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates );
+                                               send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
                                                uta_rt_drop( ctx->new_rtable );
                                                ctx->new_rtable = NULL;
                                                return;
@@ -586,15 +787,17 @@ static void meid_parser( uta_ctx_t* ctx, char** tokens, int ntoks, int vlevel )
                                }
 
                                if( ctx->new_rtable ) {
-                                       uta_rt_drop( ctx->old_rtable );                         // time to drop one that was previously replaced
-                                       ctx->old_rtable = ctx->rtable;                          // currently active becomes old and allowed to 'drain'
-                                       ctx->rtable = ctx->new_rtable;                          // one we've been adding to becomes active
-                                       ctx->new_rtable = NULL;
+                                       roll_tables( ctx );                                             // roll active to old, and new to active with proper locking
                                        if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "end of meid map noticed\n" );
+                                       send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
 
                                        if( vlevel > 0 ) {
-                                               rmr_vlog_force( RMR_VL_DEBUG, "old route table:\n" );
-                                               rt_stats( ctx->old_rtable );
+                                               if( ctx->old_rtable != NULL ) {
+                                                       rmr_vlog_force( RMR_VL_DEBUG, "old route table: (ref_count=%d)\n", ctx->old_rtable->ref_count );
+                                                       rt_stats( ctx->old_rtable );
+                                               } else {
+                                                       rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" );
+                                               }
                                                rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
                                                rt_stats( ctx->rtable );
                                        }
@@ -606,9 +809,9 @@ static void meid_parser( uta_ctx_t* ctx, char** tokens, int ntoks, int vlevel )
                }
 
                return;
-       }       
+       }
 
-       if( ! ctx->new_rtable ) {                       // for any other mmap entries, there must be a table in progress or we punt
+       if( ! ctx->new_rtable ) {                       // for any other mmap entries, there must be a table in progress or we punt
                if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid update/delte (%s) encountered, but table update not started\n", tokens[0] );
                return;
        }
@@ -620,15 +823,78 @@ static void meid_parser( uta_ctx_t* ctx, char** tokens, int ntoks, int vlevel )
                }
                parse_meid_ar( ctx->new_rtable,  tokens[1], tokens[2], vlevel );
                ctx->new_rtable->mupdates++;
+               return;
        }
 
-       if( strcmp( tokens[0], "mme_del" ) == 0 ) {
-               if( ntoks < 2 ) {
-                       rmr_vlog( RMR_VL_ERR, "meid_parse: mme_del record didn't have enough tokens\n" );
-                       return;
-               }
+       if( strcmp( tokens[0], "mme_del" ) == 0 ) {                                             // ntoks < 2 already validated
                parse_meid_del( ctx->new_rtable,  tokens[1], vlevel );
                ctx->new_rtable->mupdates++;
+               return;
+       }
+}
+
+/*
+       This will close the current table snarf file (in *.inc) and open a new one.
+       The curent one is renamed. The final file name is determined by the setting of
+       RMR_SNARF_RT, and if not set then the variable RMR_SEED_RT is used and given
+       an additional extension of .snarf.  If neither seed or snarf environment vars are
+       set then this does nothing.
+
+       If this is called before the tmp snarf file is opened, then this just opens the file.
+*/
+static void cycle_snarfed_rt( uta_ctx_t* ctx ) {
+       static int              ok2warn = 0;    // some warnings squelched on first call
+
+       char*   seed_fname;                             // the filename from env
+       char    tfname[512];                    // temp fname
+       char    wfname[512];                    // working buffer for filename
+       char*   snarf_fname = NULL;             // prevent overlay of the static table if snarf_rt not given
+
+       if( ctx == NULL ) {
+               return;
+       }
+
+       if( (snarf_fname = getenv(  ENV_STASH_RT )) == NULL ) {                         // specific place to stash the rt not given
+               if( (seed_fname = getenv( ENV_SEED_RT )) != NULL ) {                    // no seed, we leave in the default file
+                       memset( wfname, 0, sizeof( wfname ) );
+                       snprintf( wfname, sizeof( wfname ) - 1, "%s.stash", seed_fname );
+                       snarf_fname = wfname;
+               }
+       }
+
+       if( snarf_fname == NULL ) {
+               rmr_vlog( RMR_VL_DEBUG, "cycle_snarf: no file to save in" );
+               return;
+       }
+
+       memset( tfname, 0, sizeof( tfname ) );
+       snprintf( tfname, sizeof( tfname ) -1, "%s.inc", snarf_fname );         // must ensure tmp file is moveable
+
+       if( ctx->snarf_rt_fd >= 0 ) {
+               char* msg= "### captured from route manager\n";
+               write( ctx->snarf_rt_fd, msg, strlen( msg ) );
+               if( close( ctx->snarf_rt_fd ) < 0 ) {
+                       rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to close working rt snarf file: %s\n", strerror( errno ) );
+                       return;
+               }
+
+               if( unlink( snarf_fname ) < 0  && ok2warn ) {                                   // first time through this can fail and we ignore it
+                       rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to unlink old static table: %s: %s\n", snarf_fname, strerror( errno ) );
+               }
+
+               if( rename( tfname, snarf_fname ) ) {
+                       rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to move new route table to seed aname : %s -> %s: %s\n", tfname, snarf_fname, strerror( errno ) );
+               } else {
+                       rmr_vlog( RMR_VL_INFO, "latest route table info saved in: %s\n", snarf_fname );
+               }
+       }
+       ok2warn = 1;
+
+       ctx->snarf_rt_fd = open( tfname, O_WRONLY | O_CREAT | O_TRUNC, 0660 );
+       if( ctx->snarf_rt_fd < 0 ) {
+               rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to open trt file: %s: %s\n", tfname, strerror( errno ) );
+       } else {
+               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: rt snarf file opened: %s\n", tfname );
        }
 }
 
@@ -672,13 +938,19 @@ static void meid_parser( uta_ctx_t* ctx, char** tokens, int ntoks, int vlevel )
        messages back to the route manager.  The regular ctx is the ctx that
        the user has been given and thus that's where we have to hang the route
        table we're working with.
+
+       If mbuf is given, and we need to ack, then we ack using the mbuf and a
+       return to sender call (allows route manager to use wh_call() to send
+       an update and rts is required to get that back to the right thread).
+       If mbuf is nil, then one will be allocated (in ack) and a normal wh_send
+       will be used.
 */
-static void parse_rt_rec( uta_ctx_t* ctx,  uta_ctx_t* pctx, char* buf, int vlevel ) {
+static void parse_rt_rec( uta_ctx_t* ctx,  uta_ctx_t* pctx, char* buf, int vlevel, rmr_mbuf_t* mbuf ) {
        int i;
        int ntoks;                                                      // number of tokens found in something
        int ngtoks;
        int     grp;                                                    // group number
-       rtable_ent_t*   rte;                            // route table entry added
+       rtable_ent_t const*     rte;                    // route table entry added
        char*   tokens[128];
        char*   tok;                                            // pointer into a token or string
        char    wbuf[1024];
@@ -687,12 +959,18 @@ static void parse_rt_rec( uta_ctx_t* ctx,  uta_ctx_t* pctx, char* buf, int vleve
                return;
        }
 
+       if( ctx && ctx->snarf_rt_fd  >= 0 ) {                                                           // if snarfing table as it arrives, write this puppy
+               write( ctx->snarf_rt_fd, buf, strlen( buf ) );
+               write( ctx->snarf_rt_fd, "\n", 1 );
+       }
+
        while( *buf && isspace( *buf ) ) {                                                      // skip leading whitespace
                buf++;
        }
        for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- );    // trim trailing spaces too
        *(tok+1) = 0;
 
+       memset( tokens, 0, sizeof( tokens ) );
        if( (ntoks = uta_tokenise( buf, tokens, 128, '|' )) > 0 ) {
                tokens[0] = clip( tokens[0] );
                switch( *(tokens[0]) ) {
@@ -718,12 +996,16 @@ static void parse_rt_rec( uta_ctx_t* ctx,  uta_ctx_t* pctx, char* buf, int vleve
                        case 'n':                                                                                               // newrt|{start|end}
                                tokens[1] = clip( tokens[1] );
                                if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
+                                       if( ctx && ctx->snarf_rt_fd >= 0 ) {
+                                               cycle_snarfed_rt( ctx );                                        // make it available and open a new one
+                                       }
+
                                        if( ntoks >2 ) {
                                                if( ctx->new_rtable->updates != atoi( tokens[2] ) ) {   // count they added didn't match what we received
                                                        rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
                                                                ctx->new_rtable->updates, tokens[2] );
                                                        snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates );
-                                                       send_rt_ack( pctx, ctx->table_id, !RMR_OK, wbuf );
+                                                       send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
                                                        uta_rt_drop( ctx->new_rtable );
                                                        ctx->new_rtable = NULL;
                                                        break;
@@ -731,30 +1013,26 @@ static void parse_rt_rec( uta_ctx_t* ctx,  uta_ctx_t* pctx, char* buf, int vleve
                                        }
 
                                        if( ctx->new_rtable ) {
-                                               uta_rt_drop( ctx->old_rtable );                         // time to drop one that was previously replaced
-                                               ctx->old_rtable = ctx->rtable;                          // currently active becomes old and allowed to 'drain'
-                                               ctx->rtable = ctx->new_rtable;                          // one we've been adding to becomes active
-                                               ctx->new_rtable = NULL;
-                                               if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog( RMR_VL_DEBUG, "end of route table noticed\n" );
-
-                                               if( vlevel > 0 ) {
-                                                       rmr_vlog_force( RMR_VL_DEBUG, "old route table:\n" );
-                                                       rt_stats( ctx->old_rtable );
-                                                       rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
-                                                       rt_stats( ctx->rtable );
+                                               roll_tables( ctx );                                             // roll active to old, and new to active with proper locking
+                                               if( DEBUG > 1 || (vlevel > 1) ) {
+                                                       rmr_vlog( RMR_VL_DEBUG, "end of route table noticed\n" );
+                                                       dump_tables( ctx );
                                                }
 
-                                               send_rt_ack( pctx, ctx->table_id, RMR_OK, NULL );
+                                               send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
+                                               ctx->rtable_ready = 1;                                                  // route based sends can now happen
+                                               ctx->flags |= CFL_FULLRT;                                               // indicate we have seen a complete route table
                                        } else {
                                                if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of route table noticed, but one was not started!\n" );
                                                ctx->new_rtable = NULL;
                                        }
                                } else {                                                                                                                        // start a new table.
                                        if( ctx->new_rtable != NULL ) {                                                                 // one in progress?  this forces it out
-                                               send_rt_ack( pctx, ctx->table_id, !RMR_OK, "table not complete" );                      // nack the one that was pending as end never made it
+                                               send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" );                        // nack the one that was pending as end never made it
 
                                                if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
                                                uta_rt_drop( ctx->new_rtable );
+                                               ctx->new_rtable = NULL;
                                        }
 
                                        if( ctx->table_id != NULL ) {
@@ -766,14 +1044,14 @@ static void parse_rt_rec( uta_ctx_t* ctx,  uta_ctx_t* pctx, char* buf, int vleve
                                                ctx->table_id = NULL;
                                        }
 
-                                       ctx->new_rtable = NULL;
-                                       ctx->new_rtable = uta_rt_clone( ctx->rtable );  // create by cloning endpoint and meidtentries from active table
+                                       ctx->new_rtable = prep_new_rt( ctx, SOME );                     // wait for old table to drain and shift it back to new
                                        ctx->new_rtable->updates = 0;                                           // init count of entries received
+
                                        if( DEBUG > 1 || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "start of route table noticed\n" );
                                }
                                break;
 
-                       case 'm':                                                               // mse entry or one of the meid_ records
+                       case 'm':                                                                       // mse entry or one of the meid_ records
                                if( strcmp( tokens[0], "mse" ) == 0 ) {
                                        if( ! ctx->new_rtable ) {                       // bad sequence, or malloc issue earlier; ignore siliently
                                                break;
@@ -787,7 +1065,7 @@ static void parse_rt_rec( uta_ctx_t* ctx,  uta_ctx_t* pctx, char* buf, int vleve
                                        build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel );
                                        ctx->new_rtable->updates++;
                                } else {
-                                       meid_parser( ctx, tokens, ntoks, vlevel );
+                                       meid_parser( ctx, pctx, mbuf, tokens, ntoks, vlevel );
                                }
                                break;
 
@@ -805,16 +1083,25 @@ static void parse_rt_rec( uta_ctx_t* ctx,  uta_ctx_t* pctx, char* buf, int vleve
                                break;
 
                        case 'u':                                                                                               // update current table, not a total replacement
+                               if( ! (ctx->flags & CFL_FULLRT) ) {                                     // we cannot update until we have a full table from route generator
+                                       rmr_vlog( RMR_VL_WARN, "route table update ignored: full table not previously recevied" );
+                                       break;
+                               }
+
                                tokens[1] = clip( tokens[1] );
                                if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
                                        if( ctx->new_rtable == NULL ) {                                 // update table not in progress
                                                break;
                                        }
+                                       if( ctx->snarf_rt_fd >= 0 ) {
+                                               cycle_snarfed_rt( ctx );                                        // make it available and open a new one
+                                       }
 
                                        if( ntoks >2 ) {
                                                if( ctx->new_rtable->updates != atoi( tokens[2] ) ) {   // count they added didn't match what we received
                                                        rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
                                                                ctx->new_rtable->updates, tokens[2] );
+                                                       send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
                                                        uta_rt_drop( ctx->new_rtable );
                                                        ctx->new_rtable = NULL;
                                                        break;
@@ -822,18 +1109,14 @@ static void parse_rt_rec( uta_ctx_t* ctx,  uta_ctx_t* pctx, char* buf, int vleve
                                        }
 
                                        if( ctx->new_rtable ) {
-                                               uta_rt_drop( ctx->old_rtable );                         // time to drop one that was previously replaced
-                                               ctx->old_rtable = ctx->rtable;                          // currently active becomes old and allowed to 'drain'
-                                               ctx->rtable = ctx->new_rtable;                          // one we've been adding to becomes active
-                                               ctx->new_rtable = NULL;
-                                               if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed\n" );
-
-                                               if( vlevel > 0 ) {
-                                                       rmr_vlog_force( RMR_VL_DEBUG, "old route table:\n" );
-                                                       rt_stats( ctx->old_rtable );
-                                                       rmr_vlog_force( RMR_VL_DEBUG, "updated route table:\n" );
-                                                       rt_stats( ctx->rtable );
+                                               roll_tables( ctx );                                             // roll active to old, and new to active with proper locking
+                                               if( DEBUG > 1 || (vlevel > 1) )  {
+                                                       rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed\n" );
+                                                       dump_tables( ctx );
                                                }
+
+                                               send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
+                                               ctx->rtable_ready = 1;                                                  // route based sends can now happen
                                        } else {
                                                if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed, but one was not started!\n" );
                                                ctx->new_rtable = NULL;
@@ -841,18 +1124,21 @@ static void parse_rt_rec( uta_ctx_t* ctx,  uta_ctx_t* pctx, char* buf, int vleve
                                } else {                                                                                        // start a new table.
                                        if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
                                                if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
+                                               send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" );                        // nack the one that was pending as end never made it
                                                uta_rt_drop( ctx->new_rtable );
+                                               ctx->new_rtable = NULL;
                                        }
 
-                                       if( ntoks >2 ) {
+                                       if( ntoks > 2 ) {
                                                if( ctx->table_id != NULL ) {
                                                        free( ctx->table_id );
                                                }
                                                ctx->table_id = strdup( clip( tokens[2] ) );
                                        }
 
-                                       ctx->new_rtable = uta_rt_clone_all( ctx->rtable );      // start with a clone of everything (endpts and entries)
-                                       ctx->new_rtable->updates = 0;                                           // init count of updates received
+                                       ctx->new_rtable = prep_new_rt( ctx, ALL );                              // start with a copy of everything in the live table
+                                       ctx->new_rtable->updates = 0;                                                   // init count of updates received
+
                                        if( DEBUG > 1 || (vlevel > 1)  ) rmr_vlog_force( RMR_VL_DEBUG, "start of rt update noticed\n" );
                                }
                                break;
@@ -881,8 +1167,13 @@ static void read_static_rt( uta_ctx_t* ctx, int vlevel ) {
        char*   eor;                            // end of the record
        int             rcount = 0;                     // record count for debug
 
-       if( (fname = getenv( ENV_SEED_RT )) == NULL ) {
-               return;
+       if( (fname = ctx->seed_rt_fname) == NULL ) {
+               if( (fname = getenv( ENV_SEED_RT )) == NULL ) {
+                       return;
+               }
+
+               ctx->seed_rt_fname = strdup( fname );
+               fname = ctx->seed_rt_fname;
        }
 
        if( (fbuf = ensure_nlterm( uta_fib( fname ) ) ) == NULL ) {                     // read file into a single buffer (nil terminated string)
@@ -909,7 +1200,7 @@ static void read_static_rt( uta_ctx_t* ctx, int vlevel ) {
                        return;
                }
 
-               parse_rt_rec( ctx, NULL, rec, vlevel );                 // no pvt context as we can't ack
+               parse_rt_rec( ctx, NULL, rec, vlevel, NULL );           // no pvt context as we can't ack
 
                rec = eor+1;
        }
@@ -919,7 +1210,7 @@ static void read_static_rt( uta_ctx_t* ctx, int vlevel ) {
 }
 
 /*
-       Callback driven for each named thing in a symtab. We collect the pointers to those
+       Callback driven for each thing in a symtab. We collect the pointers to those
        things for later use (cloning).
 */
 static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list ) {
@@ -930,11 +1221,16 @@ static void collect_things( void* st, void* entry, char const* name, void* thing
        }
 
        if( thing == NULL ) {
+               rmr_vlog_force( RMR_VL_DEBUG, "collect things given nil thing: %p %p %p\n", st, entry, name );  // dummy ref for sonar
                return;
        }
 
-       tl->names[tl->nused] = name;                    // the name/key
+       tl->names[tl->nused] = name;                    // the name/key (space 0 uses int keys, so name can be nil and that is OK)
        tl->things[tl->nused++] = thing;                // save a reference to the thing
+
+       if( tl->nused >= tl->nalloc ) {
+               extend_things( tl );                            // not enough allocated
+       }
 }
 
 /*
@@ -959,6 +1255,7 @@ static void del_rte( void* st, void* entry, char const* name, void* thing, void*
        int i;
 
        if( (rte = (rtable_ent_t *) thing) == NULL ) {
+               rmr_vlog_force( RMR_VL_DEBUG, "delrte given nil table: %p %p %p\n", st, entry, name );  // dummy ref for sonar
                return;
        }
 
@@ -971,7 +1268,9 @@ static void del_rte( void* st, void* entry, char const* name, void* thing, void*
                for( i = 0; i < rte->nrrgroups; i++ ) {
                        if( rte->rrgroups[i] ) {
                                free( rte->rrgroups[i]->epts );                 // ditch list of endpoint pointers (end points are reused; don't trash them)
+                               free( rte->rrgroups[i] );                               // but must free the rrg itself too
                        }
+
                }
 
                free( rte->rrgroups );
@@ -990,7 +1289,7 @@ static void del_rte( void* st, void* entry, char const* name, void* thing, void*
        an empty buffer, as opposed to a nil, so the caller can generate defaults
        or error if an empty/missing file isn't tolerated.
 */
-static char* uta_fib( char* fname ) {
+static char* uta_fib( char const* fname ) {
        struct stat     stats;
        off_t           fsize = 8192;   // size of the file
        off_t           nread;                  // number of bytes read
@@ -1006,7 +1305,7 @@ static char* uta_fib( char* fname ) {
                                fsize = stats.st_size;                                          // stat ok, save the file size
                        }
                } else {
-                       fsize = 8192;                                                           // stat failed, we'll leave the file open and try to read a default max of 8k
+                       fsize = 8192;                                                           // stat failed, we'll leave the file open and try to read a default max of 8k
                }
        }
 
@@ -1041,21 +1340,31 @@ static char* uta_fib( char* fname ) {
        return buf;
 }
 
+// --------------------- initialisation/creation ---------------------------------------------
 /*
        Create and initialise a route table; Returns a pointer to the table struct.
 */
-static route_table_t* uta_rt_init( ) {
+static route_table_t* uta_rt_init( uta_ctx_t* ctx ) {
        route_table_t*  rt;
 
+       if( ctx == NULL ) {
+               return NULL;
+       }
        if( (rt = (route_table_t *) malloc( sizeof( route_table_t ) )) == NULL ) {
                return NULL;
        }
 
+       memset( rt, 0, sizeof( *rt ) );
+
        if( (rt->hash = rmr_sym_alloc( RT_SIZE )) == NULL ) {
                free( rt );
                return NULL;
        }
 
+       rt->gate = ctx->rtgate;                                         // single mutex needed for all route tables
+       rt->ephash = ctx->ephash;                                       // all route tables share a common endpoint hash
+       pthread_mutex_init( rt->gate, NULL );
+
        return rt;
 }
 
@@ -1065,18 +1374,24 @@ static route_table_t* uta_rt_init( ) {
        Space is the space in the old table to copy. Space 0 uses an integer key and
        references rte structs. All other spaces use a string key and reference endpoints.
 */
-static route_table_t* rt_clone_space( route_table_t* srt, route_table_t* nrt, int space ) {
-       endpoint_t*             ep;             // an endpoint
-       rtable_ent_t*   rte;    // a route table entry
+static route_table_t* rt_clone_space( uta_ctx_t* ctx, route_table_t* srt, route_table_t* nrt, int space ) {
+       endpoint_t*     ep;                     // an endpoint (ignore sonar complaint about const*)
+       rtable_ent_t*   rte;    // a route table entry  (ignore sonar complaint about const*)
        void*   sst;                    // source symtab
        void*   nst;                    // new symtab
        thing_list_t things;    // things from the space to copy
        int             i;
        int             free_on_err = 0;
 
+       if( ctx == NULL ) {
+               return NULL;
+       }
        if( nrt == NULL ) {                             // make a new table if needed
                free_on_err = 1;
-               nrt = uta_rt_init();
+               nrt = uta_rt_init( ctx );
+               if( nrt == NULL ) {
+                       return NULL;
+               }
        }
 
        if( srt == NULL ) {             // source was nil, just give back the new table
@@ -1085,22 +1400,47 @@ static route_table_t* rt_clone_space( route_table_t* srt, route_table_t* nrt, in
 
        things.nalloc = 2048;
        things.nused = 0;
+       things.error = 0;
        things.things = (void **) malloc( sizeof( void * ) * things.nalloc );
        things.names = (const char **) malloc( sizeof( char * ) * things.nalloc );
-       if( things.things == NULL ) {
+       if( things.things == NULL || things.names == NULL ){
+               if( things.things != NULL ) {
+                       free( things.things );
+               }
+               if( things.names != NULL ) {
+                       free( things.names );
+               }
+
                if( free_on_err ) {
-                       free( nrt->hash );
+                       rmr_sym_free( nrt->hash );
                        free( nrt );
                        nrt = NULL;
+               } else {
+                       nrt->error = 1;
                }
 
                return nrt;
        }
+       memset( things.things, 0, sizeof( sizeof( void * ) * things.nalloc ) );
+       memset( things.names, 0, sizeof( char * ) * things.nalloc );
 
        sst = srt->hash;                                                                                        // convenience pointers (src symtab)
        nst = nrt->hash;
 
        rmr_sym_foreach_class( sst, space, collect_things, &things );           // collect things from this space
+       if( things.error ) {                            // something happened and capture failed
+               rmr_vlog( RMR_VL_ERR, "unable to clone route table: unable to capture old contents\n" );
+               free( things.things );
+               free( things.names );
+               if( free_on_err ) {
+                       rmr_sym_free( nrt->hash );
+                       free( nrt );
+                       nrt = NULL;
+               } else {
+                       nrt->error = 1;
+               }
+               return nrt;
+       }
 
        if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "clone space cloned %d things in space %d\n",  things.nused, space );
        for( i = 0; i < things.nused; i++ ) {
@@ -1120,62 +1460,113 @@ static route_table_t* rt_clone_space( route_table_t* srt, route_table_t* nrt, in
 }
 
 /*
-       Creates a new route table and then clones the parts of the table which we must keep with each newrt|start.
-       The endpoint and meid entries in the hash must be preserved.
+       Given a destination route table (drt), clone from the source (srt) into it.
+       If drt is nil, alloc a new one. If srt is nil, then nothing is done (except to
+       allocate the drt if that was nil too). If all is true (1), then we will clone both
+       the MT and the ME spaces; otherwise only the ME space is cloned.
 */
-static route_table_t* uta_rt_clone( route_table_t* srt ) {
+static route_table_t* uta_rt_clone( uta_ctx_t* ctx, route_table_t* srt, route_table_t* drt, int all ) {
        endpoint_t*             ep;                             // an endpoint
        rtable_ent_t*   rte;                    // a route table entry
-       route_table_t*  nrt = NULL;             // new route table
        int i;
 
+       if( ctx == NULL ) {
+               return NULL;
+       }
+       if( drt == NULL ) {
+               drt = uta_rt_init( ctx );
+       }
        if( srt == NULL ) {
-               return uta_rt_init();           // no source to clone, just return an empty table
+               return drt;
        }
 
-       nrt = rt_clone_space( srt, nrt, RT_NAME_SPACE );                // allocate a new one, add endpoint refs
-       rt_clone_space( srt, nrt, RT_ME_SPACE );                                // add meid refs to new
+       drt->ephash = ctx->ephash;                                              // all rts reference the same EP symtab
+       rt_clone_space( ctx, srt, drt, RT_ME_SPACE );
+       if( all ) {
+               rt_clone_space( ctx, srt, drt, RT_MT_SPACE );
+       }
 
-       return nrt;
+       return drt;
 }
 
 /*
-       Creates a new route table and then clones  _all_ of the given route table (references 
-       both endpoints AND the route table entries. Needed to support a partial update where 
-       some route table entries will not be deleted if not explicitly in the update and when 
-       we are adding/replacing meid references.
+       Prepares the "new" route table for populating. If the old_rtable is not nil, then
+       we wait for it's use count to reach 0. Then the table is cleared, and moved on the
+       context to be referenced by the new pointer; the old pointer is set to nil.
+
+       If the old table doesn't exist, then a new table is created and the new pointer is
+       set to reference it.
+
+       The ME namespace references endpoints which do not need to be released, therefore we
+       do not need to run that portion of the table to deref like we do for the RTEs.
 */
-static route_table_t* uta_rt_clone_all( route_table_t* srt ) {
-       endpoint_t*             ep;                             // an endpoint
-       rtable_ent_t*   rte;                    // a route table entry
-       route_table_t*  nrt = NULL;             // new route table
-       int i;
+static route_table_t* prep_new_rt( uta_ctx_t* ctx, int all ) {
+       int counter = 0;
+       int ref_count;
+       route_table_t*  rt;
 
-       if( srt == NULL ) {
-               return uta_rt_init();           // no source to clone, just return an empty table
+       if( ctx == NULL ) {
+               return NULL;
        }
 
-       nrt = rt_clone_space( srt, nrt, RT_MT_SPACE );                  // create new, clone all spaces to it
-       rt_clone_space( srt, nrt, RT_NAME_SPACE );
-       rt_clone_space( srt, nrt, RT_ME_SPACE );
+       if( (rt = ctx->old_rtable) != NULL ) {
+               ctx->old_rtable = NULL;
 
-       return nrt;
+               pthread_mutex_lock( ctx->rtgate );
+               ref_count = rt->ref_count;
+               pthread_mutex_unlock( ctx->rtgate );
+
+               while( ref_count > 0 ) {                                // wait for all who are using to stop
+                       if( counter++ > 1000 ) {
+                               rmr_vlog( RMR_VL_WARN, "rt_prep_newrt:  internal mishap, ref count on table seems wedged" );
+                               break;
+                       }
+
+                       usleep( 1000 );                                         // small sleep to yield the processer if that is needed
+
+                       pthread_mutex_lock( ctx->rtgate );
+                       ref_count = rt->ref_count;
+                       pthread_mutex_unlock( ctx->rtgate );
+               }
+
+               if( rt->hash != NULL ) {
+                       rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL );            // deref and drop if needed
+                       rmr_sym_clear( rt->hash );                                                                      // clear all entries from the old table
+               }
+
+               rt->error = 0;                                                                  // table with errors can be here, so endure clear before attempt to load
+       } else {
+               rt = NULL;
+       }
+
+       rt = uta_rt_clone( ctx, ctx->rtable, rt, all );         // also sets the ephash pointer
+       if( rt != NULL ) {                                                                      // very small chance for nil, but not zero, so test
+               rt->ref_count = 0;                                                              // take no chances; ensure it's 0!
+       } else {
+               rmr_vlog( RMR_VL_ERR, "route table clone returned nil; marking dummy table as error\n" );
+               rt = uta_rt_init( ctx );                                                // must hav something, but mark it in error state
+               rt->error = 1;
+       }
+
+       return rt;
 }
 
+
 /*
        Given a name, find the endpoint struct in the provided route table.
 */
 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name ) {
 
-       if( rt == NULL || rt->hash == NULL || ep_name == NULL || *ep_name == 0 ) {
+       if( rt == NULL || rt->ephash == NULL || ep_name == NULL || *ep_name == 0 ) {
                return NULL;
        }
 
-       return rmr_sym_get( rt->hash, ep_name, 1 );
+       return rmr_sym_get( rt->ephash, ep_name, 1 );
 }
 
 /*
        Drop the given route table. Purge all type 0 entries, then drop the symtab itself.
+       Does NOT destroy the gate as it's a common gate for ALL route tables.
 */
 static void uta_rt_drop( route_table_t* rt ) {
        if( rt == NULL ) {
@@ -1201,7 +1592,7 @@ static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) {
                return NULL;
        }
 
-       if( (ep = uta_get_ep( rt, ep_name )) == NULL ) {                                        // not there yet, make
+       if( (ep = uta_get_ep( rt, ep_name )) == NULL ) {                                        // not there yet, make
                if( (ep = (endpoint_t *) malloc( sizeof( *ep ) )) == NULL ) {
                        rmr_vlog( RMR_VL_WARN, "rt_ensure:  malloc failed for endpoint creation: %s\n", ep_name );
                        errno = ENOMEM;
@@ -1215,7 +1606,7 @@ static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) {
                pthread_mutex_init( &ep->gate, NULL );          // init with default attrs
                memset( &ep->scounts[0], 0, sizeof( ep->scounts ) );
 
-               rmr_sym_put( rt->hash, ep_name, 1, ep );
+               rmr_sym_put( rt->ephash, ep_name, 1, ep );
        }
 
        return ep;
@@ -1242,14 +1633,60 @@ static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) {
        Given a route table and meid string, find the owner (if known). Returns a pointer to
        the endpoint struct or nil.
 */
-static inline endpoint_t*  get_meid_owner( route_table_t *rt, char* meid ) {
-       endpoint_t* ep;         // the ep we found in the hash
+static inline endpoint_t*  get_meid_owner( route_table_t *rt, char const* meid ) {
+       endpoint_t const* ep;           // the ep we found in the hash
 
        if( rt == NULL || rt->hash == NULL || meid == NULL || *meid == 0 ) {
                return NULL;
        }
 
-       return (endpoint_t *) rmr_sym_get( rt->hash, meid, RT_ME_SPACE ); 
+       return (endpoint_t *) rmr_sym_get( rt->hash, meid, RT_ME_SPACE );
+}
+
+/*
+       This returns a pointer to the currently active route table and ups
+       the reference count so that the route table is not freed while it
+       is being used. The caller MUST call release_rt() when finished
+       with the pointer.
+
+       Care must be taken: the ctx->rtable pointer _could_ change during the time
+       between the release of the lock and the return. Therefore we MUST grab
+       the current pointer when we have the lock so that if it does we don't
+       return a pointer to the wrong table.
+
+       This will return NULL if there is no active table.
+*/
+static inline route_table_t* get_rt( uta_ctx_t* ctx ) {
+       route_table_t*  rrt;                    // return value
+
+       if( ctx == NULL || ctx->rtable == NULL ) {
+               return NULL;
+       }
+
+       pthread_mutex_lock( ctx->rtgate );                              // must hold lock to bump use
+       rrt = ctx->rtable;                                                              // must stash the pointer while we hold lock
+       rrt->ref_count++;
+       pthread_mutex_unlock( ctx->rtgate );
+
+       return rrt;                                                                             // pointer we upped the count with
 }
 
+/*
+       This will "release" the route table by reducing the use counter
+       in the table. The table may not be freed until the counter reaches
+       0, so it's imparative that the pointer be "released" when it is
+       fetched by get_rt().  Once the caller has released the table it
+       may not safely use the pointer that it had.
+*/
+static inline void release_rt( uta_ctx_t* ctx, route_table_t* rt ) {
+       if( ctx == NULL || rt == NULL ) {
+               return;
+       }
+
+       pthread_mutex_lock( ctx->rtgate );                              // must hold lock
+       if( rt->ref_count > 0 ) {                                               // something smells if it's already 0, don't do antyhing if it is
+               rt->ref_count--;
+       }
+       pthread_mutex_unlock( ctx->rtgate );
+}
 #endif