Fix RMR routing statistic data printout crash
[ric-plt/lib/rmr.git] / src / rmr / common / src / rt_generic_static.c
index 84f8445..9a8d1fd 100644 (file)
@@ -1,4 +1,4 @@
-// :vi sw=4 ts=4 noet:
+ // :vi sw=4 ts=4 noet2
 /*
 ==================================================================================
        Copyright (c) 2019-2020 Nokia
@@ -57,6 +57,7 @@
        a current symtab.
 */
 typedef struct thing_list {
+       int error;                                      // if a realloc failed, this will be set
        int nalloc;
        int nused;
        void** things;
@@ -85,6 +86,7 @@ static void ep_stats( void* st, void* entry, char const* name, void* thing, void
                (*counter)++;
        } else {
                rmr_vlog( RMR_VL_DEBUG, "ep_stas: nil counter %p %p %p", st, entry, name );     // dummy refs
+               return;
        }
 
        rmr_vlog_force( RMR_VL_DEBUG, "rt endpoint: target=%s open=%d\n", ep->name, ep->open );
@@ -208,7 +210,19 @@ static void  rt_epcounts( route_table_t* rt, char* id ) {
                return;
        }
 
-       rmr_sym_foreach_class( rt->hash, 1, ep_counts, id );            // run endpoints in the active table
+       rmr_sym_foreach_class( rt->ephash, 1, ep_counts, id );          // run endpoints in the active table
+}
+
+
+static void dump_tables( uta_ctx_t *ctx ) {
+       if( ctx->old_rtable != NULL ) {
+               rmr_vlog_force( RMR_VL_DEBUG, "old route table: (ref_count=%d)\n", ctx->old_rtable->ref_count );
+               rt_stats( ctx->old_rtable );
+       } else {
+               rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" );
+       }
+       rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
+       rt_stats( ctx->rtable );
 }
 
 // ------------ route manager communication -------------------------------------------------
