X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Frmr%2Fcommon%2Fsrc%2Frt_generic_static.c;h=0de69c83a202a1132a2b06d98c8fe92395e4ae83;hb=a68562a02028434a87149d5996b291e83d33be51;hp=253e7ab1c9ccda33f025bb9948b2cb4199f8f097;hpb=b7a31bd94da349e9c49f6dd7a5bcded74877a8ae;p=ric-plt%2Flib%2Frmr.git diff --git a/src/rmr/common/src/rt_generic_static.c b/src/rmr/common/src/rt_generic_static.c index 253e7ab..0de69c8 100644 --- a/src/rmr/common/src/rt_generic_static.c +++ b/src/rmr/common/src/rt_generic_static.c @@ -1,10 +1,10 @@ -// :vi sw=4 ts=4 noet: + // :vi sw=4 ts=4 noet2 /* ================================================================================== - Copyright (c) 2019 Nokia - Copyright (c) 2018-2019 AT&T Intellectual Property. + Copyright (c) 2019-2020 Nokia + Copyright (c) 2018-2020 AT&T Intellectual Property. - Licensed under the Apache License, Version 2.0 (the "License"); + Licensed under the Apache License, Version 2.0 (the "License") ; you may not use this file except in compliance with the License. You may obtain a copy of the License at @@ -45,22 +45,36 @@ #include #include #include +#include +#include +#include +#include // needed for route manager messages + +#define ALL 1 +#define SOME 0 /* Passed to a symtab foreach callback to construct a list of pointers from a current symtab. */ typedef struct thing_list { + int error; // if a realloc failed, this will be set int nalloc; int nused; void** things; + const char** names; } thing_list_t; // ---- debugging/testing ------------------------------------------------------------------------- /* - Dump stats for an endpoint in the RT. + Dump some stats for an endpoint in the RT. This is generally called to + verify endpoints after a table load/change. + + This is called by the for-each mechanism of the symtab and the prototype is + fixe; we don't really use some of the parms, but have dummy references to + keep sonar from complaining. */ static void ep_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) { int* counter; @@ -72,9 +86,65 @@ static void ep_stats( void* st, void* entry, char const* name, void* thing, void if( (counter = (int *) vcounter) != NULL ) { (*counter)++; + } else { + rmr_vlog( RMR_VL_DEBUG, "ep_stas: nil counter %p %p %p", st, entry, name ); // dummy refs + return; + } + + rmr_vlog_force( RMR_VL_DEBUG, "rt endpoint: target=%s open=%d\n", ep->name, ep->open ); +} + +/* + Called to count meid entries in the table. The meid points to an 'owning' endpoint + so we can list what we find + + See note in ep_stats about dummy refs. +*/ +static void meid_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) { + int* counter; + endpoint_t* ep; + + if( (ep = (endpoint_t *) thing) == NULL ) { + return; + } + + if( (counter = (int *) vcounter) != NULL ) { + (*counter)++; + } else { + rmr_vlog( RMR_VL_DEBUG, "meid_stas: nil counter %p %p %p", st, entry, name ); // dummy refs + } + + rmr_vlog_force( RMR_VL_DEBUG, "meid=%s owner=%s open=%d\n", name, ep->name, ep->open ); +} + +/* + Dump counts for an endpoint in the RT. The vid parm is assumed to point to + the 'source' information and is added to each message. + + See note above about dummy references. 
+*/ +static void ep_counts( void* st, void* entry, char const* name, void* thing, void* vid ) { + endpoint_t* ep; + char* id; + + if( (ep = (endpoint_t *) thing) == NULL ) { + rmr_vlog( RMR_VL_DEBUG, "ep_counts: nil thing %p %p %p", st, entry, name ); // dummy refs + return; } - fprintf( stderr, "[DBUG] endpoint: %s open=%d\n", ep->name, ep->open ); + if( (id = (char *) vid) == NULL ) { + id = "missing"; + } + + rmr_vlog_force( RMR_VL_INFO, "sends: ts=%lld src=%s target=%s open=%d succ=%lld fail=%lld (hard=%lld soft=%lld)\n", + (long long) time( NULL ), + id, + ep->name, + ep->open, + ep->scounts[EPSC_GOOD], + ep->scounts[EPSC_FAIL] + ep->scounts[EPSC_TRANS], + ep->scounts[EPSC_FAIL], + ep->scounts[EPSC_TRANS] ); } /* @@ -82,11 +152,12 @@ static void ep_stats( void* st, void* entry, char const* name, void* thing, void */ static void rte_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) { int* counter; - rtable_ent_t* rte; // thing is really an rte + rtable_ent_t const* rte; // thing is really an rte int mtype; int sid; if( (rte = (rtable_ent_t *) thing) == NULL ) { + rmr_vlog( RMR_VL_DEBUG, "rte_stats: nil thing %p %p %p", st, entry, name ); // dummy refs return; } @@ -97,7 +168,7 @@ static void rte_stats( void* st, void* entry, char const* name, void* thing, voi mtype = rte->key & 0xffff; sid = (int) (rte->key >> 32); - fprintf( stderr, "[DBUG] rte: key=%016lx mtype=%4d sid=%4d nrrg=%2d refs=%d\n", rte->key, mtype, sid, rte->nrrgroups, rte->refs ); + rmr_vlog_force( RMR_VL_DEBUG, "rte: key=%016lx mtype=%4d sid=%4d nrrg=%2d refs=%d\n", rte->key, mtype, sid, rte->nrrgroups, rte->refs ); } /* @@ -107,31 +178,211 @@ static void rt_stats( route_table_t* rt ) { int* counter; if( rt == NULL ) { - fprintf( stderr, "[DBUG] rtstats: nil table\n" ); + rmr_vlog_force( RMR_VL_DEBUG, "rtstats: nil table\n" ); return; } counter = (int *) malloc( sizeof( int ) ); *counter = 0; - fprintf( stderr, "[DBUG] rtstats:\n" ); - rmr_sym_foreach_class( rt->hash, 1, ep_stats, counter ); // run endpoints in the active table - fprintf( stderr, "[DBUG] %d endpoints\n", *counter ); + rmr_vlog_force( RMR_VL_DEBUG, "route table stats:\n" ); + rmr_vlog_force( RMR_VL_DEBUG, "route table endpoints:\n" ); + rmr_sym_foreach_class( rt->ephash, RT_NAME_SPACE, ep_stats, counter ); // run endpoints (names) in the active table + rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d known endpoints\n", *counter ); + + rmr_vlog_force( RMR_VL_DEBUG, "route table entries:\n" ); + *counter = 0; + rmr_sym_foreach_class( rt->hash, RT_MT_SPACE, rte_stats, counter ); // run message type entries + rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d mt entries in table\n", *counter ); + rmr_vlog_force( RMR_VL_DEBUG, "route table meid map:\n" ); *counter = 0; - rmr_sym_foreach_class( rt->hash, 0, rte_stats, counter ); // run entries - fprintf( stderr, "[DBUG] %d entries\n", *counter ); + rmr_sym_foreach_class( rt->hash, RT_ME_SPACE, meid_stats, counter ); // run meid space + rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d meids in map\n", *counter ); free( counter ); } +/* + Given a route table, cause endpoint counters to be written to stderr. The id + parm is written as the "source" in the output. 
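The entry dump above recovers the message type and subscription id from the 64-bit route key. A minimal standalone sketch of that layout, assuming the subscription id sits in the upper 32 bits and the message type in the lower half (as the decode in rte_stats and the build_rt_key() helper later in this file suggest); the values used are invented:

	#include <stdio.h>
	#include <stdint.h>

	static uint64_t pack_rt_key( int32_t sub_id, int32_t mtype ) {
		return ((uint64_t) sub_id << 32) | (uint32_t) mtype;	// assumed layout: subid high, mtype low
	}

	int main( void ) {
		uint64_t key = pack_rt_key( 2, 1060 );				// hypothetical subscription id and message type
		int mtype = (int) (key & 0xffffffff);				// pull the halves back out, as rte_stats does
		int sid = (int) (key >> 32);

		printf( "key=%016llx mtype=%d subid=%d\n", (unsigned long long) key, mtype, sid );
		return 0;
	}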
+*/ +static void rt_epcounts( route_table_t* rt, char* id ) { + if( rt == NULL ) { + rmr_vlog_force( RMR_VL_INFO, "endpoint: no counts: empty table\n" ); + return; + } + + rmr_sym_foreach_class( rt->ephash, 1, ep_counts, id ); // run endpoints in the active table +} + + +static void dump_tables( uta_ctx_t *ctx ) { + if( ctx->old_rtable != NULL ) { + rmr_vlog_force( RMR_VL_DEBUG, "old route table: (ref_count=%d)\n", ctx->old_rtable->ref_count ); + rt_stats( ctx->old_rtable ); + } else { + rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" ); + } + rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" ); + rt_stats( ctx->rtable ); +} + +// ------------ route manager communication ------------------------------------------------- +/* + Send a request for a table update to the route manager. Updates come in + async, so send and go. + + pctx is the private context for the thread; ctx is the application context + that we need to be able to send the application ID in case rt mgr needs to + use it to idenfity us. + + Returns 0 if we were not able to send a request. +*/ +static int send_update_req( uta_ctx_t* pctx, uta_ctx_t* ctx ) { + rmr_mbuf_t* smsg; + int state = 0; + + if( ctx->rtg_whid < 0 ) { + return state; + } + + smsg = rmr_alloc_msg( pctx, 1024 ); + if( smsg != NULL ) { + smsg->mtype = RMRRM_REQ_TABLE; + smsg->sub_id = 0; + snprintf( smsg->payload, 1024, "%s ts=%ld\n", ctx->my_name, time( NULL ) ); + rmr_vlog( RMR_VL_INFO, "rmr_rtc: requesting table: (%s) whid=%d\n", smsg->payload, ctx->rtg_whid ); + smsg->len = strlen( smsg->payload ) + 1; + + smsg = rmr_wh_send_msg( pctx, ctx->rtg_whid, smsg ); + if( (state = smsg->state) != RMR_OK ) { + rmr_vlog( RMR_VL_INFO, "rmr_rtc: send failed: %d whid=%d\n", smsg->state, ctx->rtg_whid ); + rmr_wh_close( ctx, ctx->rtg_whid ); // send failed, assume connection lost + ctx->rtg_whid = -1; + } + + rmr_free_msg( smsg ); + } + + return state; +} + +/* + Send an ack to the route table manager for a table ID that we are + processing. State is 1 for OK, and 0 for failed. Reason might + be populated if we know why there was a failure. + + Context should be the PRIVATE context that we use for messages + to route manger and NOT the user's context. + + If a message buffere is passed we use that and use return to sender + assuming that this might be a response to a call and that is needed + to send back to the proper calling thread. If msg is nil, we allocate + and use it. +*/ +static void send_rt_ack( uta_ctx_t* ctx, rmr_mbuf_t* smsg, char* table_id, int state, char* reason ) { + int use_rts = 1; + int payload_size = 1024; + + if( ctx == NULL || ctx->rtg_whid < 0 ) { + return; + } + + if( ctx->flags & CFL_NO_RTACK ) { // don't ack if reading from file etc + return; + } + + if( smsg != NULL ) { + smsg = rmr_realloc_payload( smsg, payload_size, FALSE, FALSE ); // ensure it's large enough to send a response + } else { + use_rts = 0; + smsg = rmr_alloc_msg( ctx, payload_size ); + } + + if( smsg != NULL ) { + smsg->mtype = RMRRM_TABLE_STATE; + smsg->sub_id = -1; + snprintf( smsg->payload, payload_size-1, "%s %s %s\n", state == RMR_OK ? "OK" : "ERR", + table_id == NULL ? "" : table_id, reason == NULL ? 
"" : reason ); + + smsg->len = strlen( smsg->payload ) + 1; + + rmr_vlog( RMR_VL_INFO, "rmr_rtc: sending table state: (%s) state=%d whid=%d table=%s\n", smsg->payload, state, ctx->rtg_whid, table_id ); + if( use_rts ) { + smsg = rmr_rts_msg( ctx, smsg ); + } else { + smsg = rmr_wh_send_msg( ctx, ctx->rtg_whid, smsg ); + } + if( (state = smsg->state) != RMR_OK ) { + rmr_vlog( RMR_VL_WARN, "unable to send table state: %d\n", smsg->state ); + rmr_wh_close( ctx, ctx->rtg_whid ); // send failed, assume connection lost + ctx->rtg_whid = -1; + } + + if( ! use_rts ) { + rmr_free_msg( smsg ); // if not our message we must free the leftovers + } + } +} + +// ---- alarm generation -------------------------------------------------------------------------- + +/* + Given the user's context (not the thread private context) look to see if the application isn't + working fast enough and we're dropping messages. If the drop counter has changed since the last + peeked, and we have not raised an alarm, then we will alarm. If the counter hasn't changed, then we + set a timer and if the counter still hasn't changed when it expires we will clear the alarm. + + The private context is what we use to send so as not to interfere with the user flow. +*/ +static void alarm_if_drops( uta_ctx_t* uctx, uta_ctx_t* pctx ) { + static int alarm_raised = 0; + static int ok2clear = 0; // time that we can clear + static int lastd = 0; // the last counter value so we can compute delta + static int prob_id = 0; // problem ID we assume alarm manager handles dups between processes + + rmr_vlog( RMR_VL_DEBUG, "checking for drops... raised=%d 0k2clear=%d lastd=%d probid=%d\n", alarm_raised, ok2clear, lastd, prob_id ); + if( ! alarm_raised ) { + if( uctx->dcount - lastd == 0 ) { // not actively dropping, ok to do nothing + return; + } + + alarm_raised = 1; + uta_alarm( pctx, ALARM_DROPS | ALARM_RAISE, prob_id, "application running slow; RMR is dropping messages" ); + rmr_vlog( RMR_VL_INFO, "drop alarm raised" ); + } else { + if( uctx->dcount - lastd != 0 ) { // still dropping or dropping again; we've alarmed so nothing to do + lastd = uctx->dcount; + ok2clear = 0; // reset the timer + return; + } + + if( ok2clear == 0 ) { // first round where not dropping + ok2clear = time( NULL ) + 60; // we'll clear the alarm in 60s + } else { + if( time( NULL ) > ok2clear ) { // things still stable after expiry + rmr_vlog( RMR_VL_INFO, "drop alarm cleared\n" ); + alarm_raised = 0; + uta_alarm( pctx, ALARM_DROPS | ALARM_CLEAR, prob_id, "RMR message dropping has stopped" ); + prob_id++; + } + } + } +} + +// ---- utility ----------------------------------------------------------------------------------- + +int isspace_with_fence(int c) { + _mm_lfence(); + return isspace( c ); +} -// ------------------------------------------------------------------------------------------------ /* Little diddy to trim whitespace and trailing comments. Like shell, trailing comments must be at the start of a word (i.e. must be immediatly preceeded by whitespace). 
*/ static char* clip( char* buf ) { - char* tok; + char* tok=NULL; while( *buf && isspace( *buf ) ) { // skip leading whitespace buf++; @@ -147,13 +398,122 @@ static char* clip( char* buf ) { } } - for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- ); // trim trailing spaces too + for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace_with_fence( *tok ); tok-- ); // trim trailing spaces too *(tok+1) = 0; return buf; } +/* + This accepts a pointer to a nil terminated string, and ensures that there is a + newline as the last character. If there is not, a new buffer is allocated and + the newline is added. If a new buffer is allocated, the buffer passed in is + freed. The function returns a pointer which the caller should use, and must + free. In the event of an error, a nil pointer is returned. +*/ +static char* ensure_nlterm( char* buf ) { + char* nb = NULL; + int len = 0; + + if( buf != NULL ) { + len = strlen( buf ); + } + + nb = buf; // default to returning original as is + switch( len ) { + case 0: + nb = strdup( "\n" ); + break; + + case 1: + if( *buf != '\n' ) { // not a newline; realloc + rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" ); + nb = strdup( " \n" ); + *nb = *buf; + free( buf ); + } + break; + + default: + if( buf[len-1] != '\n' ) { // not newline terminated, realloc + rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" ); + if( (nb = (char *) malloc( sizeof( char ) * (len + 2) )) != NULL ) { + memcpy( nb, buf, len ); + *(nb+len) = '\n'; // insert \n and nil into the two extra bytes we allocated + *(nb+len+1) = 0; + free( buf ); + } + } + break; + } + + return nb; +} + +/* + Roll the new table into the active and the active into the old table. We + must have the lock on the active table to do this. It's possible that there + is no active table (first load), so we have to account for that (no locking). +*/ +static void roll_tables( uta_ctx_t* ctx ) { + + if( ctx->new_rtable == NULL || ctx->new_rtable->error ) { + rmr_vlog( RMR_VL_WARN, "new route table NOT rolled in: nil pointer or error indicated\n" ); + ctx->old_rtable = ctx->new_rtable; + ctx->new_rtable = NULL; + return; + } + + if( ctx->rtable != NULL ) { // initially there isn't one, so must check! + pthread_mutex_lock( ctx->rtgate ); // must hold lock to move to active + ctx->old_rtable = ctx->rtable; // currently active becomes old and allowed to 'drain' + ctx->rtable = ctx->new_rtable; // one we've been adding to becomes active + pthread_mutex_unlock( ctx->rtgate ); + } else { + ctx->old_rtable = NULL; // ensure there isn't an old reference + ctx->rtable = ctx->new_rtable; // make new the active one + } + + ctx->new_rtable = NULL; +} +/* + Given a thing list, extend the array of pointers by 1/2 of the current + number allocated. If we cannot realloc an array, then we set the error + flag. Unlikely, but will prevent a crash, AND will prevent us from + trying to use the table since we couldn't capture everything. 
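Because ensure_nlterm() may free the buffer it is handed and return a replacement, the pointer that comes back is the only one a caller may keep using or free. A small usage sketch, assuming it lives in this same file (the record text and the function name are made up):

	static void nlterm_example( void ) {
		char* buf;

		buf = ensure_nlterm( strdup( "mse|1060|-1|localhost:4560" ) );	// heap copy missing its final newline
		if( buf == NULL ) {
			return;						// documented error case
		}

		// the strdup'd pointer must not be used or freed here; ensure_nlterm may have
		// freed it already and returned a larger, newline terminated replacement
		fputs( buf, stdout );

		free( buf );					// caller owns whatever pointer came back
	}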
+*/ +static void extend_things( thing_list_t* tl ) { + int old_alloc; + void* old_things; + void* old_names; + + if( tl == NULL ) { + return; + } + + old_alloc = tl->nalloc; // capture current things + old_things = tl->things; + old_names = tl->names; + + tl->nalloc += tl->nalloc/2; // new allocation size + + tl->things = (void **) malloc( sizeof( void * ) * tl->nalloc ); // allocate larger arrays + tl->names = (const char **) malloc( sizeof( char * ) * tl->nalloc ); + + if( tl->things == NULL || tl->names == NULL ){ // if either failed, then set error + tl->error = 1; + return; + } + + memcpy( tl->things, old_things, sizeof( void * ) * old_alloc ); + memcpy( tl->names, old_names, sizeof( void * ) * old_alloc ); + + free( old_things ); + free( old_names ); +} + +// ------------ entry update functions --------------------------------------------------------------- /* Given a message type create a route table entry and add to the hash keyed on the message type. Once in the hash, endpoints can be added with uta_add_ep. Size @@ -168,23 +528,27 @@ static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups } if( (rte = (rtable_ent_t *) malloc( sizeof( *rte ) )) == NULL ) { - fprintf( stderr, "rmr_add_rte: malloc failed for entry\n" ); + rmr_vlog( RMR_VL_ERR, "rmr_add_rte: malloc failed for entry\n" ); return NULL; } memset( rte, 0, sizeof( *rte ) ); rte->refs = 1; rte->key = key; - if( nrrgroups <= 0 ) { + if( nrrgroups < 0 ) { // zero is allowed as %meid entries have no groups nrrgroups = 10; } - if( (rte->rrgroups = (rrgroup_t **) malloc( sizeof( rrgroup_t * ) * nrrgroups )) == NULL ) { - fprintf( stderr, "rmr_add_rte: malloc failed for rrgroup array\n" ); - free( rte ); - return NULL; + if( nrrgroups ) { + if( (rte->rrgroups = (rrgroup_t **) malloc( sizeof( rrgroup_t * ) * nrrgroups )) == NULL ) { + free( rte ); + return NULL; + } + memset( rte->rrgroups, 0, sizeof( rrgroup_t *) * nrrgroups ); + } else { + rte->rrgroups = NULL; } - memset( rte->rrgroups, 0, sizeof( rrgroup_t *) * nrrgroups ); + rte->nrrgroups = nrrgroups; if( (old_rte = rmr_sym_pull( rt->hash, key )) != NULL ) { @@ -193,12 +557,12 @@ static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups rmr_sym_map( rt->hash, key, rte ); // add to hash using numeric mtype as key - if( DEBUG ) fprintf( stderr, "[DBUG] route table entry created: k=%lx groups=%d\n", key, nrrgroups ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "route table entry created: k=%llx groups=%d\n", (long long) key, nrrgroups ); return rte; } /* - This accepts partially parsed information from a record sent by route manager or read from + This accepts partially parsed information from an rte or mse record sent by route manager or read from a file such that: ts_field is the msg-type,sender field subid is the integer subscription id @@ -214,42 +578,57 @@ static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups */ static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* rr_field, int vlevel ) { rtable_ent_t* rte; // route table entry added - char* tok; + char const* tok; int ntoks; uint64_t key = 0; // the symtab key will be mtype or sub_id+mtype - char* tokens[128]; - char* gtokens[64]; - int i; + char* tokens[128]; + char* gtokens[64]; int ngtoks; // number of tokens in the group list int grp; // index into group list + int cgidx; // contiguous group index (prevents the addition of a contiguous group without ep) + int has_ep = FALSE; // indicates if an endpoint was added in a given 
round robin group ts_field = clip( ts_field ); // ditch extra whitespace and trailing comments rr_field = clip( rr_field ); - if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all) + if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all) (uta_has_str( ts_field, ctx->my_name, ',', 127) >= 0) || // our name is in the list has_myip( ts_field, ctx->ip_list, ',', 127 ) ) { // the list has one of our IP addresses - key = build_rt_key( subid, atoi( ts_field ) ); - - if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] create rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key ); + key = build_rt_key( subid, atoi( ts_field ) ); - if( (ngtoks = uta_tokenise( rr_field, gtokens, 64, ';' )) > 0 ) { // split round robin groups - rte = uta_add_rte( ctx->new_rtable, key, ngtoks ); // get/create entry for this key + if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "create rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key ); - for( grp = 0; grp < ngtoks; grp++ ) { - if( (ntoks = uta_tokenise( gtokens[grp], tokens, 64, ',' )) > 0 ) { - for( i = 0; i < ntoks; i++ ) { - if( DEBUG > 1 || (vlevel > 1)) fprintf( stderr, "[DBUG] add endpoint %s\n", ts_field ); - uta_add_ep( ctx->new_rtable, rte, tokens[i], grp ); + if( (ngtoks = uta_tokenise( rr_field, gtokens, 64, ';' )) > 0 ) { // split round robin groups + if( strcmp( gtokens[0], "%meid" ) == 0 ) { + ngtoks = 0; // special indicator that uses meid to find endpoint, no rrobin + } + rte = uta_add_rte( ctx->new_rtable, key, ngtoks ); // get/create entry for this key + rte->mtype = atoi( ts_field ); // capture mtype for debugging + + for( grp = 0, cgidx = 0; grp < ngtoks; grp++ ) { + int i; // avoid sonar grumbling by defining this here + + if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) { // remove any references to our ip addrs + for( i = 0; i < ntoks; i++ ) { + if( strcmp( tokens[i], ctx->my_name ) != 0 ) { // don't add if it is us -- cannot send to ourself + if( DEBUG > 1 || (vlevel > 1)) rmr_vlog_force( RMR_VL_DEBUG, "add endpoint ts=%s %s\n", ts_field, tokens[i] ); + uta_add_ep( ctx->new_rtable, rte, tokens[i], cgidx ); + has_ep = TRUE; } } + if( has_ep ) { + cgidx++; // only increment to the next contiguous group if the current one has at least one endpoint + has_ep = FALSE; + } } } - } else { - if( DEBUG || (vlevel > 2) ) - fprintf( stderr, "entry not included, sender not matched: %s\n", tokens[1] ); } + } else { + if( DEBUG || (vlevel > 2) ) { + rmr_vlog_force( RMR_VL_DEBUG, "build entry: ts_entry not of form msg-type,sender: %s\n", ts_field ); + } + } } /* @@ -261,10 +640,10 @@ static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* r */ static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vlevel ) { rtable_ent_t* rte; // route table entry to be 'deleted' - char* tok; + char const* tok; int ntoks; uint64_t key = 0; // the symtab key will be mtype or sub_id+mtype - char* tokens[128]; + char* tokens[128]; if( ctx == NULL || ctx->new_rtable == NULL || ctx->new_rtable->hash == NULL ) { return; @@ -272,7 +651,7 @@ static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vle ts_field = clip( ts_field ); // ditch extra whitespace and trailing comments - if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all) + if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all) (uta_has_str( 
ts_field, ctx->my_name, ',', 127) >= 0) || // our name is in the list has_myip( ts_field, ctx->ip_list, ',', 127 ) ) { // the list has one of our IP addresses @@ -280,17 +659,250 @@ static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vle rte = rmr_sym_pull( ctx->new_rtable->hash, key ); // get it if( rte != NULL ) { if( DEBUG || (vlevel > 1) ) { - fprintf( stderr, "[DBUG] delete rte for mtype=%s subid=%d key=%08lx\n", ts_field, subid, key ); + rmr_vlog_force( RMR_VL_DEBUG, "delete rte for mtype=%s subid=%d key=%08lx\n", ts_field, subid, key ); } rmr_sym_ndel( ctx->new_rtable->hash, key ); // clear from the new table del_rte( NULL, NULL, NULL, rte, NULL ); // clean up the memory: reduce ref and free if ref == 0 } else { if( DEBUG || (vlevel > 1) ) { - fprintf( stderr, "[DBUG] delete could not find rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key ); + rmr_vlog_force( RMR_VL_DEBUG, "delete could not find rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key ); + } + } + } else { + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "delete rte skipped: %s\n", ts_field ); + } +} + +// -------------------------- parse functions -------------------------------------------------- + +/* + Given the tokens from an mme_ar (meid add/replace) entry, add the entries. + the 'owner' which should be the dns name or IP address of an enpoint + the meid_list is a space separated list of me IDs + + This function assumes the caller has vetted the pointers as needed. + + For each meid in the list, an entry is pushed into the hash which references the owner + endpoint such that when the meid is used to route a message it references the endpoint + to send messages to. +*/ +static void parse_meid_ar( route_table_t* rtab, char* owner, char* meid_list, int vlevel ) { + char const* tok; + int ntoks; + char* tokens[128]; + int i; + int state; + endpoint_t* ep; // endpoint struct for the owner + + owner = clip( owner ); // ditch extra whitespace and trailing comments + meid_list = clip( meid_list ); + + ntoks = uta_tokenise( meid_list, tokens, 128, ' ' ); + for( i = 0; i < ntoks; i++ ) { + if( (ep = rt_ensure_ep( rtab, owner )) != NULL ) { + state = rmr_sym_put( rtab->hash, tokens[i], RT_ME_SPACE, ep ); // slam this one in if new; replace if there + if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "parse_meid_ar: add/replace meid: %s owned by: %s state=%d\n", tokens[i], owner, state ); + } else { + rmr_vlog( RMR_VL_WARN, "rmr parse_meid_ar: unable to create an endpoint for owner: %s", owner ); + } + } +} + +/* + Given the tokens from an mme_del, delete the listed meid entries from the new + table. The list is a space separated list of meids. + + The meids in the hash reference endpoints which are never deleted and so + the only thing that we need to do here is to remove the meid from the hash. + + This function assumes the caller has vetted the pointers as needed. +*/ +static void parse_meid_del( route_table_t* rtab, char* meid_list, int vlevel ) { + char const* tok; + int ntoks; + char* tokens[128]; + int i; + + if( rtab->hash == NULL ) { + return; + } + + meid_list = clip( meid_list ); + + ntoks = uta_tokenise( meid_list, tokens, 128, ' ' ); + for( i = 0; i < ntoks; i++ ) { + rmr_sym_del( rtab->hash, tokens[i], RT_ME_SPACE ); // and it only took my little finger to blow it away! + if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "parse_meid_del: meid deleted: %s\n", tokens[i] ); + } +} + +/* + Parse a partially parsed meid record. 
Tokens[0] should be one of: + meid_map, mme_ar, mme_del. + + pctx is the private context needed to return an ack/nack using the provided + message buffer with the route managers address info. +*/ +static void meid_parser( uta_ctx_t* ctx, uta_ctx_t* pctx, rmr_mbuf_t* mbuf, char** tokens, int ntoks, int vlevel ) { + char wbuf[1024]; + + if( tokens == NULL || ntoks < 1 ) { + return; // silent but should never happen + } + + if( ntoks < 2 ) { // must have at least two for any valid request record + rmr_vlog( RMR_VL_ERR, "meid_parse: not enough tokens on %s record\n", tokens[0] ); + return; + } + + if( strcmp( tokens[0], "meid_map" ) == 0 ) { // start or end of the meid map update + tokens[1] = clip( tokens[1] ); + if( *(tokens[1]) == 's' ) { + if( ctx->new_rtable != NULL ) { // one in progress? this forces it out + if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "meid map start: dropping incomplete table\n" ); + uta_rt_drop( ctx->new_rtable ); + ctx->new_rtable = NULL; + send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" ); // nack the one that was pending as and never made it + } + + if( ctx->table_id != NULL ) { + free( ctx->table_id ); } + if( ntoks > 2 ) { + ctx->table_id = strdup( clip( tokens[2] ) ); + } else { + ctx->table_id = NULL; + } + + ctx->new_rtable = prep_new_rt( ctx, ALL ); // start with a clone of everything (mtype, endpoint refs and meid) + ctx->new_rtable->mupdates = 0; + + if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "meid_parse: meid map start found\n" ); + } else { + if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building + if( ntoks > 2 ) { // meid_map | end | |??? given + if( ctx->new_rtable->mupdates != atoi( tokens[2] ) ) { // count they added didn't match what we received + rmr_vlog( RMR_VL_ERR, "meid_parse: meid map update had wrong number of records: received %d expected %s\n", + ctx->new_rtable->mupdates, tokens[2] ); + snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates ); + send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf ); + uta_rt_drop( ctx->new_rtable ); + ctx->new_rtable = NULL; + return; + } + + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid_parse: meid map update ended; found expected number of entries: %s\n", tokens[2] ); + } + + if( ctx->new_rtable ) { + roll_tables( ctx ); // roll active to old, and new to active with proper locking + if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "end of meid map noticed\n" ); + send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL ); + + if( vlevel > 0 ) { + if( ctx->old_rtable != NULL ) { + rmr_vlog_force( RMR_VL_DEBUG, "old route table: (ref_count=%d)\n", ctx->old_rtable->ref_count ); + rt_stats( ctx->old_rtable ); + } else { + rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" ); + } + rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" ); + rt_stats( ctx->rtable ); + } + } else { + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "end of meid map noticed, but one was not started!\n" ); + ctx->new_rtable = NULL; + } + } + } + + return; + } + + if( ! 
ctx->new_rtable ) { // for any other mmap entries, there must be a table in progress or we punt + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid update/delte (%s) encountered, but table update not started\n", tokens[0] ); + return; + } + + if( strcmp( tokens[0], "mme_ar" ) == 0 ) { + if( ntoks < 3 || tokens[1] == NULL || tokens[2] == NULL ) { + rmr_vlog( RMR_VL_ERR, "meid_parse: mme_ar record didn't have enough tokens found %d\n", ntoks ); + return; + } + parse_meid_ar( ctx->new_rtable, tokens[1], tokens[2], vlevel ); + ctx->new_rtable->mupdates++; + return; + } + + if( strcmp( tokens[0], "mme_del" ) == 0 ) { // ntoks < 2 already validated + parse_meid_del( ctx->new_rtable, tokens[1], vlevel ); + ctx->new_rtable->mupdates++; + return; + } +} + +/* + This will close the current table snarf file (in *.inc) and open a new one. + The curent one is renamed. The final file name is determined by the setting of + RMR_SNARF_RT, and if not set then the variable RMR_SEED_RT is used and given + an additional extension of .snarf. If neither seed or snarf environment vars are + set then this does nothing. + + If this is called before the tmp snarf file is opened, then this just opens the file. +*/ +static void cycle_snarfed_rt( uta_ctx_t* ctx ) { + static int ok2warn = 0; // some warnings squelched on first call + + char* seed_fname; // the filename from env + char tfname[512]; // temp fname + char wfname[512]; // working buffer for filename + char* snarf_fname = NULL; // prevent overlay of the static table if snarf_rt not given + + if( ctx == NULL ) { + return; + } + + if( (snarf_fname = getenv( ENV_STASH_RT )) == NULL ) { // specific place to stash the rt not given + if( (seed_fname = getenv( ENV_SEED_RT )) != NULL ) { // no seed, we leave in the default file + memset( wfname, 0, sizeof( wfname ) ); + snprintf( wfname, sizeof( wfname ) - 1, "%s.stash", seed_fname ); + snarf_fname = wfname; } + } + + if( snarf_fname == NULL ) { + rmr_vlog( RMR_VL_DEBUG, "cycle_snarf: no file to save in" ); + return; + } + + memset( tfname, 0, sizeof( tfname ) ); + snprintf( tfname, sizeof( tfname ) -1, "%s.inc", snarf_fname ); // must ensure tmp file is moveable + + if( ctx->snarf_rt_fd >= 0 ) { + char* msg= "### captured from route manager\n"; + write( ctx->snarf_rt_fd, msg, strlen( msg ) ); + if( close( ctx->snarf_rt_fd ) < 0 ) { + rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to close working rt snarf file: %s\n", strerror( errno ) ); + return; + } + + if( unlink( snarf_fname ) < 0 && ok2warn ) { // first time through this can fail and we ignore it + rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to unlink old static table: %s: %s\n", snarf_fname, strerror( errno ) ); + } + + if( rename( tfname, snarf_fname ) ) { + rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to move new route table to seed aname : %s -> %s: %s\n", tfname, snarf_fname, strerror( errno ) ); + } else { + rmr_vlog( RMR_VL_INFO, "latest route table info saved in: %s\n", snarf_fname ); + } + } + ok2warn = 1; + + ctx->snarf_rt_fd = open( tfname, O_WRONLY | O_CREAT | O_TRUNC, 0660 ); + if( ctx->snarf_rt_fd < 0 ) { + rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to open trt file: %s: %s\n", tfname, strerror( errno ) ); } else { - if( DEBUG ) fprintf( stderr, "[DBUG] delete rte skipped: %s\n", ts_field ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: rt snarf file opened: %s\n", tfname ); } } @@ -302,32 +914,73 @@ static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vle end record causes the in progress table to be finalised and the currently active table is 
replaced. - We expect one of several types: - newrt|{start|end} + The updated table will be activated when the *|end record is encountered. + However, to allow for a "double" update, where both the meid map and the + route table must be updated at the same time, the end indication on a + route table (new or update) may specifiy "hold" which indicates that meid + map entries are to follow and the updated route table should be held as + pending until the end of the meid map is received and validated. + + CAUTION: we are assuming that there is a single route/meid map generator + and as such only one type of update is received at a time; in other + words, the sender cannot mix update records and if there is more than + one sender process they must synchronise to avoid issues. + + + For a RT update, we expect: + newrt | start | + newrt | end | rte|[,sender]|[;,...] mse|[,sender]||[;,...] + mse| [,sender] | | %meid + + + For a meid map update we expect: + meid_map | start | + meid_map | end | | + mme_ar | | ... + mme_del | ... + + + The pctx is our private context that must be used to send acks/status + messages back to the route manager. The regular ctx is the ctx that + the user has been given and thus that's where we have to hang the route + table we're working with. + + If mbuf is given, and we need to ack, then we ack using the mbuf and a + return to sender call (allows route manager to use wh_call() to send + an update and rts is required to get that back to the right thread). + If mbuf is nil, then one will be allocated (in ack) and a normal wh_send + will be used. */ -static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) { +static void parse_rt_rec( uta_ctx_t* ctx, uta_ctx_t* pctx, char* buf, int vlevel, rmr_mbuf_t* mbuf ) { int i; int ntoks; // number of tokens found in something int ngtoks; int grp; // group number - rtable_ent_t* rte; // route table entry added + rtable_ent_t const* rte; // route table entry added char* tokens[128]; - char* gtokens[64]; // groups - char* tok; // pointer into a token or string + char* tok=NULL; // pointer into a token or string + char wbuf[1024]; if( ! 
buf ) { return; } + if( ctx && ctx->snarf_rt_fd >= 0 ) { // if snarfing table as it arrives, write this puppy + write( ctx->snarf_rt_fd, buf, strlen( buf ) ); + write( ctx->snarf_rt_fd, "\n", 1 ); + } + while( *buf && isspace( *buf ) ) { // skip leading whitespace buf++; } - for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- ); // trim trailing spaces too + for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace_with_fence( *tok ); tok-- ); // trim trailing spaces too *(tok+1) = 0; + memset( tokens, 0, sizeof( tokens ) ); if( (ntoks = uta_tokenise( buf, tokens, 128, '|' )) > 0 ) { + tokens[0] = clip( tokens[0] ); switch( *(tokens[0]) ) { case 0: // ignore blanks // fallthrough @@ -340,7 +993,7 @@ static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) { } if( ntoks < 3 ) { - if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: del record had too few fields: %d instead of 3\n", ntoks ); + if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: del record had too few fields: %d instead of 3\n", ntoks ); break; } @@ -351,50 +1004,77 @@ static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) { case 'n': // newrt|{start|end} tokens[1] = clip( tokens[1] ); if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building - if( ctx->new_rtable ) { - uta_rt_drop( ctx->old_rtable ); // time to drop one that was previously replaced - ctx->old_rtable = ctx->rtable; // currently active becomes old and allowed to 'drain' - ctx->rtable = ctx->new_rtable; // one we've been adding to becomes active - ctx->new_rtable = NULL; - if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] end of route table noticed\n" ); + if( ctx && ctx->snarf_rt_fd >= 0 ) { + cycle_snarfed_rt( ctx ); // make it available and open a new one + } - if( vlevel > 0 ) { - fprintf( stderr, "[DBUG] old route table:\n" ); - rt_stats( ctx->old_rtable ); - fprintf( stderr, "[DBUG] new route table:\n" ); - rt_stats( ctx->rtable ); + if( ntoks >2 ) { + if( ctx->new_rtable->updates != atoi( tokens[2] ) ) { // count they added didn't match what we received + rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n", + ctx->new_rtable->updates, tokens[2] ); + snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates ); + send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf ); + uta_rt_drop( ctx->new_rtable ); + ctx->new_rtable = NULL; + break; } + } + + if( ctx->new_rtable ) { + roll_tables( ctx ); // roll active to old, and new to active with proper locking + if( DEBUG > 1 || (vlevel > 1) ) { + rmr_vlog( RMR_VL_DEBUG, "end of route table noticed\n" ); + dump_tables( ctx ); + } + + send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL ); + ctx->rtable_ready = 1; // route based sends can now happen + ctx->flags |= CFL_FULLRT; // indicate we have seen a complete route table } else { - if( DEBUG > 1 ) fprintf( stderr, "[DBUG] end of route table noticed, but one was not started!\n" ); + if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of route table noticed, but one was not started!\n" ); ctx->new_rtable = NULL; } - } else { // start a new table. - if( ctx->new_rtable != NULL ) { // one in progress? this forces it out - if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] new table; dropping incomplete table\n" ); + } else { // start a new table. + if( ctx->new_rtable != NULL ) { // one in progress? 
this forces it out + send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" ); // nack the one that was pending as end never made it + + if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" ); uta_rt_drop( ctx->new_rtable ); + ctx->new_rtable = NULL; } - if( ctx->rtable ) { - ctx->new_rtable = uta_rt_clone( ctx->rtable ); // create by cloning endpoint entries from active table + if( ctx->table_id != NULL ) { + free( ctx->table_id ); + } + if( ntoks >2 ) { + ctx->table_id = strdup( clip( tokens[2] ) ); } else { - ctx->new_rtable = uta_rt_init( ); // don't have one yet, just crate empty + ctx->table_id = NULL; } - if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] start of route table noticed\n" ); + + ctx->new_rtable = prep_new_rt( ctx, SOME ); // wait for old table to drain and shift it back to new + ctx->new_rtable->updates = 0; // init count of entries received + + if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "start of route table noticed\n" ); } break; - case 'm': // assume mse entry - if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore siliently - break; - } + case 'm': // mse entry or one of the meid_ records + if( strcmp( tokens[0], "mse" ) == 0 ) { + if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore siliently + break; + } - if( ntoks < 4 ) { - if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: mse record had too few fields: %d instead of 4\n", ntoks ); - break; - } + if( ntoks < 4 ) { + if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: mse record had too few fields: %d instead of 4\n", ntoks ); + break; + } - build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel ); - ctx->new_rtable->updates++; + build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel ); + ctx->new_rtable->updates++; + } else { + meid_parser( ctx, pctx, mbuf, tokens, ntoks, vlevel ); + } break; case 'r': // assume rt entry @@ -411,16 +1091,25 @@ static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) { break; case 'u': // update current table, not a total replacement + if( ! 
(ctx->flags & CFL_FULLRT) ) { // we cannot update until we have a full table from route generator + rmr_vlog( RMR_VL_WARN, "route table update ignored: full table not previously recevied" ); + break; + } + tokens[1] = clip( tokens[1] ); if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building if( ctx->new_rtable == NULL ) { // update table not in progress break; } + if( ctx->snarf_rt_fd >= 0 ) { + cycle_snarfed_rt( ctx ); // make it available and open a new one + } if( ntoks >2 ) { if( ctx->new_rtable->updates != atoi( tokens[2] ) ) { // count they added didn't match what we received - fprintf( stderr, "[ERR] rmr_rtc: RT update had wrong number of records: received %d expected %s\n", + rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n", ctx->new_rtable->updates, tokens[2] ); + send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf ); uta_rt_drop( ctx->new_rtable ); ctx->new_rtable = NULL; break; @@ -428,41 +1117,42 @@ static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) { } if( ctx->new_rtable ) { - uta_rt_drop( ctx->old_rtable ); // time to drop one that was previously replaced - ctx->old_rtable = ctx->rtable; // currently active becomes old and allowed to 'drain' - ctx->rtable = ctx->new_rtable; // one we've been adding to becomes active - ctx->new_rtable = NULL; - if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] end of rt update noticed\n" ); - - if( vlevel > 0 ) { - fprintf( stderr, "[DBUG] old route table:\n" ); - rt_stats( ctx->old_rtable ); - fprintf( stderr, "[DBUG] updated route table:\n" ); - rt_stats( ctx->rtable ); + roll_tables( ctx ); // roll active to old, and new to active with proper locking + if( DEBUG > 1 || (vlevel > 1) ) { + rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed\n" ); + dump_tables( ctx ); } + + send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL ); + ctx->rtable_ready = 1; // route based sends can now happen } else { - if( DEBUG > 1 ) fprintf( stderr, "[DBUG] end of rt update noticed, but one was not started!\n" ); + if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed, but one was not started!\n" ); ctx->new_rtable = NULL; } } else { // start a new table. if( ctx->new_rtable != NULL ) { // one in progress? 
this forces it out - if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] new table; dropping incomplete table\n" ); + if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" ); + send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" ); // nack the one that was pending as end never made it uta_rt_drop( ctx->new_rtable ); + ctx->new_rtable = NULL; } - if( ctx->rtable ) { - ctx->new_rtable = uta_rt_clone_all( ctx->rtable ); // start with a clone of everything (endpts and entries) - } else { - ctx->new_rtable = uta_rt_init( ); // don't have one yet, just crate empty + if( ntoks > 2 ) { + if( ctx->table_id != NULL ) { + free( ctx->table_id ); + } + ctx->table_id = strdup( clip( tokens[2] ) ); } - ctx->new_rtable->updates = 0; // init count of updates received - if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] start of rt update noticed\n" ); + ctx->new_rtable = prep_new_rt( ctx, ALL ); // start with a copy of everything in the live table + ctx->new_rtable->updates = 0; // init count of updates received + + if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "start of rt update noticed\n" ); } break; default: - if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: unrecognised request: %s\n", tokens[0] ); + if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: unrecognised request: %s\n", tokens[0] ); break; } } @@ -485,42 +1175,50 @@ static void read_static_rt( uta_ctx_t* ctx, int vlevel ) { char* eor; // end of the record int rcount = 0; // record count for debug - if( (fname = getenv( ENV_SEED_RT )) == NULL ) { - return; + if( (fname = ctx->seed_rt_fname) == NULL ) { + if( (fname = getenv( ENV_SEED_RT )) == NULL ) { + return; + } + + ctx->seed_rt_fname = strdup( fname ); + fname = ctx->seed_rt_fname; } - if( (fbuf = uta_fib( fname ) ) == NULL ) { // read file into a single buffer (nil terminated string) - fprintf( stderr, "[WRN] rmr read_static: seed route table could not be opened: %s: %s\n", fname, strerror( errno ) ); + if( (fbuf = ensure_nlterm( uta_fib( fname ) ) ) == NULL ) { // read file into a single buffer (nil terminated string) + rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table could not be opened: %s: %s\n", fname, strerror( errno ) ); return; } - if( DEBUG ) fprintf( stderr, "[DBUG] rmr: seed route table successfully opened: %s\n", fname ); + if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "seed route table successfully opened: %s\n", fname ); for( eor = fbuf; *eor; eor++ ) { // fix broken systems that use \r or \r\n to terminate records if( *eor == '\r' ) { *eor = '\n'; // will look like a blank line which is ok } } - for( rec = fbuf; rec && *rec; rec = eor+1 ) { + rec = fbuf; + while( rec && *rec ) { rcount++; if( (eor = strchr( rec, '\n' )) != NULL ) { *eor = 0; } else { - fprintf( stderr, "[WARN] rmr read_static: seed route table had malformed records (missing newline): %s\n", fname ); - fprintf( stderr, "[WARN] rmr read_static: seed route table not used%s\n", fname ); + rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table had malformed records (missing newline): %s\n", fname ); + rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table not used: %s\n", fname ); free( fbuf ); return; } - parse_rt_rec( ctx, rec, vlevel ); + parse_rt_rec( ctx, NULL, rec, vlevel, NULL ); // no pvt context as we can't ack + + rec = eor+1; } - if( DEBUG ) fprintf( stderr, "[DBUG] rmr: seed route table successfully parsed: %d records\n", rcount ); + if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "rmr_read_static: seed route 
table successfully parsed: %d records\n", rcount ); free( fbuf ); } /* - Callback driven for each named thing in a symtab. We collect the pointers to those + Callback driven for each thing in a symtab. We collect the pointers to those things for later use (cloning). */ static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list ) { @@ -531,10 +1229,16 @@ static void collect_things( void* st, void* entry, char const* name, void* thing } if( thing == NULL ) { + rmr_vlog_force( RMR_VL_DEBUG, "collect things given nil thing: %p %p %p\n", st, entry, name ); // dummy ref for sonar return; } + tl->names[tl->nused] = name; // the name/key (space 0 uses int keys, so name can be nil and that is OK) tl->things[tl->nused++] = thing; // save a reference to the thing + + if( tl->nused >= tl->nalloc ) { + extend_things( tl ); // not enough allocated + } } /* @@ -559,6 +1263,7 @@ static void del_rte( void* st, void* entry, char const* name, void* thing, void* int i; if( (rte = (rtable_ent_t *) thing) == NULL ) { + rmr_vlog_force( RMR_VL_DEBUG, "delrte given nil table: %p %p %p\n", st, entry, name ); // dummy ref for sonar return; } @@ -571,7 +1276,9 @@ static void del_rte( void* st, void* entry, char const* name, void* thing, void* for( i = 0; i < rte->nrrgroups; i++ ) { if( rte->rrgroups[i] ) { free( rte->rrgroups[i]->epts ); // ditch list of endpoint pointers (end points are reused; don't trash them) + free( rte->rrgroups[i] ); // but must free the rrg itself too } + } free( rte->rrgroups ); @@ -590,7 +1297,7 @@ static void del_rte( void* st, void* entry, char const* name, void* thing, void* an empty buffer, as opposed to a nil, so the caller can generate defaults or error if an empty/missing file isn't tolerated. */ -static char* uta_fib( char* fname ) { +static char* uta_fib( char const* fname ) { struct stat stats; off_t fsize = 8192; // size of the file off_t nread; // number of bytes read @@ -606,7 +1313,7 @@ static char* uta_fib( char* fname ) { fsize = stats.st_size; // stat ok, save the file size } } else { - fsize = 8192; // stat failed, we'll leave the file open and try to read a default max of 8k + fsize = 8192; // stat failed, we'll leave the file open and try to read a default max of 8k } } @@ -641,157 +1348,234 @@ static char* uta_fib( char* fname ) { return buf; } +// --------------------- initialisation/creation --------------------------------------------- /* Create and initialise a route table; Returns a pointer to the table struct. */ -static route_table_t* uta_rt_init( ) { +static route_table_t* uta_rt_init( uta_ctx_t* ctx ) { route_table_t* rt; + if( ctx == NULL ) { + return NULL; + } if( (rt = (route_table_t *) malloc( sizeof( route_table_t ) )) == NULL ) { return NULL; } - if( (rt->hash = rmr_sym_alloc( 509 )) == NULL ) { // modest size, prime + memset( rt, 0, sizeof( *rt ) ); + + if( (rt->hash = rmr_sym_alloc( RT_SIZE )) == NULL ) { free( rt ); return NULL; } + rt->gate = ctx->rtgate; // single mutex needed for all route tables + rt->ephash = ctx->ephash; // all route tables share a common endpoint hash + pthread_mutex_init( rt->gate, NULL ); + return rt; } /* - Clone (sort of) an existing route table. This is done to preserve the endpoint - names referenced in a table (and thus existing sessions) when a new set - of message type to endpoint name mappings is received. A new route table - with only endpoint name references is returned based on the active table in - the context. + Clones one of the spaces in the given table. 
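For reference, a seed file that read_static_rt() above would accept, using the newrt/mse record formats documented in the parse_rt_rec comments, might look like the following; the table id, host names and ports are invented, and the trailing comments rely on the clip() behaviour described earlier:

	newrt | start | table-0001
	mse | 1080 | -1 | app1:4560,app2:4560        # one group; round robin between the two instances
	mse | 1090 | 2 | logger:4560;backup:4560     # two groups; a copy goes to one member of each
	mse | 2000 | -1 | %meid                      # endpoint resolved through the meid owner map
	newrt | end | 3                              # count of mse/rte records in the table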
+ Srt is the source route table, Nrt is the new route table; if nil, we allocate it. + Space is the space in the old table to copy. Space 0 uses an integer key and + references rte structs. All other spaces use a string key and reference endpoints. */ -static route_table_t* uta_rt_clone( route_table_t* srt ) { - endpoint_t* ep; // an endpoint - route_table_t* nrt; // new route table +static route_table_t* rt_clone_space( uta_ctx_t* ctx, route_table_t* srt, route_table_t* nrt, int space ) { + endpoint_t* ep; // an endpoint (ignore sonar complaint about const*) + rtable_ent_t* rte; // a route table entry (ignore sonar complaint about const*) void* sst; // source symtab void* nst; // new symtab - thing_list_t things; - int i; + thing_list_t things; // things from the space to copy + int i; + int free_on_err = 0; - if( srt == NULL ) { + if( ctx == NULL ) { return NULL; } - - if( (nrt = (route_table_t *) malloc( sizeof( *nrt ) )) == NULL ) { - return NULL; + if( nrt == NULL ) { // make a new table if needed + free_on_err = 1; + nrt = uta_rt_init( ctx ); + if( nrt == NULL ) { + return NULL; + } } - if( (nrt->hash = rmr_sym_alloc( 509 )) == NULL ) { // modest size, prime - free( nrt ); - return NULL; + if( srt == NULL ) { // source was nil, just give back the new table + return nrt; } things.nalloc = 2048; things.nused = 0; + things.error = 0; things.things = (void **) malloc( sizeof( void * ) * things.nalloc ); - if( things.things == NULL ) { - free( nrt->hash ); - free( nrt ); - return NULL; + things.names = (const char **) malloc( sizeof( char * ) * things.nalloc ); + if( things.things == NULL || things.names == NULL ){ + if( things.things != NULL ) { + free( things.things ); + } + if( things.names != NULL ) { + free( things.names ); + } + + if( free_on_err ) { + rmr_sym_free( nrt->hash ); + free( nrt ); + nrt = NULL; + } else { + nrt->error = 1; + } + + return nrt; } + memset( things.things, 0, sizeof( sizeof( void * ) * things.nalloc ) ); + memset( things.names, 0, sizeof( char * ) * things.nalloc ); sst = srt->hash; // convenience pointers (src symtab) nst = nrt->hash; - rmr_sym_foreach_class( sst, 1, collect_things, &things ); // collect the named endpoints in the active table + rmr_sym_foreach_class( sst, space, collect_things, &things ); // collect things from this space + if( things.error ) { // something happened and capture failed + rmr_vlog( RMR_VL_ERR, "unable to clone route table: unable to capture old contents\n" ); + free( things.things ); + free( things.names ); + if( free_on_err ) { + rmr_sym_free( nrt->hash ); + free( nrt ); + nrt = NULL; + } else { + nrt->error = 1; + } + return nrt; + } + if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "clone space cloned %d things in space %d\n", things.nused, space ); for( i = 0; i < things.nused; i++ ) { - ep = (endpoint_t *) things.things[i]; - rmr_sym_put( nst, ep->name, 1, ep ); // slam this one into the new table + if( space ) { // string key, epoint reference + ep = (endpoint_t *) things.things[i]; + rmr_sym_put( nst, things.names[i], space, ep ); // slam this one into the new table + } else { + rte = (rtable_ent_t *) things.things[i]; + rte->refs++; // rtes can be removed, so we track references + rmr_sym_map( nst, rte->key, rte ); // add to hash using numeric mtype/sub-id as key (default to space 0) + } } free( things.things ); + free( (void *) things.names ); return nrt; } /* - Clones _all_ of the given route table (references both endpoints AND the route table - entries. 
Needed to support a partial update where some route table entries will not - be deleted if not explicitly in the update. + Given a destination route table (drt), clone from the source (srt) into it. + If drt is nil, alloc a new one. If srt is nil, then nothing is done (except to + allocate the drt if that was nil too). If all is true (1), then we will clone both + the MT and the ME spaces; otherwise only the ME space is cloned. */ -static route_table_t* uta_rt_clone_all( route_table_t* srt ) { - endpoint_t* ep; // an endpoint - rtable_ent_t* rte; // a route table entry - route_table_t* nrt; // new route table - void* sst; // source symtab - void* nst; // new symtab - thing_list_t things0; // things from space 0 (table entries) - thing_list_t things1; // things from space 1 (end points) +static route_table_t* uta_rt_clone( uta_ctx_t* ctx, route_table_t* srt, route_table_t* drt, int all ) { + endpoint_t* ep; // an endpoint + rtable_ent_t* rte; // a route table entry int i; - if( srt == NULL ) { + if( ctx == NULL ) { return NULL; } - - if( (nrt = (route_table_t *) malloc( sizeof( *nrt ) )) == NULL ) { - return NULL; + if( drt == NULL ) { + drt = uta_rt_init( ctx ); } - - if( (nrt->hash = rmr_sym_alloc( 509 )) == NULL ) { // modest size, prime - free( nrt ); - return NULL; + if( srt == NULL ) { + return drt; } - things0.nalloc = 2048; - things0.nused = 0; - things0.things = (void **) malloc( sizeof( void * ) * things0.nalloc ); - if( things0.things == NULL ) { - free( nrt->hash ); - free( nrt ); - return NULL; + drt->ephash = ctx->ephash; // all rts reference the same EP symtab + rt_clone_space( ctx, srt, drt, RT_ME_SPACE ); + if( all ) { + rt_clone_space( ctx, srt, drt, RT_MT_SPACE ); } - things1.nalloc = 2048; - things1.nused = 0; - things1.things = (void **) malloc( sizeof( void * ) * things1.nalloc ); - if( things1.things == NULL ) { - free( nrt->hash ); - free( nrt ); + return drt; +} + +/* + Prepares the "new" route table for populating. If the old_rtable is not nil, then + we wait for it's use count to reach 0. Then the table is cleared, and moved on the + context to be referenced by the new pointer; the old pointer is set to nil. + + If the old table doesn't exist, then a new table is created and the new pointer is + set to reference it. + + The ME namespace references endpoints which do not need to be released, therefore we + do not need to run that portion of the table to deref like we do for the RTEs. 
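prep_new_rt() below leans on a simple convention: readers bump the table's reference count under the gate mutex, and the writer refuses to clear and reuse the old table until that count drains to zero. A generic, standalone sketch of that convention (the struct, names and retry limit here are illustrative, not the RMR types):

	#include <pthread.h>
	#include <unistd.h>
	#include <stdio.h>

	typedef struct {
		pthread_mutex_t	gate;
		int				ref_count;
	} shared_tab_t;

	// reader side: take a reference before using the table; drop it when finished
	static void take_ref( shared_tab_t* t ) { pthread_mutex_lock( &t->gate ); t->ref_count++; pthread_mutex_unlock( &t->gate ); }
	static void drop_ref( shared_tab_t* t ) { pthread_mutex_lock( &t->gate ); if( t->ref_count > 0 ) t->ref_count--; pthread_mutex_unlock( &t->gate ); }

	// writer side: wait (bounded) for all readers to finish before recycling the table
	static int wait_for_drain( shared_tab_t* t ) {
		int tries;
		int count;

		for( tries = 0; tries < 1000; tries++ ) {
			pthread_mutex_lock( &t->gate );
			count = t->ref_count;				// the count must be read under the lock
			pthread_mutex_unlock( &t->gate );

			if( count == 0 ) {
				return 1;						// drained; safe to clear and reuse
			}
			usleep( 1000 );						// yield briefly, then look again
		}

		fprintf( stderr, "ref count seems wedged; giving up\n" );
		return 0;
	}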
+*/ +static route_table_t* prep_new_rt( uta_ctx_t* ctx, int all ) { + int counter = 0; + int ref_count; + route_table_t* rt; + + if( ctx == NULL ) { return NULL; } - sst = srt->hash; // convenience pointers (src symtab) - nst = nrt->hash; + if( (rt = ctx->old_rtable) != NULL ) { + ctx->old_rtable = NULL; + + pthread_mutex_lock( ctx->rtgate ); + ref_count = rt->ref_count; + pthread_mutex_unlock( ctx->rtgate ); - rmr_sym_foreach_class( sst, 0, collect_things, &things0 ); // collect the rtes - rmr_sym_foreach_class( sst, 1, collect_things, &things1 ); // collect the named endpoints in the active table + while( ref_count > 0 ) { // wait for all who are using to stop + if( counter++ > 1000 ) { + rmr_vlog( RMR_VL_WARN, "rt_prep_newrt: internal mishap, ref count on table seems wedged" ); + break; + } + + usleep( 1000 ); // small sleep to yield the processer if that is needed + + pthread_mutex_lock( ctx->rtgate ); + ref_count = rt->ref_count; + pthread_mutex_unlock( ctx->rtgate ); + } + + if( rt->hash != NULL ) { + rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL ); // deref and drop if needed + rmr_sym_clear( rt->hash ); // clear all entries from the old table + } - for( i = 0; i < things0.nused; i++ ) { - rte = (rtable_ent_t *) things0.things[i]; - rte->refs++; // rtes can be removed, so we track references - rmr_sym_map( nst, rte->key, rte ); // add to hash using numeric mtype/sub-id as key (default to space 0) + rt->error = 0; // table with errors can be here, so endure clear before attempt to load + } else { + rt = NULL; } - for( i = 0; i < things1.nused; i++ ) { - ep = (endpoint_t *) things1.things[i]; - rmr_sym_put( nst, ep->name, 1, ep ); // slam this one into the new table + pthread_mutex_destroy(ctx->rtgate); + rt = uta_rt_clone( ctx, ctx->rtable, rt, all ); // also sets the ephash pointer + if( rt != NULL ) { // very small chance for nil, but not zero, so test + rt->ref_count = 0; // take no chances; ensure it's 0! + } else { + rmr_vlog( RMR_VL_ERR, "route table clone returned nil; marking dummy table as error\n" ); + rt = uta_rt_init( ctx ); // must hav something, but mark it in error state + rt->error = 1; } - free( things0.things ); - free( things1.things ); - return nrt; + return rt; } + /* Given a name, find the endpoint struct in the provided route table. */ static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name ) { - if( rt == NULL || rt->hash == NULL || ep_name == NULL || *ep_name == 0 ) { + if( rt == NULL || rt->ephash == NULL || ep_name == NULL || *ep_name == 0 ) { return NULL; } - return rmr_sym_get( rt->hash, ep_name, 1 ); + return rmr_sym_get( rt->ephash, ep_name, 1 ); } /* Drop the given route table. Purge all type 0 entries, then drop the symtab itself. + Does NOT destroy the gate as it's a common gate for ALL route tables. */ static void uta_rt_drop( route_table_t* rt ) { if( rt == NULL ) { @@ -812,24 +1596,26 @@ static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) { endpoint_t* ep; if( !rt || !ep_name || ! 
*ep_name ) { - fprintf( stderr, "[WARN] rmr: rt_ensure: internal mishap, something undefined rt=%p ep_name=%p\n", rt, ep_name ); + rmr_vlog( RMR_VL_WARN, "rt_ensure: internal mishap, something undefined rt=%p ep_name=%p\n", rt, ep_name ); errno = EINVAL; return NULL; } - if( (ep = uta_get_ep( rt, ep_name )) == NULL ) { // not there yet, make + if( (ep = uta_get_ep( rt, ep_name )) == NULL ) { // not there yet, make if( (ep = (endpoint_t *) malloc( sizeof( *ep ) )) == NULL ) { - fprintf( stderr, "[WARN] rmr: rt_ensure: malloc failed for endpoint creation: %s\n", ep_name ); + rmr_vlog( RMR_VL_WARN, "rt_ensure: malloc failed for endpoint creation: %s\n", ep_name ); errno = ENOMEM; return NULL; } + ep->notify = 1; // show notification on first connection failure ep->open = 0; // not connected ep->addr = uta_h2ip( ep_name ); ep->name = strdup( ep_name ); pthread_mutex_init( &ep->gate, NULL ); // init with default attrs + memset( &ep->scounts[0], 0, sizeof( ep->scounts ) ); - rmr_sym_put( rt->hash, ep_name, 1, ep ); + rmr_sym_put( rt->ephash, ep_name, 1, ep ); } return ep; @@ -852,5 +1638,65 @@ static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) { return key; } +/* + Given a route table and meid string, find the owner (if known). Returns a pointer to + the endpoint struct or nil. +*/ +static inline endpoint_t* get_meid_owner( route_table_t *rt, char const* meid ) { + endpoint_t const* ep; // the ep we found in the hash + + if( rt == NULL || rt->hash == NULL || meid == NULL || *meid == 0 ) { + return NULL; + } + + return (endpoint_t *) rmr_sym_get( rt->hash, meid, RT_ME_SPACE ); +} + +/* + This returns a pointer to the currently active route table and ups + the reference count so that the route table is not freed while it + is being used. The caller MUST call release_rt() when finished + with the pointer. + + Care must be taken: the ctx->rtable pointer _could_ change during the time + between the release of the lock and the return. Therefore we MUST grab + the current pointer when we have the lock so that if it does we don't + return a pointer to the wrong table. + + This will return NULL if there is no active table. +*/ +static inline route_table_t* get_rt( uta_ctx_t* ctx ) { + route_table_t* rrt; // return value + + if( ctx == NULL || ctx->rtable == NULL ) { + return NULL; + } + + pthread_mutex_lock( ctx->rtgate ); // must hold lock to bump use + rrt = ctx->rtable; // must stash the pointer while we hold lock + rrt->ref_count++; + pthread_mutex_unlock( ctx->rtgate ); + + return rrt; // pointer we upped the count with +} + +/* + This will "release" the route table by reducing the use counter + in the table. The table may not be freed until the counter reaches + 0, so it's imparative that the pointer be "released" when it is + fetched by get_rt(). Once the caller has released the table it + may not safely use the pointer that it had. +*/ +static inline void release_rt( uta_ctx_t* ctx, route_table_t* rt ) { + if( ctx == NULL || rt == NULL ) { + return; + } + + pthread_mutex_lock( ctx->rtgate ); // must hold lock + if( rt->ref_count > 0 ) { // something smells if it's already 0, don't do antyhing if it is + rt->ref_count--; + } + pthread_mutex_unlock( ctx->rtgate ); +} #endif
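A hypothetical caller-side fragment showing how the pair above is meant to be used: take the active table with get_rt(), do the lookups, and release it on every exit path. The function name and the "ok" placeholders are invented; the helpers it calls (get_rt, release_rt, get_meid_owner, build_rt_key, rmr_sym_pull) are the ones defined or used earlier in this file:

	static int example_lookup( uta_ctx_t* ctx, int mtype, int sub_id, char const* meid ) {
		route_table_t*	rt;
		int				ok = 0;

		if( (rt = get_rt( ctx )) == NULL ) {		// bumps the ref count; table cannot be freed under us
			return 0;								// no active table yet
		}

		if( rmr_sym_pull( rt->hash, build_rt_key( sub_id, mtype ) ) != NULL ) {
			ok = 1;									// placeholder: real code would round robin over the rte's groups
		} else if( get_meid_owner( rt, meid ) != NULL ) {
			ok = 1;									// placeholder: real code would send to the meid's owning endpoint
		}

		release_rt( ctx, rt );						// must pair with get_rt() on every path
		return ok;
	}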