-// :vi sw=4 ts=4 noet:
+ // :vi sw=4 ts=4 noet2
/*
==================================================================================
Copyright (c) 2019-2020 Nokia
#include <unistd.h>
#include <netdb.h>
#include <pthread.h>
+#include <immintrin.h>
+#include <stdbool.h>
#include <RIC_message_types.h> // needed for route manager messages
a current symtab.
*/
typedef struct thing_list {
+ int error; // if a realloc failed, this will be set
int nalloc;
int nused;
void** things;
(*counter)++;
} else {
rmr_vlog( RMR_VL_DEBUG, "ep_stas: nil counter %p %p %p", st, entry, name ); // dummy refs
+ return;
}
rmr_vlog_force( RMR_VL_DEBUG, "rt endpoint: target=%s open=%d\n", ep->name, ep->open );
return;
}
- rmr_sym_foreach_class( rt->hash, 1, ep_counts, id ); // run endpoints in the active table
+ rmr_sym_foreach_class( rt->ephash, 1, ep_counts, id ); // run endpoints in the active table
+}
+
+
+static void dump_tables( uta_ctx_t *ctx ) {
+ if( ctx->old_rtable != NULL ) {
+ rmr_vlog_force( RMR_VL_DEBUG, "old route table: (ref_count=%d)\n", ctx->old_rtable->ref_count );
+ rt_stats( ctx->old_rtable );
+ } else {
+ rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" );
+ }
+ rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
+ rt_stats( ctx->rtable );
}
// ------------ route manager communication -------------------------------------------------
smsg->len = strlen( smsg->payload ) + 1;
- rmr_vlog( RMR_VL_INFO, "rmr_rtc: sending table state: (%s) state=%d whid=%d\n", smsg->payload, state, ctx->rtg_whid );
+ rmr_vlog( RMR_VL_INFO, "rmr_rtc: sending table state: (%s) state=%d whid=%d table=%s\n", smsg->payload, state, ctx->rtg_whid, table_id );
if( use_rts ) {
smsg = rmr_rts_msg( ctx, smsg );
} else {
}
}
+// ---- alarm generation --------------------------------------------------------------------------
+
+/*
+ Given the user's context (not the thread private context) look to see if the application isn't
+ working fast enough and we're dropping messages. If the drop counter has changed since the last
+ peeked, and we have not raised an alarm, then we will alarm. If the counter hasn't changed, then we
+ set a timer and if the counter still hasn't changed when it expires we will clear the alarm.
+
+ The private context is what we use to send so as not to interfere with the user flow.
+*/
+static void alarm_if_drops( uta_ctx_t* uctx, uta_ctx_t* pctx ) {
+ static int alarm_raised = 0;
+ static int ok2clear = 0; // time that we can clear
+ static int lastd = 0; // the last counter value so we can compute delta
+ static int prob_id = 0; // problem ID we assume alarm manager handles dups between processes
+
+ rmr_vlog( RMR_VL_DEBUG, "checking for drops... raised=%d 0k2clear=%d lastd=%d probid=%d\n", alarm_raised, ok2clear, lastd, prob_id );
+ if( ! alarm_raised ) {
+ if( uctx->dcount - lastd == 0 ) { // not actively dropping, ok to do nothing
+ return;
+ }
+
+ alarm_raised = 1;
+ uta_alarm( pctx, ALARM_DROPS | ALARM_RAISE, prob_id, "application running slow; RMR is dropping messages" );
+ rmr_vlog( RMR_VL_INFO, "drop alarm raised" );
+ } else {
+ if( uctx->dcount - lastd != 0 ) { // still dropping or dropping again; we've alarmed so nothing to do
+ lastd = uctx->dcount;
+ ok2clear = 0; // reset the timer
+ return;
+ }
+
+ if( ok2clear == 0 ) { // first round where not dropping
+ ok2clear = time( NULL ) + 60; // we'll clear the alarm in 60s
+ } else {
+ if( time( NULL ) > ok2clear ) { // things still stable after expiry
+ rmr_vlog( RMR_VL_INFO, "drop alarm cleared\n" );
+ alarm_raised = 0;
+ uta_alarm( pctx, ALARM_DROPS | ALARM_CLEAR, prob_id, "RMR message dropping has stopped" );
+ prob_id++;
+ }
+ }
+ }
+}
+
// ---- utility -----------------------------------------------------------------------------------