@@ -291,7 +305,7 @@ static void send_rt_ack( uta_ctx_t* ctx, rmr_mbuf_t* smsg, char* table_id, int s
 
                smsg->len = strlen( smsg->payload ) + 1;
 
-               rmr_vlog( RMR_VL_INFO, "rmr_rtc: sending table state: (%s) state=%d whid=%d\n", smsg->payload, state, ctx->rtg_whid );
+               rmr_vlog( RMR_VL_INFO, "rmr_rtc: sending table state: (%s) state=%d whid=%d table=%s\n", smsg->payload, state, ctx->rtg_whid, table_id );
                if( use_rts ) {
                        smsg = rmr_rts_msg( ctx, smsg );
                } else {
@@ -309,6 +323,51 @@ static void send_rt_ack( uta_ctx_t* ctx, rmr_mbuf_t* smsg, char* table_id, int s
        }
 }
 
+// ---- alarm generation --------------------------------------------------------------------------
+
+/*
+       Given the user's context (not the thread private context) look to see if the application isn't
+       working fast enough and we're dropping messages. If the drop counter has changed since the last
+       peeked, and we have not raised an alarm, then we will alarm. If the counter hasn't changed, then we
+       set a timer and if the counter still hasn't changed when it expires we will clear the alarm.
+
+       The private context is what we use to send so as not to interfere with the user flow.
+*/
+static void alarm_if_drops( uta_ctx_t* uctx, uta_ctx_t* pctx ) {
+       static  int alarm_raised = 0;
+       static  int ok2clear = 0;                                       // time that we can clear
+       static  int lastd = 0;                                          // the last counter value so we can compute delta
+       static  int prob_id = 0;                                        // problem ID we assume alarm manager handles dups between processes
+
+       rmr_vlog( RMR_VL_DEBUG, "checking for drops... raised=%d 0k2clear=%d lastd=%d probid=%d\n", alarm_raised, ok2clear, lastd, prob_id );
+       if( ! alarm_raised ) {
+               if( uctx->dcount - lastd == 0 ) {                       // not actively dropping, ok to do nothing
+                       return;
+               }
+
+               alarm_raised = 1;
+               uta_alarm( pctx, ALARM_DROPS | ALARM_RAISE, prob_id, "application running slow; RMR is dropping messages" );
+               rmr_vlog( RMR_VL_INFO, "drop alarm raised" );
+       } else {
+               if( uctx->dcount - lastd != 0 ) {                       // still dropping or dropping again; we've alarmed so nothing to do
+                       lastd = uctx->dcount;
+                       ok2clear = 0;                                                   // reset the timer
+                       return;
+               }
+
+               if( ok2clear == 0 ) {                                           // first round where not dropping
+                       ok2clear = time( NULL ) + 60;                   // we'll clear the alarm in 60s
+               } else {
+                       if( time( NULL ) > ok2clear ) {                 // things still stable after expiry
+                               rmr_vlog( RMR_VL_INFO, "drop alarm cleared\n" );
+                               alarm_raised = 0;
+                               uta_alarm( pctx, ALARM_DROPS | ALARM_CLEAR, prob_id, "RMR message dropping has stopped" );
+                               prob_id++;
+                       }
+               }
+       }
+}
+
 // ---- utility -----------------------------------------------------------------------------------
 /*
        Little diddy to trim whitespace and trailing comments. Like shell, trailing comments
@@ -390,6 +449,13 @@ static char* ensure_nlterm( char* buf ) {
 */
 static void roll_tables( uta_ctx_t* ctx ) {
 
+       if( ctx->new_rtable == NULL || ctx->new_rtable->error ) {
+               rmr_vlog( RMR_VL_WARN, "new route table NOT rolled in: nil pointer or error indicated\n" );
+               ctx->old_rtable = ctx->new_rtable;
+               ctx->new_rtable = NULL;
+               return;
+       }
+
        if( ctx->rtable != NULL ) {                                                     // initially there isn't one, so must check!
                pthread_mutex_lock( ctx->rtgate );                              // must hold lock to move to active
                ctx->old_rtable = ctx->rtable;                                  // currently active becomes old and allowed to 'drain'
@@ -403,6 +469,42 @@ static void roll_tables( uta_ctx_t* ctx ) {
        ctx->new_rtable = NULL;
 }
 
+/*
+       Given a thing list, extend the array of pointers by 1/2 of the current
+       number allocated. If we cannot realloc an array, then we set the error
+       flag.  Unlikely, but will prevent a crash, AND will prevent us from
+       trying to use the table since we couldn't capture everything.
+*/
+static void extend_things( thing_list_t* tl ) {
+       int old_alloc;
+       void*   old_things;
+       void*   old_names;
+
+       if( tl == NULL ) {
+               return;
+       }
+
+       old_alloc = tl->nalloc;                         // capture current things
+       old_things = tl->things;
+       old_names = tl->names;
+
+       tl->nalloc += tl->nalloc/2;                     // new allocation size
+
+       tl->things = (void **) malloc( sizeof( void * ) * tl->nalloc );                 // allocate larger arrays
+       tl->names = (const char **) malloc( sizeof( char * ) * tl->nalloc );
+
+       if( tl->things == NULL || tl->names == NULL ){                          // if either failed, then set error
+               tl->error = 1;
+               return;
+       }
+
+       memcpy( tl->things, old_things, sizeof( void * ) * old_alloc );
+       memcpy( tl->names, old_names, sizeof( void * ) * old_alloc );
+
+       free( old_things );
+       free( old_names );
+}
+
 // ------------ entry update functions ---------------------------------------------------------------
 /*
        Given a message type create a route table entry and add to the hash keyed on the
@@ -473,9 +575,10 @@ static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* r
        uint64_t key = 0;                       // the symtab key will be mtype or sub_id+mtype
        char*   tokens[128];
        char*   gtokens[64];
-       int             i;
        int             ngtoks;                         // number of tokens in the group list
        int             grp;                            // index into group list
+       int             cgidx;                          // contiguous group index (prevents the addition of a contiguous group without ep)
+       int             has_ep = FALSE;         // indicates if an endpoint was added in a given round robin group
 
        ts_field = clip( ts_field );                            // ditch extra whitespace and trailing comments
        rr_field = clip( rr_field );
@@ -495,14 +598,21 @@ static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* r
                        rte = uta_add_rte( ctx->new_rtable, key, ngtoks );                                                              // get/create entry for this key
                        rte->mtype = atoi( ts_field );                                                                                                  // capture mtype for debugging
 
-                       for( grp = 0; grp < ngtoks; grp++ ) {
-                               if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) {          // remove any referneces to our ip addrs
+                       for( grp = 0, cgidx = 0; grp < ngtoks; grp++ ) {
+                               int             i;                                      // avoid sonar grumbling by defining this here
+
+                               if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) {          // remove any references to our ip addrs
                                        for( i = 0; i < ntoks; i++ ) {
                                                if( strcmp( tokens[i], ctx->my_name ) != 0 ) {                                  // don't add if it is us -- cannot send to ourself
                                                        if( DEBUG > 1  || (vlevel > 1)) rmr_vlog_force( RMR_VL_DEBUG, "add endpoint  ts=%s %s\n", ts_field, tokens[i] );
-                                                       uta_add_ep( ctx->new_rtable, rte, tokens[i], grp );
+                                                       uta_add_ep( ctx->new_rtable, rte, tokens[i], cgidx );
+                                                       has_ep = TRUE;
                                                }
                                        }
+                                       if( has_ep ) {
+                                               cgidx++;        // only increment to the next contiguous group if the current one has at least one endpoint
+                                               has_ep = FALSE;
+                                       }
                                }
                        }
                }
@@ -723,6 +833,71 @@ static void meid_parser( uta_ctx_t* ctx, uta_ctx_t* pctx, rmr_mbuf_t* mbuf, char
        }
 }
 
+/*
+       This will close the current table snarf file (in *.inc) and open a new one.
+       The curent one is renamed. The final file name is determined by the setting of
+       RMR_SNARF_RT, and if not set then the variable RMR_SEED_RT is used and given
+       an additional extension of .snarf.  If neither seed or snarf environment vars are
+       set then this does nothing.
+
+       If this is called before the tmp snarf file is opened, then this just opens the file.
+*/
+static void cycle_snarfed_rt( uta_ctx_t* ctx ) {
+       static int              ok2warn = 0;    // some warnings squelched on first call
+
+       char*   seed_fname;                             // the filename from env
+       char    tfname[512];                    // temp fname
+       char    wfname[512];                    // working buffer for filename
+       char*   snarf_fname = NULL;             // prevent overlay of the static table if snarf_rt not given
+
+       if( ctx == NULL ) {
+               return;
+       }
+
+       if( (snarf_fname = getenv(  ENV_STASH_RT )) == NULL ) {                         // specific place to stash the rt not given
+               if( (seed_fname = getenv( ENV_SEED_RT )) != NULL ) {                    // no seed, we leave in the default file
+                       memset( wfname, 0, sizeof( wfname ) );
+                       snprintf( wfname, sizeof( wfname ) - 1, "%s.stash", seed_fname );
+                       snarf_fname = wfname;
+               }
+       }
+
+       if( snarf_fname == NULL ) {
+               rmr_vlog( RMR_VL_DEBUG, "cycle_snarf: no file to save in" );
+               return;
+       }
+
+       memset( tfname, 0, sizeof( tfname ) );
+       snprintf( tfname, sizeof( tfname ) -1, "%s.inc", snarf_fname );         // must ensure tmp file is moveable
+
+       if( ctx->snarf_rt_fd >= 0 ) {
+               char* msg= "### captured from route manager\n";
+               write( ctx->snarf_rt_fd, msg, strlen( msg ) );
+               if( close( ctx->snarf_rt_fd ) < 0 ) {
+                       rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to close working rt snarf file: %s\n", strerror( errno ) );
+                       return;
+               }
+
+               if( unlink( snarf_fname ) < 0  && ok2warn ) {                                   // first time through this can fail and we ignore it
+                       rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to unlink old static table: %s: %s\n", snarf_fname, strerror( errno ) );
+               }
+
+               if( rename( tfname, snarf_fname ) ) {
+                       rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to move new route table to seed aname : %s -> %s: %s\n", tfname, snarf_fname, strerror( errno ) );
+               } else {
+                       rmr_vlog( RMR_VL_INFO, "latest route table info saved in: %s\n", snarf_fname );
+               }
+       }
+       ok2warn = 1;
+
+       ctx->snarf_rt_fd = open( tfname, O_WRONLY | O_CREAT | O_TRUNC, 0660 );
+       if( ctx->snarf_rt_fd < 0 ) {
+               rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to open trt file: %s: %s\n", tfname, strerror( errno ) );
+       } else {
+               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: rt snarf file opened: %s\n", tfname );
+       }
+}
+
 /*
        Parse a single record recevied from the route table generator, or read
        from a static route table file.  Start records cause a new table to
@@ -784,6 +959,11 @@ static void parse_rt_rec( uta_ctx_t* ctx,  uta_ctx_t* pctx, char* buf, int vleve
                return;
        }
 
+       if( ctx && ctx->snarf_rt_fd  >= 0 ) {                                                           // if snarfing table as it arrives, write this puppy
+               write( ctx->snarf_rt_fd, buf, strlen( buf ) );
+               write( ctx->snarf_rt_fd, "\n", 1 );
+       }
+
        while( *buf && isspace( *buf ) ) {                                                      // skip leading whitespace
                buf++;
        }
@@ -816,6 +996,10 @@ static void parse_rt_rec( uta_ctx_t* ctx,  uta_ctx_t* pctx, char* buf, int vleve
                        case 'n':                                                                                               // newrt|{start|end}
                                tokens[1] = clip( tokens[1] );
                                if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
+                                       if( ctx && ctx->snarf_rt_fd >= 0 ) {
+                                               cycle_snarfed_rt( ctx );                                        // make it available and open a new one
+                                       }
+
                                        if( ntoks >2 ) {
                                                if( ctx->new_rtable->updates != atoi( tokens[2] ) ) {   // count they added didn't match what we received
                                                        rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
@@ -829,22 +1013,15 @@ static void parse_rt_rec( uta_ctx_t* ctx,  uta_ctx_t* pctx, char* buf, int vleve
                                        }
 
                                        if( ctx->new_rtable ) {
-                                               if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog( RMR_VL_DEBUG, "end of route table noticed\n" );
                                                roll_tables( ctx );                                             // roll active to old, and new to active with proper locking
-
-                                               if( vlevel > 0 ) {
-                                                       if( ctx->old_rtable != NULL ) {
-                                                               rmr_vlog_force( RMR_VL_DEBUG, "old route table: (ref_count=%d)\n", ctx->old_rtable->ref_count );
-                                                               rt_stats( ctx->old_rtable );
-                                                       } else {
-                                                               rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" );
-                                                       }
-                                                       rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
-                                                       rt_stats( ctx->rtable );
+                                               if( DEBUG > 1 || (vlevel > 1) ) {
+                                                       rmr_vlog( RMR_VL_DEBUG, "end of route table noticed\n" );
+                                                       dump_tables( ctx );
                                                }
 
                                                send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
                                                ctx->rtable_ready = 1;                                                  // route based sends can now happen
+                                               ctx->flags |= CFL_FULLRT;                                               // indicate we have seen a complete route table
                                        } else {
                                                if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of route table noticed, but one was not started!\n" );
                                                ctx->new_rtable = NULL;
@@ -906,16 +1083,25 @@ static void parse_rt_rec( uta_ctx_t* ctx,  uta_ctx_t* pctx, char* buf, int vleve
                                break;
 
                        case 'u':                                                                                               // update current table, not a total replacement
+                               if( ! (ctx->flags & CFL_FULLRT) ) {                                     // we cannot update until we have a full table from route generator
+                                       rmr_vlog( RMR_VL_WARN, "route table update ignored: full table not previously recevied" );
+                                       break;
+                               }
+
                                tokens[1] = clip( tokens[1] );
                                if( strcmp( tokens[1], "end" ) == 0 ) {                         // wrap up the table we were building
                                        if( ctx->new_rtable == NULL ) {                                 // update table not in progress
                                                break;
                                        }
+                                       if( ctx->snarf_rt_fd >= 0 ) {
+                                               cycle_snarfed_rt( ctx );                                        // make it available and open a new one
+                                       }
 
                                        if( ntoks >2 ) {
                                                if( ctx->new_rtable->updates != atoi( tokens[2] ) ) {   // count they added didn't match what we received
                                                        rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
                                                                ctx->new_rtable->updates, tokens[2] );
+                                                       send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
                                                        uta_rt_drop( ctx->new_rtable );
                                                        ctx->new_rtable = NULL;
                                                        break;
@@ -924,18 +1110,13 @@ static void parse_rt_rec( uta_ctx_t* ctx,  uta_ctx_t* pctx, char* buf, int vleve
 
                                        if( ctx->new_rtable ) {
                                                roll_tables( ctx );                                             // roll active to old, and new to active with proper locking
-                                               if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed\n" );
-
-                                               if( vlevel > 0 ) {
-                                                       if( ctx->old_rtable != NULL ) {
-                                                               rmr_vlog_force( RMR_VL_DEBUG, "old route table:  (ref_count=%d)\n", ctx->old_rtable->ref_count );
-                                                               rt_stats( ctx->old_rtable );
-                                                       } else {
-                                                               rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" );
-                                                       }
-                                                       rmr_vlog_force( RMR_VL_DEBUG, "updated route table:\n" );
-                                                       rt_stats( ctx->rtable );
+                                               if( DEBUG > 1 || (vlevel > 1) )  {
+                                                       rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed\n" );
+                                                       dump_tables( ctx );
                                                }
+
+                                               send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
+                                               ctx->rtable_ready = 1;                                                  // route based sends can now happen
                                        } else {
                                                if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed, but one was not started!\n" );
                                                ctx->new_rtable = NULL;
@@ -943,6 +1124,7 @@ static void parse_rt_rec( uta_ctx_t* ctx,  uta_ctx_t* pctx, char* buf, int vleve
                                } else {                                                                                        // start a new table.
                                        if( ctx->new_rtable != NULL ) {                                 // one in progress?  this forces it out
                                                if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
+                                               send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" );                        // nack the one that was pending as end never made it
                                                uta_rt_drop( ctx->new_rtable );
                                                ctx->new_rtable = NULL;
                                        }
@@ -985,8 +1167,13 @@ static void read_static_rt( uta_ctx_t* ctx, int vlevel ) {
        char*   eor;                            // end of the record
        int             rcount = 0;                     // record count for debug
 
-       if( (fname = getenv( ENV_SEED_RT )) == NULL ) {
-               return;
+       if( (fname = ctx->seed_rt_fname) == NULL ) {
+               if( (fname = getenv( ENV_SEED_RT )) == NULL ) {
+                       return;
+               }
+
+               ctx->seed_rt_fname = strdup( fname );
+               fname = ctx->seed_rt_fname;
        }
 
        if( (fbuf = ensure_nlterm( uta_fib( fname ) ) ) == NULL ) {                     // read file into a single buffer (nil terminated string)
@@ -1023,7 +1210,7 @@ static void read_static_rt( uta_ctx_t* ctx, int vlevel ) {
 }
 
 /*
-       Callback driven for each named thing in a symtab. We collect the pointers to those
+       Callback driven for each thing in a symtab. We collect the pointers to those
        things for later use (cloning).
 */
 static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list ) {
@@ -1038,8 +1225,12 @@ static void collect_things( void* st, void* entry, char const* name, void* thing
                return;
        }
 
-       tl->names[tl->nused] = name;                    // the name/key
+       tl->names[tl->nused] = name;                    // the name/key (space 0 uses int keys, so name can be nil and that is OK)
        tl->things[tl->nused++] = thing;                // save a reference to the thing
+
+       if( tl->nused >= tl->nalloc ) {
+               extend_things( tl );                            // not enough allocated
+       }
 }
 
 /*
@@ -1077,7 +1268,9 @@ static void del_rte( void* st, void* entry, char const* name, void* thing, void*
                for( i = 0; i < rte->nrrgroups; i++ ) {
                        if( rte->rrgroups[i] ) {
                                free( rte->rrgroups[i]->epts );                 // ditch list of endpoint pointers (end points are reused; don't trash them)
+                               free( rte->rrgroups[i] );                               // but must free the rrg itself too
                        }
+
                }
 
                free( rte->rrgroups );
@@ -1207,24 +1400,47 @@ static route_table_t* rt_clone_space( uta_ctx_t* ctx, route_table_t* srt, route_
 
        things.nalloc = 2048;
        things.nused = 0;
+       things.error = 0;
        things.things = (void **) malloc( sizeof( void * ) * things.nalloc );
-       memset( things.things, 0, sizeof( sizeof( void * ) * things.nalloc ) );
        things.names = (const char **) malloc( sizeof( char * ) * things.nalloc );
-       memset( things.names, 0, sizeof( char * ) * things.nalloc );
-       if( things.things == NULL ) {
+       if( things.things == NULL || things.names == NULL ){
+               if( things.things != NULL ) {
+                       free( things.things );
+               }
+               if( things.names != NULL ) {
+                       free( things.names );
+               }
+
                if( free_on_err ) {
                        rmr_sym_free( nrt->hash );
                        free( nrt );
                        nrt = NULL;
+               } else {
+                       nrt->error = 1;
                }
 
                return nrt;
        }
+       memset( things.things, 0, sizeof( sizeof( void * ) * things.nalloc ) );
+       memset( things.names, 0, sizeof( char * ) * things.nalloc );
 
        sst = srt->hash;                                                                                        // convenience pointers (src symtab)
        nst = nrt->hash;
 
        rmr_sym_foreach_class( sst, space, collect_things, &things );           // collect things from this space
+       if( things.error ) {                            // something happened and capture failed
+               rmr_vlog( RMR_VL_ERR, "unable to clone route table: unable to capture old contents\n" );
+               free( things.things );
+               free( things.names );
+               if( free_on_err ) {
+                       rmr_sym_free( nrt->hash );
+                       free( nrt );
+                       nrt = NULL;
+               } else {
+                       nrt->error = 1;
+               }
+               return nrt;
+       }
 
        if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "clone space cloned %d things in space %d\n",  things.nused, space );
        for( i = 0; i < things.nused; i++ ) {
@@ -1280,9 +1496,13 @@ static route_table_t* uta_rt_clone( uta_ctx_t* ctx, route_table_t* srt, route_ta
 
        If the old table doesn't exist, then a new table is created and the new pointer is
        set to reference it.
+
+       The ME namespace references endpoints which do not need to be released, therefore we
+       do not need to run that portion of the table to deref like we do for the RTEs.
 */
 static route_table_t* prep_new_rt( uta_ctx_t* ctx, int all ) {
        int counter = 0;
+       int ref_count;
        route_table_t*  rt;
 
        if( ctx == NULL ) {
@@ -1291,22 +1511,42 @@ static route_table_t* prep_new_rt( uta_ctx_t* ctx, int all ) {
 
        if( (rt = ctx->old_rtable) != NULL ) {
                ctx->old_rtable = NULL;
-               while( rt->ref_count > 0 ) {                    // wait for all who are using to stop
+
+               pthread_mutex_lock( ctx->rtgate );
+               ref_count = rt->ref_count;
+               pthread_mutex_unlock( ctx->rtgate );
+
+               while( ref_count > 0 ) {                                // wait for all who are using to stop
                        if( counter++ > 1000 ) {
                                rmr_vlog( RMR_VL_WARN, "rt_prep_newrt:  internal mishap, ref count on table seems wedged" );
                                break;
                        }
 
                        usleep( 1000 );                                         // small sleep to yield the processer if that is needed
+
+                       pthread_mutex_lock( ctx->rtgate );
+                       ref_count = rt->ref_count;
+                       pthread_mutex_unlock( ctx->rtgate );
                }
 
-               rmr_sym_clear( rt );                                    // clear all entries from the old table
+               if( rt->hash != NULL ) {
+                       rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL );            // deref and drop if needed
+                       rmr_sym_clear( rt->hash );                                                                      // clear all entries from the old table
+               }
+
+               rt->error = 0;                                                                  // table with errors can be here, so endure clear before attempt to load
        } else {
                rt = NULL;
        }
 
        rt = uta_rt_clone( ctx, ctx->rtable, rt, all );         // also sets the ephash pointer
-       rt->ref_count = 0;                                                                      // take no chances; ensure it's 0!
+       if( rt != NULL ) {                                                                      // very small chance for nil, but not zero, so test
+               rt->ref_count = 0;                                                              // take no chances; ensure it's 0!
+       } else {
+               rmr_vlog( RMR_VL_ERR, "route table clone returned nil; marking dummy table as error\n" );
+               rt = uta_rt_init( ctx );                                                // must hav something, but mark it in error state
+               rt->error = 1;
+       }
 
        return rt;
 }