rmr_sym_foreach_class( rt->hash, 1, ep_counts, id ); // run endpoints in the active table
}
+
+/*
+	Emit debug-level statistics for the route tables: the old (draining)
+	table with its current ref_count when one exists, followed by the
+	stats for the currently active table.  Read-only; takes no locks.
+*/
+static void dump_tables( uta_ctx_t *ctx ) {
+ if( ctx->old_rtable != NULL ) {
+ rmr_vlog_force( RMR_VL_DEBUG, "old route table: (ref_count=%d)\n", ctx->old_rtable->ref_count );
+ rt_stats( ctx->old_rtable );
+ } else {
+ rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" );
+ }
+ rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
+ rt_stats( ctx->rtable );
+}
+
// ------------ route manager communication -------------------------------------------------
/*
Send a request for a table update to the route manager. Updates come in
smsg->len = strlen( smsg->payload ) + 1;
- rmr_vlog( RMR_VL_INFO, "rmr_rtc: sending table state: (%s) state=%d whid=%d\n", smsg->payload, state, ctx->rtg_whid );
+ rmr_vlog( RMR_VL_INFO, "rmr_rtc: sending table state: (%s) state=%d whid=%d table=%s\n", smsg->payload, state, ctx->rtg_whid, table_id );
if( use_rts ) {
smsg = rmr_rts_msg( ctx, smsg );
} else {
is no active table (first load), so we have to account for that (no locking).
*/
static void roll_tables( uta_ctx_t* ctx ) {
+
if( ctx->rtable != NULL ) { // initially there isn't one, so must check!
pthread_mutex_lock( ctx->rtgate ); // must hold lock to move to active
ctx->old_rtable = ctx->rtable; // currently active becomes old and allowed to 'drain'
int i;
int ngtoks; // number of tokens in the group list
int grp; // index into group list
+ int cgidx; // contiguous group index (prevents the addition of a contiguous group without ep)
+ int has_ep = FALSE; // indicates if an endpoint was added in a given round robin group
ts_field = clip( ts_field ); // ditch extra whitespace and trailing comments
rr_field = clip( rr_field );
rte = uta_add_rte( ctx->new_rtable, key, ngtoks ); // get/create entry for this key
rte->mtype = atoi( ts_field ); // capture mtype for debugging
- for( grp = 0; grp < ngtoks; grp++ ) {
- if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) { // remove any referneces to our ip addrs
+ for( grp = 0, cgidx = 0; grp < ngtoks; grp++ ) {
+ if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) { // remove any references to our ip addrs
for( i = 0; i < ntoks; i++ ) {
if( strcmp( tokens[i], ctx->my_name ) != 0 ) { // don't add if it is us -- cannot send to ourself
if( DEBUG > 1 || (vlevel > 1)) rmr_vlog_force( RMR_VL_DEBUG, "add endpoint ts=%s %s\n", ts_field, tokens[i] );
- uta_add_ep( ctx->new_rtable, rte, tokens[i], grp );
+ uta_add_ep( ctx->new_rtable, rte, tokens[i], cgidx );
+ has_ep = TRUE;
}
}
+ if( has_ep ) {
+ cgidx++; // only increment to the next contiguous group if the current one has at least one endpoint
+ has_ep = FALSE;
+ }
}
}
}
}
if( ctx->new_rtable ) {
- if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog( RMR_VL_DEBUG, "end of route table noticed\n" );
roll_tables( ctx ); // roll active to old, and new to active with proper locking
-
- if( vlevel > 0 ) {
- if( ctx->old_rtable != NULL ) {
- rmr_vlog_force( RMR_VL_DEBUG, "old route table: (ref_count=%d)\n", ctx->old_rtable->ref_count );
- rt_stats( ctx->old_rtable );
- } else {
- rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" );
- }
- rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
- rt_stats( ctx->rtable );
+ if( DEBUG > 1 || (vlevel > 1) ) {
+ rmr_vlog( RMR_VL_DEBUG, "end of route table noticed\n" );
+ dump_tables( ctx );
}
send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
if( ctx->new_rtable->updates != atoi( tokens[2] ) ) { // count they added didn't match what we received
rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
ctx->new_rtable->updates, tokens[2] );
+ send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
uta_rt_drop( ctx->new_rtable );
ctx->new_rtable = NULL;
break;
if( ctx->new_rtable ) {
roll_tables( ctx ); // roll active to old, and new to active with proper locking
- if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed\n" );
-
- if( vlevel > 0 ) {
- if( ctx->old_rtable != NULL ) {
- rmr_vlog_force( RMR_VL_DEBUG, "old route table: (ref_count=%d)\n", ctx->old_rtable->ref_count );
- rt_stats( ctx->old_rtable );
- } else {
- rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" );
- }
- rmr_vlog_force( RMR_VL_DEBUG, "updated route table:\n" );
- rt_stats( ctx->rtable );
+ if( DEBUG > 1 || (vlevel > 1) ) {
+ rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed\n" );
+ dump_tables( ctx );
}
+
+ send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
+ ctx->rtable_ready = 1; // route based sends can now happen
} else {
if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed, but one was not started!\n" );
ctx->new_rtable = NULL;
} else { // start a new table.
if( ctx->new_rtable != NULL ) { // one in progress? this forces it out
if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
+ send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" ); // nack the one that was pending as end never made it
uta_rt_drop( ctx->new_rtable );
ctx->new_rtable = NULL;
}
for( i = 0; i < rte->nrrgroups; i++ ) {
if( rte->rrgroups[i] ) {
free( rte->rrgroups[i]->epts ); // ditch list of endpoint pointers (end points are reused; don't trash them)
+ free( rte->rrgroups[i] ); // but must free the rrg itself too
}
+
}
free( rte->rrgroups );
things.nalloc = 2048;
things.nused = 0;
things.things = (void **) malloc( sizeof( void * ) * things.nalloc );
- memset( things.things, 0, sizeof( sizeof( void * ) * things.nalloc ) );
things.names = (const char **) malloc( sizeof( char * ) * things.nalloc );
- memset( things.names, 0, sizeof( char * ) * things.nalloc );
- if( things.things == NULL ) {
+ if( things.things == NULL || things.names == NULL ){
+ if( things.things != NULL) { free( things.things ); }
+ if( things.names != NULL) { free( things.names ); }
+
if( free_on_err ) {
rmr_sym_free( nrt->hash );
free( nrt );
return nrt;
}
+ memset( things.things, 0, sizeof( void * ) * things.nalloc );		// was sizeof(sizeof(...)): that zeroes only sizeof(size_t) bytes, leaving the pointer array uninitialised
+ memset( things.names, 0, sizeof( char * ) * things.nalloc );
sst = srt->hash; // convenience pointers (src symtab)
nst = nrt->hash;
usleep( 1000 ); // small sleep to yield the processer if that is needed
}
- rmr_sym_clear( rt ); // clear all entries from the old table
+ if( rt->hash != NULL ) {
+ rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL ); // deref and drop if needed
+ rmr_sym_clear( rt->hash ); // clear all entries from the old table
+ }
} else {
rt = NULL;
}
- rt = uta_rt_clone( ctx, ctx->rtable, rt, all ); // also sets the ephash pointer
- rt->ref_count = 0; // take no chances; ensure it's 0!
+ rt = uta_rt_clone( ctx, ctx->rtable, rt, all ); // also sets the ephash pointer
+ rt->ref_count = 0; // take no chances; ensure it's 0!
return rt;
}