/*
	Wrapper round isspace() which issues a load fence first; presumably this is a
	speculative-execution (spectre style) mitigation for the table parsing hot
	spots -- TODO confirm the original motivation.

	Bug fix: isspace() has undefined behaviour when handed a value that is not
	representable as unsigned char (callers pass dereferenced plain char, which
	may be negative); cast through unsigned char per CERT STR37-C.
*/
int isspace_with_fence(int c) {
	_mm_lfence();
	return isspace( (unsigned char) c );
}
+
/*
	Little diddy to trim whitespace and trailing comments. Like shell, trailing comments
	must be at the start of a word (i.e. must be immediately preceded by whitespace).
*/
static char* clip( char* buf ) {
- char* tok;
+ char* tok=NULL;
while( *buf && isspace( *buf ) ) { // skip leading whitespace
buf++;
}
}
- for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- ); // trim trailing spaces too
+ for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace_with_fence( *tok ); tok-- ); // trim trailing spaces too
*(tok+1) = 0;
return buf;
is no active table (first load), so we have to account for that (no locking).
*/
/*
	Roll the new table into place: the currently active table becomes old (left
	to drain), and the table we have been building becomes active. On the very
	first load there is no active table, so old simply becomes nil. A new table
	that is nil or marked in error is never rolled in; it is parked in old so
	that the next prep can clear and reuse it.
*/
static void roll_tables( uta_ctx_t* ctx ) {
	pthread_mutex_lock( ctx->rtgate );						// must hold lock to move to active

	if( ctx->new_rtable == NULL || ctx->new_rtable->error ) {
		rmr_vlog( RMR_VL_WARN, "new route table NOT rolled in: nil pointer or error indicated\n" );
		ctx->old_rtable = ctx->new_rtable;					// park the bad/nil table for prep to reclaim
	} else {
		ctx->old_rtable = ctx->rtable;						// nil on first load, which correctly leaves no old reference
		ctx->rtable = ctx->new_rtable;						// the one we've been adding to becomes active
	}

	ctx->new_rtable = NULL;
	pthread_mutex_unlock( ctx->rtgate );
}
+
+/*
+ Given a thing list, extend the array of pointers by 1/2 of the current
+ number allocated. If we cannot realloc an array, then we set the error
+ flag. Unlikely, but will prevent a crash, AND will prevent us from
+ trying to use the table since we couldn't capture everything.
+*/
+static void extend_things( thing_list_t* tl ) {
+ int old_alloc;
+ void* old_things;
+ void* old_names;
+
+ if( tl == NULL ) {
+ return;
+ }
+
+ old_alloc = tl->nalloc; // capture current things
+ old_things = tl->things;
+ old_names = tl->names;
+
+ tl->nalloc += tl->nalloc/2; // new allocation size
+
+ tl->things = (void **) malloc( sizeof( void * ) * tl->nalloc ); // allocate larger arrays
+ tl->names = (const char **) malloc( sizeof( char * ) * tl->nalloc );
+
+ if( tl->things == NULL || tl->names == NULL ){ // if either failed, then set error
+ tl->error = 1;
+ return;
+ }
+
+ memcpy( tl->things, old_things, sizeof( void * ) * old_alloc );
+ memcpy( tl->names, old_names, sizeof( void * ) * old_alloc );
+
+ free( old_things );
+ free( old_names );
}
// ------------ entry update functions ---------------------------------------------------------------
uint64_t key = 0; // the symtab key will be mtype or sub_id+mtype
char* tokens[128];
char* gtokens[64];
- int i;
int ngtoks; // number of tokens in the group list
int grp; // index into group list
+ int cgidx; // contiguous group index (prevents the addition of a contiguous group without ep)
+ int has_ep = FALSE; // indicates if an endpoint was added in a given round robin group
ts_field = clip( ts_field ); // ditch extra whitespace and trailing comments
rr_field = clip( rr_field );
rte = uta_add_rte( ctx->new_rtable, key, ngtoks ); // get/create entry for this key
rte->mtype = atoi( ts_field ); // capture mtype for debugging
- for( grp = 0; grp < ngtoks; grp++ ) {
- if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) { // remove any referneces to our ip addrs
+ for( grp = 0, cgidx = 0; grp < ngtoks; grp++ ) {
+ int i; // avoid sonar grumbling by defining this here
+
+ if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) { // remove any references to our ip addrs
for( i = 0; i < ntoks; i++ ) {
if( strcmp( tokens[i], ctx->my_name ) != 0 ) { // don't add if it is us -- cannot send to ourself
if( DEBUG > 1 || (vlevel > 1)) rmr_vlog_force( RMR_VL_DEBUG, "add endpoint ts=%s %s\n", ts_field, tokens[i] );
- uta_add_ep( ctx->new_rtable, rte, tokens[i], grp );
+ uta_add_ep( ctx->new_rtable, rte, tokens[i], cgidx );
+ has_ep = TRUE;
}
}
+ if( has_ep ) {
+ cgidx++; // only increment to the next contiguous group if the current one has at least one endpoint
+ has_ep = FALSE;
+ }
}
}
}
}
}
+/*
+ This will close the current table snarf file (in *.inc) and open a new one.
+ The curent one is renamed. The final file name is determined by the setting of
+ RMR_SNARF_RT, and if not set then the variable RMR_SEED_RT is used and given
+ an additional extension of .snarf. If neither seed or snarf environment vars are
+ set then this does nothing.
+
+ If this is called before the tmp snarf file is opened, then this just opens the file.
+*/
+static void cycle_snarfed_rt( uta_ctx_t* ctx ) {
+ static int ok2warn = 0; // some warnings squelched on first call
+
+ char* seed_fname; // the filename from env
+ char tfname[512]; // temp fname
+ char wfname[512]; // working buffer for filename
+ char* snarf_fname = NULL; // prevent overlay of the static table if snarf_rt not given
+
+ if( ctx == NULL ) {
+ return;
+ }
+
+ if( (snarf_fname = getenv( ENV_STASH_RT )) == NULL ) { // specific place to stash the rt not given
+ if( (seed_fname = getenv( ENV_SEED_RT )) != NULL ) { // no seed, we leave in the default file
+ memset( wfname, 0, sizeof( wfname ) );
+ snprintf( wfname, sizeof( wfname ) - 1, "%s.stash", seed_fname );
+ snarf_fname = wfname;
+ }
+ }
+
+ if( snarf_fname == NULL ) {
+ rmr_vlog( RMR_VL_DEBUG, "cycle_snarf: no file to save in" );
+ return;
+ }
+
+ memset( tfname, 0, sizeof( tfname ) );
+ snprintf( tfname, sizeof( tfname ) -1, "%s.inc", snarf_fname ); // must ensure tmp file is moveable
+
+ if( ctx->snarf_rt_fd >= 0 ) {
+ char* msg= "### captured from route manager\n";
+ write( ctx->snarf_rt_fd, msg, strlen( msg ) );
+ if( close( ctx->snarf_rt_fd ) < 0 ) {
+ rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to close working rt snarf file: %s\n", strerror( errno ) );
+ return;
+ }
+
+ if( unlink( snarf_fname ) < 0 && ok2warn ) { // first time through this can fail and we ignore it
+ rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to unlink old static table: %s: %s\n", snarf_fname, strerror( errno ) );
+ }
+
+ if( rename( tfname, snarf_fname ) ) {
+ rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to move new route table to seed aname : %s -> %s: %s\n", tfname, snarf_fname, strerror( errno ) );
+ } else {
+ rmr_vlog( RMR_VL_INFO, "latest route table info saved in: %s\n", snarf_fname );
+ }
+ }
+ ok2warn = 1;
+
+ ctx->snarf_rt_fd = open( tfname, O_WRONLY | O_CREAT | O_TRUNC, 0660 );
+ if( ctx->snarf_rt_fd < 0 ) {
+ rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to open trt file: %s: %s\n", tfname, strerror( errno ) );
+ } else {
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: rt snarf file opened: %s\n", tfname );
+ }
+}
+
/*
	Parse a single record received from the route table generator, or read
from a static route table file. Start records cause a new table to
int grp; // group number
rtable_ent_t const* rte; // route table entry added
char* tokens[128];
- char* tok; // pointer into a token or string
+ char* tok=NULL; // pointer into a token or string
char wbuf[1024];
if( ! buf ) {
return;
}
+ if( ctx && ctx->snarf_rt_fd >= 0 ) { // if snarfing table as it arrives, write this puppy
+ write( ctx->snarf_rt_fd, buf, strlen( buf ) );
+ write( ctx->snarf_rt_fd, "\n", 1 );
+ }
+
while( *buf && isspace( *buf ) ) { // skip leading whitespace
buf++;
}
- for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- ); // trim trailing spaces too
+ for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace_with_fence( *tok ); tok-- ); // trim trailing spaces too
*(tok+1) = 0;
memset( tokens, 0, sizeof( tokens ) );
case 'n': // newrt|{start|end}
tokens[1] = clip( tokens[1] );
if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building
+ if( ctx && ctx->snarf_rt_fd >= 0 ) {
+ cycle_snarfed_rt( ctx ); // make it available and open a new one
+ }
+
if( ntoks >2 ) {
if( ctx->new_rtable->updates != atoi( tokens[2] ) ) { // count they added didn't match what we received
rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
}
if( ctx->new_rtable ) {
- if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog( RMR_VL_DEBUG, "end of route table noticed\n" );
roll_tables( ctx ); // roll active to old, and new to active with proper locking
-
- if( vlevel > 0 ) {
- if( ctx->old_rtable != NULL ) {
- rmr_vlog_force( RMR_VL_DEBUG, "old route table: (ref_count=%d)\n", ctx->old_rtable->ref_count );
- rt_stats( ctx->old_rtable );
- } else {
- rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" );
- }
- rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
- rt_stats( ctx->rtable );
+ if( DEBUG > 1 || (vlevel > 1) ) {
+ rmr_vlog( RMR_VL_DEBUG, "end of route table noticed\n" );
+ dump_tables( ctx );
}
send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
ctx->rtable_ready = 1; // route based sends can now happen
+ ctx->flags |= CFL_FULLRT; // indicate we have seen a complete route table
} else {
if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of route table noticed, but one was not started!\n" );
ctx->new_rtable = NULL;
break;
case 'u': // update current table, not a total replacement
+ if( ! (ctx->flags & CFL_FULLRT) ) { // we cannot update until we have a full table from route generator
+ rmr_vlog( RMR_VL_WARN, "route table update ignored: full table not previously recevied" );
+ break;
+ }
+
tokens[1] = clip( tokens[1] );
if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building
if( ctx->new_rtable == NULL ) { // update table not in progress
break;
}
+ if( ctx->snarf_rt_fd >= 0 ) {
+ cycle_snarfed_rt( ctx ); // make it available and open a new one
+ }
if( ntoks >2 ) {
if( ctx->new_rtable->updates != atoi( tokens[2] ) ) { // count they added didn't match what we received
rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
ctx->new_rtable->updates, tokens[2] );
+ send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
uta_rt_drop( ctx->new_rtable );
ctx->new_rtable = NULL;
break;
if( ctx->new_rtable ) {
roll_tables( ctx ); // roll active to old, and new to active with proper locking
- if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed\n" );
-
- if( vlevel > 0 ) {
- if( ctx->old_rtable != NULL ) {
- rmr_vlog_force( RMR_VL_DEBUG, "old route table: (ref_count=%d)\n", ctx->old_rtable->ref_count );
- rt_stats( ctx->old_rtable );
- } else {
- rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" );
- }
- rmr_vlog_force( RMR_VL_DEBUG, "updated route table:\n" );
- rt_stats( ctx->rtable );
+ if( DEBUG > 1 || (vlevel > 1) ) {
+ rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed\n" );
+ dump_tables( ctx );
}
+
+ send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
+ ctx->rtable_ready = 1; // route based sends can now happen
} else {
if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed, but one was not started!\n" );
ctx->new_rtable = NULL;
} else { // start a new table.
if( ctx->new_rtable != NULL ) { // one in progress? this forces it out
if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
+ send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" ); // nack the one that was pending as end never made it
uta_rt_drop( ctx->new_rtable );
ctx->new_rtable = NULL;
}
char* eor; // end of the record
int rcount = 0; // record count for debug
- if( (fname = getenv( ENV_SEED_RT )) == NULL ) {
- return;
+ if( (fname = ctx->seed_rt_fname) == NULL ) {
+ if( (fname = getenv( ENV_SEED_RT )) == NULL ) {
+ return;
+ }
+
+ ctx->seed_rt_fname = strdup( fname );
+ fname = ctx->seed_rt_fname;
}
if( (fbuf = ensure_nlterm( uta_fib( fname ) ) ) == NULL ) { // read file into a single buffer (nil terminated string)
}
/*
- Callback driven for each named thing in a symtab. We collect the pointers to those
+ Callback driven for each thing in a symtab. We collect the pointers to those
things for later use (cloning).
*/
static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list ) {
return;
}
- tl->names[tl->nused] = name; // the name/key
+ tl->names[tl->nused] = name; // the name/key (space 0 uses int keys, so name can be nil and that is OK)
tl->things[tl->nused++] = thing; // save a reference to the thing
+
+ if( tl->nused >= tl->nalloc ) {
+ extend_things( tl ); // not enough allocated
+ }
}
/*
for( i = 0; i < rte->nrrgroups; i++ ) {
if( rte->rrgroups[i] ) {
free( rte->rrgroups[i]->epts ); // ditch list of endpoint pointers (end points are reused; don't trash them)
+ free( rte->rrgroups[i] ); // but must free the rrg itself too
}
+
}
free( rte->rrgroups );
return NULL;
}
- rt->gate = ctx->rtgate; // single mutex needed for all route tables
rt->ephash = ctx->ephash; // all route tables share a common endpoint hash
- pthread_mutex_init( rt->gate, NULL );
-
return rt;
}
things.nalloc = 2048;
things.nused = 0;
+ things.error = 0;
things.things = (void **) malloc( sizeof( void * ) * things.nalloc );
- memset( things.things, 0, sizeof( sizeof( void * ) * things.nalloc ) );
things.names = (const char **) malloc( sizeof( char * ) * things.nalloc );
- memset( things.names, 0, sizeof( char * ) * things.nalloc );
- if( things.things == NULL ) {
+ if( things.things == NULL || things.names == NULL ){
+ if( things.things != NULL ) {
+ free( things.things );
+ }
+ if( things.names != NULL ) {
+ free( things.names );
+ }
+
if( free_on_err ) {
rmr_sym_free( nrt->hash );
free( nrt );
nrt = NULL;
+ } else {
+ nrt->error = 1;
}
return nrt;
}
+ memset( things.things, 0, sizeof( sizeof( void * ) * things.nalloc ) );
+ memset( things.names, 0, sizeof( char * ) * things.nalloc );
sst = srt->hash; // convenience pointers (src symtab)
nst = nrt->hash;
rmr_sym_foreach_class( sst, space, collect_things, &things ); // collect things from this space
+ if( things.error ) { // something happened and capture failed
+ rmr_vlog( RMR_VL_ERR, "unable to clone route table: unable to capture old contents\n" );
+ free( things.things );
+ free( things.names );
+ if( free_on_err ) {
+ rmr_sym_free( nrt->hash );
+ free( nrt );
+ nrt = NULL;
+ } else {
+ nrt->error = 1;
+ }
+ return nrt;
+ }
if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "clone space cloned %d things in space %d\n", things.nused, space );
for( i = 0; i < things.nused; i++ ) {
If the old table doesn't exist, then a new table is created and the new pointer is
set to reference it.
+
+ The ME namespace references endpoints which do not need to be released, therefore we
+ do not need to run that portion of the table to deref like we do for the RTEs.
*/
static route_table_t* prep_new_rt( uta_ctx_t* ctx, int all ) {
	route_table_t* rt;

	if( ctx == NULL ) {
		return NULL;
	}

	pthread_mutex_lock( ctx->rtgate );
	if( (rt = ctx->old_rtable) != NULL ) {			// reuse the drained old table if there is one; rt is nil otherwise
		ctx->old_rtable = NULL;

		while( rt->ref_count > 0 ) {				// wait for all who are using the old table to stop
			pthread_mutex_unlock( ctx->rtgate );	// release so rolls/derefs can progress while we wait
			usleep( 1000 );							// small sleep to yield the processor if that is needed
			pthread_mutex_lock( ctx->rtgate );
		}

		if( rt->hash != NULL ) {
			rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL );	// deref and drop if needed
			rmr_sym_clear( rt->hash );				// clear all entries from the old table
		}

		rt->error = 0;								// a table with errors can land here, so ensure clear before attempt to load
	}
	pthread_mutex_unlock( ctx->rtgate );

	rt = uta_rt_clone( ctx, ctx->rtable, rt, all );	// also sets the ephash pointer
	if( rt != NULL ) {								// very small chance for nil, but not zero, so test
		rt->ref_count = 0;							// take no chances; ensure it's 0!
	} else {
		rmr_vlog( RMR_VL_ERR, "route table clone returned nil; marking dummy table as error\n" );
		rt = uta_rt_init( ctx );					// must have something, but mark it in error state
		if( rt != NULL ) {							// bug fix: init can also fail; don't deref nil (caller must cope with nil)
			rt->error = 1;
		}
	}

	return rt;
}
}
pthread_mutex_unlock( ctx->rtgate );
}
+
#endif