X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Frmr%2Fcommon%2Fsrc%2Frt_generic_static.c;h=9a8d1fd98c14e8b51e5bce1f5d803ed915fa798e;hb=817516b6d7d3ffd874192c3a5342ead4c5da498b;hp=84f8445e44d11b9731bbce76bd5d8947f6fdfc70;hpb=05850e0815095c029ecff43ac8e0983f2fba4fb6;p=ric-plt%2Flib%2Frmr.git diff --git a/src/rmr/common/src/rt_generic_static.c b/src/rmr/common/src/rt_generic_static.c index 84f8445..9a8d1fd 100644 --- a/src/rmr/common/src/rt_generic_static.c +++ b/src/rmr/common/src/rt_generic_static.c @@ -1,4 +1,4 @@ -// :vi sw=4 ts=4 noet: + // :vi sw=4 ts=4 noet2 /* ================================================================================== Copyright (c) 2019-2020 Nokia @@ -57,6 +57,7 @@ a current symtab. */ typedef struct thing_list { + int error; // if a realloc failed, this will be set int nalloc; int nused; void** things; @@ -85,6 +86,7 @@ static void ep_stats( void* st, void* entry, char const* name, void* thing, void (*counter)++; } else { rmr_vlog( RMR_VL_DEBUG, "ep_stas: nil counter %p %p %p", st, entry, name ); // dummy refs + return; } rmr_vlog_force( RMR_VL_DEBUG, "rt endpoint: target=%s open=%d\n", ep->name, ep->open ); @@ -208,7 +210,19 @@ static void rt_epcounts( route_table_t* rt, char* id ) { return; } - rmr_sym_foreach_class( rt->hash, 1, ep_counts, id ); // run endpoints in the active table + rmr_sym_foreach_class( rt->ephash, 1, ep_counts, id ); // run endpoints in the active table +} + + +static void dump_tables( uta_ctx_t *ctx ) { + if( ctx->old_rtable != NULL ) { + rmr_vlog_force( RMR_VL_DEBUG, "old route table: (ref_count=%d)\n", ctx->old_rtable->ref_count ); + rt_stats( ctx->old_rtable ); + } else { + rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" ); + } + rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" ); + rt_stats( ctx->rtable ); } // ------------ route manager communication ------------------------------------------------- @@ -291,7 +305,7 @@ static void send_rt_ack( uta_ctx_t* ctx, rmr_mbuf_t* smsg, char* table_id, int s smsg->len = strlen( smsg->payload ) + 1; - rmr_vlog( RMR_VL_INFO, "rmr_rtc: sending table state: (%s) state=%d whid=%d\n", smsg->payload, state, ctx->rtg_whid ); + rmr_vlog( RMR_VL_INFO, "rmr_rtc: sending table state: (%s) state=%d whid=%d table=%s\n", smsg->payload, state, ctx->rtg_whid, table_id ); if( use_rts ) { smsg = rmr_rts_msg( ctx, smsg ); } else { @@ -309,6 +323,51 @@ static void send_rt_ack( uta_ctx_t* ctx, rmr_mbuf_t* smsg, char* table_id, int s } } +// ---- alarm generation -------------------------------------------------------------------------- + +/* + Given the user's context (not the thread private context) look to see if the application isn't + working fast enough and we're dropping messages. If the drop counter has changed since the last + peeked, and we have not raised an alarm, then we will alarm. If the counter hasn't changed, then we + set a timer and if the counter still hasn't changed when it expires we will clear the alarm. + + The private context is what we use to send so as not to interfere with the user flow. +*/ +static void alarm_if_drops( uta_ctx_t* uctx, uta_ctx_t* pctx ) { + static int alarm_raised = 0; + static int ok2clear = 0; // time that we can clear + static int lastd = 0; // the last counter value so we can compute delta + static int prob_id = 0; // problem ID we assume alarm manager handles dups between processes + + rmr_vlog( RMR_VL_DEBUG, "checking for drops... raised=%d 0k2clear=%d lastd=%d probid=%d\n", alarm_raised, ok2clear, lastd, prob_id ); + if( ! alarm_raised ) { + if( uctx->dcount - lastd == 0 ) { // not actively dropping, ok to do nothing + return; + } + + alarm_raised = 1; + uta_alarm( pctx, ALARM_DROPS | ALARM_RAISE, prob_id, "application running slow; RMR is dropping messages" ); + rmr_vlog( RMR_VL_INFO, "drop alarm raised" ); + } else { + if( uctx->dcount - lastd != 0 ) { // still dropping or dropping again; we've alarmed so nothing to do + lastd = uctx->dcount; + ok2clear = 0; // reset the timer + return; + } + + if( ok2clear == 0 ) { // first round where not dropping + ok2clear = time( NULL ) + 60; // we'll clear the alarm in 60s + } else { + if( time( NULL ) > ok2clear ) { // things still stable after expiry + rmr_vlog( RMR_VL_INFO, "drop alarm cleared\n" ); + alarm_raised = 0; + uta_alarm( pctx, ALARM_DROPS | ALARM_CLEAR, prob_id, "RMR message dropping has stopped" ); + prob_id++; + } + } + } +} + // ---- utility ----------------------------------------------------------------------------------- /* Little diddy to trim whitespace and trailing comments. Like shell, trailing comments @@ -390,6 +449,13 @@ static char* ensure_nlterm( char* buf ) { */ static void roll_tables( uta_ctx_t* ctx ) { + if( ctx->new_rtable == NULL || ctx->new_rtable->error ) { + rmr_vlog( RMR_VL_WARN, "new route table NOT rolled in: nil pointer or error indicated\n" ); + ctx->old_rtable = ctx->new_rtable; + ctx->new_rtable = NULL; + return; + } + if( ctx->rtable != NULL ) { // initially there isn't one, so must check! pthread_mutex_lock( ctx->rtgate ); // must hold lock to move to active ctx->old_rtable = ctx->rtable; // currently active becomes old and allowed to 'drain' @@ -403,6 +469,42 @@ static void roll_tables( uta_ctx_t* ctx ) { ctx->new_rtable = NULL; } +/* + Given a thing list, extend the array of pointers by 1/2 of the current + number allocated. If we cannot realloc an array, then we set the error + flag. Unlikely, but will prevent a crash, AND will prevent us from + trying to use the table since we couldn't capture everything. +*/ +static void extend_things( thing_list_t* tl ) { + int old_alloc; + void* old_things; + void* old_names; + + if( tl == NULL ) { + return; + } + + old_alloc = tl->nalloc; // capture current things + old_things = tl->things; + old_names = tl->names; + + tl->nalloc += tl->nalloc/2; // new allocation size + + tl->things = (void **) malloc( sizeof( void * ) * tl->nalloc ); // allocate larger arrays + tl->names = (const char **) malloc( sizeof( char * ) * tl->nalloc ); + + if( tl->things == NULL || tl->names == NULL ){ // if either failed, then set error + tl->error = 1; + return; + } + + memcpy( tl->things, old_things, sizeof( void * ) * old_alloc ); + memcpy( tl->names, old_names, sizeof( void * ) * old_alloc ); + + free( old_things ); + free( old_names ); +} + // ------------ entry update functions --------------------------------------------------------------- /* Given a message type create a route table entry and add to the hash keyed on the @@ -473,9 +575,10 @@ static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* r uint64_t key = 0; // the symtab key will be mtype or sub_id+mtype char* tokens[128]; char* gtokens[64]; - int i; int ngtoks; // number of tokens in the group list int grp; // index into group list + int cgidx; // contiguous group index (prevents the addition of a contiguous group without ep) + int has_ep = FALSE; // indicates if an endpoint was added in a given round robin group ts_field = clip( ts_field ); // ditch extra whitespace and trailing comments rr_field = clip( rr_field ); @@ -495,14 +598,21 @@ static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* r rte = uta_add_rte( ctx->new_rtable, key, ngtoks ); // get/create entry for this key rte->mtype = atoi( ts_field ); // capture mtype for debugging - for( grp = 0; grp < ngtoks; grp++ ) { - if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) { // remove any referneces to our ip addrs + for( grp = 0, cgidx = 0; grp < ngtoks; grp++ ) { + int i; // avoid sonar grumbling by defining this here + + if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) { // remove any references to our ip addrs for( i = 0; i < ntoks; i++ ) { if( strcmp( tokens[i], ctx->my_name ) != 0 ) { // don't add if it is us -- cannot send to ourself if( DEBUG > 1 || (vlevel > 1)) rmr_vlog_force( RMR_VL_DEBUG, "add endpoint ts=%s %s\n", ts_field, tokens[i] ); - uta_add_ep( ctx->new_rtable, rte, tokens[i], grp ); + uta_add_ep( ctx->new_rtable, rte, tokens[i], cgidx ); + has_ep = TRUE; } } + if( has_ep ) { + cgidx++; // only increment to the next contiguous group if the current one has at least one endpoint + has_ep = FALSE; + } } } } @@ -723,6 +833,71 @@ static void meid_parser( uta_ctx_t* ctx, uta_ctx_t* pctx, rmr_mbuf_t* mbuf, char } } +/* + This will close the current table snarf file (in *.inc) and open a new one. + The curent one is renamed. The final file name is determined by the setting of + RMR_SNARF_RT, and if not set then the variable RMR_SEED_RT is used and given + an additional extension of .snarf. If neither seed or snarf environment vars are + set then this does nothing. + + If this is called before the tmp snarf file is opened, then this just opens the file. +*/ +static void cycle_snarfed_rt( uta_ctx_t* ctx ) { + static int ok2warn = 0; // some warnings squelched on first call + + char* seed_fname; // the filename from env + char tfname[512]; // temp fname + char wfname[512]; // working buffer for filename + char* snarf_fname = NULL; // prevent overlay of the static table if snarf_rt not given + + if( ctx == NULL ) { + return; + } + + if( (snarf_fname = getenv( ENV_STASH_RT )) == NULL ) { // specific place to stash the rt not given + if( (seed_fname = getenv( ENV_SEED_RT )) != NULL ) { // no seed, we leave in the default file + memset( wfname, 0, sizeof( wfname ) ); + snprintf( wfname, sizeof( wfname ) - 1, "%s.stash", seed_fname ); + snarf_fname = wfname; + } + } + + if( snarf_fname == NULL ) { + rmr_vlog( RMR_VL_DEBUG, "cycle_snarf: no file to save in" ); + return; + } + + memset( tfname, 0, sizeof( tfname ) ); + snprintf( tfname, sizeof( tfname ) -1, "%s.inc", snarf_fname ); // must ensure tmp file is moveable + + if( ctx->snarf_rt_fd >= 0 ) { + char* msg= "### captured from route manager\n"; + write( ctx->snarf_rt_fd, msg, strlen( msg ) ); + if( close( ctx->snarf_rt_fd ) < 0 ) { + rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to close working rt snarf file: %s\n", strerror( errno ) ); + return; + } + + if( unlink( snarf_fname ) < 0 && ok2warn ) { // first time through this can fail and we ignore it + rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to unlink old static table: %s: %s\n", snarf_fname, strerror( errno ) ); + } + + if( rename( tfname, snarf_fname ) ) { + rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to move new route table to seed aname : %s -> %s: %s\n", tfname, snarf_fname, strerror( errno ) ); + } else { + rmr_vlog( RMR_VL_INFO, "latest route table info saved in: %s\n", snarf_fname ); + } + } + ok2warn = 1; + + ctx->snarf_rt_fd = open( tfname, O_WRONLY | O_CREAT | O_TRUNC, 0660 ); + if( ctx->snarf_rt_fd < 0 ) { + rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to open trt file: %s: %s\n", tfname, strerror( errno ) ); + } else { + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: rt snarf file opened: %s\n", tfname ); + } +} + /* Parse a single record recevied from the route table generator, or read from a static route table file. Start records cause a new table to @@ -784,6 +959,11 @@ static void parse_rt_rec( uta_ctx_t* ctx, uta_ctx_t* pctx, char* buf, int vleve return; } + if( ctx && ctx->snarf_rt_fd >= 0 ) { // if snarfing table as it arrives, write this puppy + write( ctx->snarf_rt_fd, buf, strlen( buf ) ); + write( ctx->snarf_rt_fd, "\n", 1 ); + } + while( *buf && isspace( *buf ) ) { // skip leading whitespace buf++; } @@ -816,6 +996,10 @@ static void parse_rt_rec( uta_ctx_t* ctx, uta_ctx_t* pctx, char* buf, int vleve case 'n': // newrt|{start|end} tokens[1] = clip( tokens[1] ); if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building + if( ctx && ctx->snarf_rt_fd >= 0 ) { + cycle_snarfed_rt( ctx ); // make it available and open a new one + } + if( ntoks >2 ) { if( ctx->new_rtable->updates != atoi( tokens[2] ) ) { // count they added didn't match what we received rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n", @@ -829,22 +1013,15 @@ static void parse_rt_rec( uta_ctx_t* ctx, uta_ctx_t* pctx, char* buf, int vleve } if( ctx->new_rtable ) { - if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog( RMR_VL_DEBUG, "end of route table noticed\n" ); roll_tables( ctx ); // roll active to old, and new to active with proper locking - - if( vlevel > 0 ) { - if( ctx->old_rtable != NULL ) { - rmr_vlog_force( RMR_VL_DEBUG, "old route table: (ref_count=%d)\n", ctx->old_rtable->ref_count ); - rt_stats( ctx->old_rtable ); - } else { - rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" ); - } - rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" ); - rt_stats( ctx->rtable ); + if( DEBUG > 1 || (vlevel > 1) ) { + rmr_vlog( RMR_VL_DEBUG, "end of route table noticed\n" ); + dump_tables( ctx ); } send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL ); ctx->rtable_ready = 1; // route based sends can now happen + ctx->flags |= CFL_FULLRT; // indicate we have seen a complete route table } else { if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of route table noticed, but one was not started!\n" ); ctx->new_rtable = NULL; @@ -906,16 +1083,25 @@ static void parse_rt_rec( uta_ctx_t* ctx, uta_ctx_t* pctx, char* buf, int vleve break; case 'u': // update current table, not a total replacement + if( ! (ctx->flags & CFL_FULLRT) ) { // we cannot update until we have a full table from route generator + rmr_vlog( RMR_VL_WARN, "route table update ignored: full table not previously recevied" ); + break; + } + tokens[1] = clip( tokens[1] ); if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building if( ctx->new_rtable == NULL ) { // update table not in progress break; } + if( ctx->snarf_rt_fd >= 0 ) { + cycle_snarfed_rt( ctx ); // make it available and open a new one + } if( ntoks >2 ) { if( ctx->new_rtable->updates != atoi( tokens[2] ) ) { // count they added didn't match what we received rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n", ctx->new_rtable->updates, tokens[2] ); + send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf ); uta_rt_drop( ctx->new_rtable ); ctx->new_rtable = NULL; break; @@ -924,18 +1110,13 @@ static void parse_rt_rec( uta_ctx_t* ctx, uta_ctx_t* pctx, char* buf, int vleve if( ctx->new_rtable ) { roll_tables( ctx ); // roll active to old, and new to active with proper locking - if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed\n" ); - - if( vlevel > 0 ) { - if( ctx->old_rtable != NULL ) { - rmr_vlog_force( RMR_VL_DEBUG, "old route table: (ref_count=%d)\n", ctx->old_rtable->ref_count ); - rt_stats( ctx->old_rtable ); - } else { - rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" ); - } - rmr_vlog_force( RMR_VL_DEBUG, "updated route table:\n" ); - rt_stats( ctx->rtable ); + if( DEBUG > 1 || (vlevel > 1) ) { + rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed\n" ); + dump_tables( ctx ); } + + send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL ); + ctx->rtable_ready = 1; // route based sends can now happen } else { if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed, but one was not started!\n" ); ctx->new_rtable = NULL; @@ -943,6 +1124,7 @@ static void parse_rt_rec( uta_ctx_t* ctx, uta_ctx_t* pctx, char* buf, int vleve } else { // start a new table. if( ctx->new_rtable != NULL ) { // one in progress? this forces it out if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" ); + send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" ); // nack the one that was pending as end never made it uta_rt_drop( ctx->new_rtable ); ctx->new_rtable = NULL; } @@ -985,8 +1167,13 @@ static void read_static_rt( uta_ctx_t* ctx, int vlevel ) { char* eor; // end of the record int rcount = 0; // record count for debug - if( (fname = getenv( ENV_SEED_RT )) == NULL ) { - return; + if( (fname = ctx->seed_rt_fname) == NULL ) { + if( (fname = getenv( ENV_SEED_RT )) == NULL ) { + return; + } + + ctx->seed_rt_fname = strdup( fname ); + fname = ctx->seed_rt_fname; } if( (fbuf = ensure_nlterm( uta_fib( fname ) ) ) == NULL ) { // read file into a single buffer (nil terminated string) @@ -1023,7 +1210,7 @@ static void read_static_rt( uta_ctx_t* ctx, int vlevel ) { } /* - Callback driven for each named thing in a symtab. We collect the pointers to those + Callback driven for each thing in a symtab. We collect the pointers to those things for later use (cloning). */ static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list ) { @@ -1038,8 +1225,12 @@ static void collect_things( void* st, void* entry, char const* name, void* thing return; } - tl->names[tl->nused] = name; // the name/key + tl->names[tl->nused] = name; // the name/key (space 0 uses int keys, so name can be nil and that is OK) tl->things[tl->nused++] = thing; // save a reference to the thing + + if( tl->nused >= tl->nalloc ) { + extend_things( tl ); // not enough allocated + } } /* @@ -1077,7 +1268,9 @@ static void del_rte( void* st, void* entry, char const* name, void* thing, void* for( i = 0; i < rte->nrrgroups; i++ ) { if( rte->rrgroups[i] ) { free( rte->rrgroups[i]->epts ); // ditch list of endpoint pointers (end points are reused; don't trash them) + free( rte->rrgroups[i] ); // but must free the rrg itself too } + } free( rte->rrgroups ); @@ -1207,24 +1400,47 @@ static route_table_t* rt_clone_space( uta_ctx_t* ctx, route_table_t* srt, route_ things.nalloc = 2048; things.nused = 0; + things.error = 0; things.things = (void **) malloc( sizeof( void * ) * things.nalloc ); - memset( things.things, 0, sizeof( sizeof( void * ) * things.nalloc ) ); things.names = (const char **) malloc( sizeof( char * ) * things.nalloc ); - memset( things.names, 0, sizeof( char * ) * things.nalloc ); - if( things.things == NULL ) { + if( things.things == NULL || things.names == NULL ){ + if( things.things != NULL ) { + free( things.things ); + } + if( things.names != NULL ) { + free( things.names ); + } + if( free_on_err ) { rmr_sym_free( nrt->hash ); free( nrt ); nrt = NULL; + } else { + nrt->error = 1; } return nrt; } + memset( things.things, 0, sizeof( sizeof( void * ) * things.nalloc ) ); + memset( things.names, 0, sizeof( char * ) * things.nalloc ); sst = srt->hash; // convenience pointers (src symtab) nst = nrt->hash; rmr_sym_foreach_class( sst, space, collect_things, &things ); // collect things from this space + if( things.error ) { // something happened and capture failed + rmr_vlog( RMR_VL_ERR, "unable to clone route table: unable to capture old contents\n" ); + free( things.things ); + free( things.names ); + if( free_on_err ) { + rmr_sym_free( nrt->hash ); + free( nrt ); + nrt = NULL; + } else { + nrt->error = 1; + } + return nrt; + } if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "clone space cloned %d things in space %d\n", things.nused, space ); for( i = 0; i < things.nused; i++ ) { @@ -1280,9 +1496,13 @@ static route_table_t* uta_rt_clone( uta_ctx_t* ctx, route_table_t* srt, route_ta If the old table doesn't exist, then a new table is created and the new pointer is set to reference it. + + The ME namespace references endpoints which do not need to be released, therefore we + do not need to run that portion of the table to deref like we do for the RTEs. */ static route_table_t* prep_new_rt( uta_ctx_t* ctx, int all ) { int counter = 0; + int ref_count; route_table_t* rt; if( ctx == NULL ) { @@ -1291,22 +1511,42 @@ static route_table_t* prep_new_rt( uta_ctx_t* ctx, int all ) { if( (rt = ctx->old_rtable) != NULL ) { ctx->old_rtable = NULL; - while( rt->ref_count > 0 ) { // wait for all who are using to stop + + pthread_mutex_lock( ctx->rtgate ); + ref_count = rt->ref_count; + pthread_mutex_unlock( ctx->rtgate ); + + while( ref_count > 0 ) { // wait for all who are using to stop if( counter++ > 1000 ) { rmr_vlog( RMR_VL_WARN, "rt_prep_newrt: internal mishap, ref count on table seems wedged" ); break; } usleep( 1000 ); // small sleep to yield the processer if that is needed + + pthread_mutex_lock( ctx->rtgate ); + ref_count = rt->ref_count; + pthread_mutex_unlock( ctx->rtgate ); } - rmr_sym_clear( rt ); // clear all entries from the old table + if( rt->hash != NULL ) { + rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL ); // deref and drop if needed + rmr_sym_clear( rt->hash ); // clear all entries from the old table + } + + rt->error = 0; // table with errors can be here, so endure clear before attempt to load } else { rt = NULL; } rt = uta_rt_clone( ctx, ctx->rtable, rt, all ); // also sets the ephash pointer - rt->ref_count = 0; // take no chances; ensure it's 0! + if( rt != NULL ) { // very small chance for nil, but not zero, so test + rt->ref_count = 0; // take no chances; ensure it's 0! + } else { + rmr_vlog( RMR_VL_ERR, "route table clone returned nil; marking dummy table as error\n" ); + rt = uta_rt_init( ctx ); // must hav something, but mark it in error state + rt->error = 1; + } return rt; }