From a012cf63dfdad3656c995cb06c316fd208c63b98 Mon Sep 17 00:00:00 2001 From: "E. Scott Daniels" Date: Wed, 15 May 2019 14:46:17 +0000 Subject: [PATCH] feature(routes): Add rtable update RMr now supports the ability to update entries in the route table throught the updatert request. Change-Id: I49c7a1875017e36359044399cf99aacc93e15dc7 Signed-off-by: E. Scott Daniels Fix bug in symtab numeric key del Numeric key was being truncated because of an incorrect cast (int, instead of int64_t). Change-Id: I854a2b39e3c890a992ec390879e3a7d2418f6e53 Signed-off-by: E. Scott Daniels Add route table update test Change-Id: I1737219cbadf15c5214a2b54d15f74225c781242 Signed-off-by: E. Scott Daniels Fix whitespace things Change-Id: Ibd5e14ad8c7930d386e407ad92d2e903da6eac86 Signed-off-by: E. Scott Daniels --- CMakeLists.txt | 2 +- src/common/include/rmr_agnostic.h | 2 + src/common/src/rt_generic_static.c | 193 +++++++++++++++++++++++++++++++++++-- src/common/src/symtab.c | 2 +- src/common/src/wrapper.c | 4 +- src/nanomsg/src/rmr.c | 2 +- src/nng/src/rmr_nng.c | 2 +- test/sr_nng_static_test.c | 21 +++- 8 files changed, 213 insertions(+), 15 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 6b73dd9..815b981 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -23,7 +23,7 @@ cmake_minimum_required( VERSION 3.5 ) set( major_version "1" ) # should be automatically populated from git tag later, but until CI process sets a tag we use this set( minor_version "0" ) -set( patch_level "23" ) +set( patch_level "24" ) set( install_root "${CMAKE_INSTALL_PREFIX}" ) set( install_lib "lib" ) diff --git a/src/common/include/rmr_agnostic.h b/src/common/include/rmr_agnostic.h index 4b772e3..418ebbc 100644 --- a/src/common/include/rmr_agnostic.h +++ b/src/common/include/rmr_agnostic.h @@ -180,6 +180,7 @@ typedef struct { */ typedef struct { void* hash; // hash table. + int updates; // counter of update records received } route_table_t; /* @@ -238,6 +239,7 @@ static void del_rte( void* st, void* entry, char const* name, void* thing, void* static char* uta_fib( char* fname ); static route_table_t* uta_rt_init( ); static route_table_t* uta_rt_clone( route_table_t* srt ); +static route_table_t* uta_rt_clone_all( route_table_t* srt ); static void uta_rt_drop( route_table_t* rt ); static endpoint_t* uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_name, int group ); static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups ); diff --git a/src/common/src/rt_generic_static.c b/src/common/src/rt_generic_static.c index ef38b8a..64cf47f 100644 --- a/src/common/src/rt_generic_static.c +++ b/src/common/src/rt_generic_static.c @@ -57,9 +57,75 @@ typedef struct thing_list { void** things; } thing_list_t; -// ------------------------------------------------------------------------------------------------ +// ---- debugging/testing ------------------------------------------------------------------------- + +/* + Dump stats for an endpoint in the RT. +*/ +static void ep_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) { + int* counter; + endpoint_t* ep; + + if( (ep = (endpoint_t *) thing) == NULL ) { + return; + } + + if( (counter = (int *) vcounter) != NULL ) { + (*counter)++; + } + + fprintf( stderr, "[DBUG] endpoint: %s open=%d\n", ep->name, ep->open ); +} + +/* + Dump stats for a route entry in the table. +*/ +static void rte_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) { + int* counter; + rtable_ent_t* rte; // thing is really an rte + int mtype; + int sid; + + if( (rte = (rtable_ent_t *) thing) == NULL ) { + return; + } + + if( (counter = (int *) vcounter) != NULL ) { + (*counter)++; + } + + mtype = rte->key & 0xffff; + sid = (int) (rte->key >> 32); + + fprintf( stderr, "[DBUG] rte: key=%016lx mtype=%4d sid=%4d nrrg=%2d refs=%d\n", rte->key, mtype, sid, rte->nrrgroups, rte->refs ); +} + +/* + Given a route table, cause some stats to be spit out. +*/ +static void rt_stats( route_table_t* rt ) { + int* counter; + if( rt == NULL ) { + fprintf( stderr, "[DBUG] rtstats: nil table\n" ); + return; + } + counter = (int *) malloc( sizeof( int ) ); + *counter = 0; + fprintf( stderr, "[DBUG] rtstats:\n" ); + rmr_sym_foreach_class( rt->hash, 1, ep_stats, counter ); // run endpoints in the active table + fprintf( stderr, "[DBUG] %d endpoints\n", *counter ); + + *counter = 0; + rmr_sym_foreach_class( rt->hash, 0, rte_stats, counter ); // run entries + fprintf( stderr, "[DBUG] %d entries\n", *counter ); + + free( counter ); +} + + +// ------------------------------------------------------------------------------------------------ /* Little diddy to trim whitespace and trailing comments. Like shell, trailing comments must be at the start of a word (i.e. must be immediatly preceeded by whitespace). @@ -107,6 +173,7 @@ static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups } memset( rte, 0, sizeof( *rte ) ); rte->refs = 1; + rte->key = key; if( nrrgroups <= 0 ) { nrrgroups = 10; @@ -126,7 +193,7 @@ static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups rmr_sym_map( rt->hash, key, rte ); // add to hash using numeric mtype as key - if( DEBUG ) fprintf( stderr, "[DBUG] route table entry created: k=%lu groups=%d\n", key, nrrgroups ); + if( DEBUG ) fprintf( stderr, "[DBUG] route table entry created: k=%lx groups=%d\n", key, nrrgroups ); return rte; } @@ -165,7 +232,7 @@ static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* r key = build_rt_key( subid, atoi( ts_field ) ); - if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] create rte for mtype=%s subid=%d key=%lu\n", ts_field, subid, key ); + if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] create rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key ); if( (ngtoks = uta_tokenise( rr_field, gtokens, 64, ';' )) > 0 ) { // split round robin groups rte = uta_add_rte( ctx->new_rtable, key, ngtoks ); // get/create entry for this key @@ -185,6 +252,48 @@ static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* r } } +/* + Trash_entry takes a partially parsed record from the input and + will delete the entry if the sender,mtype matches us or it's a + generic mtype. The refernce in the new table is removed and the + refcounter for the actual rte is decreased. If that ref count is + 0 then the memory is freed (handled byh the del_rte call). +*/ +static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vlevel ) { + rtable_ent_t* rte; // route table entry to be 'deleted' + char* tok; + int ntoks; + uint64_t key = 0; // the symtab key will be mtype or sub_id+mtype + char* tokens[128]; + + if( ctx == NULL || ctx->new_rtable == NULL || ctx->new_rtable->hash == NULL ) { + return; + } + + ts_field = clip( ts_field ); // ditch extra whitespace and trailing comments + + if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all) + (uta_has_str( ts_field, ctx->my_name, ',', 127) >= 0) || // our name is in the list + has_myip( ts_field, ctx->ip_list, ',', 127 ) ) { // the list has one of our IP addresses + + key = build_rt_key( subid, atoi( ts_field ) ); + rte = rmr_sym_pull( ctx->new_rtable->hash, key ); // get it + if( rte != NULL ) { + if( DEBUG || (vlevel > 1) ) { + fprintf( stderr, "[DBUG] delete rte for mtype=%s subid=%d key=%08lx\n", ts_field, subid, key ); + } + rmr_sym_ndel( ctx->new_rtable->hash, key ); // clear from the new table + del_rte( NULL, NULL, NULL, rte, NULL ); // clean up the memory: reduce ref and free if ref == 0 + } else { + if( DEBUG || (vlevel > 1) ) { + fprintf( stderr, "[DBUG] delete could not find rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key ); + } + } + } else { + if( DEBUG ) fprintf( stderr, "[DBUG] delete rte skipped: %s\n", ts_field ); + } +} + /* Parse a single record recevied from the route table generator, or read from a static route table file. Start records cause a new table to @@ -225,6 +334,20 @@ static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) { case '#': // and comment lines break; + case 'd': // del | [sender,]mtype | sub-id + if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore siliently + break; + } + + if( ntoks < 3 ) { + if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: del record had too few fields: %d instead of 3\n", ntoks ); + break; + } + + trash_entry( ctx, tokens[1], atoi( tokens[2] ), vlevel ); + ctx->new_rtable->updates++; + break; + case 'n': // newrt|{start|end} tokens[1] = clip( tokens[1] ); if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building @@ -234,6 +357,13 @@ static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) { ctx->rtable = ctx->new_rtable; // one we've been adding to becomes active ctx->new_rtable = NULL; if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] end of route table noticed\n" ); + + if( vlevel > 0 ) { + fprintf( stderr, "[DBUG] old route table:\n" ); + rt_stats( ctx->old_rtable ); + fprintf( stderr, "[DBUG] new route table:\n" ); + rt_stats( ctx->rtable ); + } } else { if( DEBUG > 1 ) fprintf( stderr, "[DBUG] end of route table noticed, but one was not started!\n" ); ctx->new_rtable = NULL; @@ -264,6 +394,7 @@ static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) { } build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel ); + ctx->new_rtable->updates++; break; case 'r': // assume rt entry @@ -271,6 +402,7 @@ static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) { break; } + ctx->new_rtable->updates++; if( ntoks > 3 ) { // assume new entry with subid last build_entry( ctx, tokens[1], atoi( tokens[3] ), tokens[2], vlevel ); } else { @@ -278,6 +410,53 @@ static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) { } break; + case 'u': // update current table, not a total replacement + tokens[1] = clip( tokens[1] ); + if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building + if( ntoks >2 ) { + if( ctx->new_rtable->updates != atoi( tokens[2] ) ) { // count they added didn't match what we received + fprintf( stderr, "[ERR] rmr_rtc: RT update had wrong number of records: received %d expected %s\n", + ctx->new_rtable->updates, tokens[2] ); + uta_rt_drop( ctx->new_rtable ); + ctx->new_rtable = NULL; + break; + } + } + + if( ctx->new_rtable ) { + uta_rt_drop( ctx->old_rtable ); // time to drop one that was previously replaced + ctx->old_rtable = ctx->rtable; // currently active becomes old and allowed to 'drain' + ctx->rtable = ctx->new_rtable; // one we've been adding to becomes active + ctx->new_rtable = NULL; + if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] end of rt update noticed\n" ); + + if( vlevel > 0 ) { + fprintf( stderr, "[DBUG] old route table:\n" ); + rt_stats( ctx->old_rtable ); + fprintf( stderr, "[DBUG] updated route table:\n" ); + rt_stats( ctx->rtable ); + } + } else { + if( DEBUG > 1 ) fprintf( stderr, "[DBUG] end of rt update noticed, but one was not started!\n" ); + ctx->new_rtable = NULL; + } + } else { // start a new table. + if( ctx->new_rtable != NULL ) { // one in progress? this forces it out + if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] new table; dropping incomplete table\n" ); + uta_rt_drop( ctx->new_rtable ); + } + + if( ctx->rtable ) { + ctx->new_rtable = uta_rt_clone_all( ctx->rtable ); // start with a clone of everything (endpts and entries) + } else { + ctx->new_rtable = uta_rt_init( ); // don't have one yet, just crate empty + } + + ctx->new_rtable->updates = 0; // init count of updates received + if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] start of rt update noticed\n" ); + } + break; + default: if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: unrecognised request: %s\n", tokens[0] ); break; @@ -348,13 +527,13 @@ static void collect_things( void* st, void* entry, char const* name, void* thing pointers, but NOT the endpoints referenced as those are referenced from multiple entries. - Route table entries can be concurrently referenced by multiple symtabs, so + Route table entries can be concurrently referenced by multiple symtabs, so the actual delete happens only if decrementing the rte's ref count takes it to 0. Thus, it is safe to call this function across a symtab when cleaning up the symtab, or overlaying an entry. This function uses ONLY the pointer to the rte (thing) and ignores the other - information that symtab foreach function passes (st, entry, and data) which + information that symtab foreach function passes (st, entry, and data) which means that it _can_ safetly be used outside of the foreach setting. If the function is changed to depend on any of these three, then a stand-alone rte_cleanup() function should be added and referenced by this, and refererences @@ -475,7 +654,6 @@ static route_table_t* uta_rt_init( ) { static route_table_t* uta_rt_clone( route_table_t* srt ) { endpoint_t* ep; // an endpoint route_table_t* nrt; // new route table - //route_table_t* art; // active route table void* sst; // source symtab void* nst; // new symtab thing_list_t things; @@ -519,14 +697,13 @@ static route_table_t* uta_rt_clone( route_table_t* srt ) { /* Clones _all_ of the given route table (references both endpoints AND the route table - entries. Needed to support a partial update where some route table entries will not + entries. Needed to support a partial update where some route table entries will not be deleted if not explicitly in the update. */ static route_table_t* uta_rt_clone_all( route_table_t* srt ) { endpoint_t* ep; // an endpoint rtable_ent_t* rte; // a route table entry route_table_t* nrt; // new route table - //route_table_t* art; // active route table void* sst; // source symtab void* nst; // new symtab thing_list_t things0; // things from space 0 (table entries) diff --git a/src/common/src/symtab.c b/src/common/src/symtab.c index 31f150f..e96af03 100644 --- a/src/common/src/symtab.c +++ b/src/common/src/symtab.c @@ -297,7 +297,7 @@ extern void rmr_sym_del( void *vtable, const char *name, unsigned int class ) hv = sym_hash( name, table->size ); for(eptr=sym_tab[hv]; eptr && ! same(class, eptr->class, eptr->name, name); eptr=eptr->next ); } else { - nkey = *((int *) name); + nkey = *((uint64_t *) name); hv = nkey % table->size; // just hash the number for( eptr=sym_tab[hv]; eptr && eptr->nkey != nkey; eptr=eptr->next ); } diff --git a/src/common/src/wrapper.c b/src/common/src/wrapper.c index b712f36..68af084 100644 --- a/src/common/src/wrapper.c +++ b/src/common/src/wrapper.c @@ -60,8 +60,8 @@ static char* build_sval( char* name, char* val, int add_sep ) { /* Similar to strcat, bangs src onto the end of target, but UNLIKE - strcat src is freed as a convenience. Max is the max amount - that target can accept; we don't bang on if src len is + strcat src is freed as a convenience. Max is the max amount + that target can accept; we don't bang on if src len is larger than max. Return is the size of src; 0 if the target was not modified. If target is not modified, then src is NOT released. diff --git a/src/nanomsg/src/rmr.c b/src/nanomsg/src/rmr.c index e61664b..b57df3a 100644 --- a/src/nanomsg/src/rmr.c +++ b/src/nanomsg/src/rmr.c @@ -281,7 +281,7 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) { if( altk_ok ) { // ok to retry with alternate key key = build_rt_key( UNSET_SUBID, msg->mtype ); // build key with just mtype and retry send_again = 1; - altk_ok = 0; + altk_ok = 0; continue; } diff --git a/src/nng/src/rmr_nng.c b/src/nng/src/rmr_nng.c index 6240d52..e77c9f3 100644 --- a/src/nng/src/rmr_nng.c +++ b/src/nng/src/rmr_nng.c @@ -237,7 +237,7 @@ extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) { group = 0; // always start with group 0 key = build_rt_key( msg->sub_id, msg->mtype ); // route table key to find the entry - if( msg->sub_id != UNSET_SUBID ) { + if( msg->sub_id != UNSET_SUBID ) { altk_ok = 1; // if caller's sub-id doesn't hit with mtype, allow mtype only key for retry } while( send_again ) { diff --git a/test/sr_nng_static_test.c b/test/sr_nng_static_test.c index 4997706..c58b281 100644 --- a/test/sr_nng_static_test.c +++ b/test/sr_nng_static_test.c @@ -40,6 +40,10 @@ /* Generate a simple route table (for all but direct route table testing). + This gets tricky inasmuch as we generate two in one; first a whole table + and then two update tables. The first is a table with a bad counter in the + last record to test that we don't load that table and error. The second + is a good update. */ static void gen_rt( uta_ctx_t* ctx ) { int fd; @@ -71,9 +75,24 @@ static void gen_rt( uta_ctx_t* ctx ) { } setenv( "RMR_SEED_RT", "utesting.rt", 1 ); + write( fd, rt_stuff, strlen( rt_stuff ) ); // write in the whole table + + rt_stuff = + "updatert|start\n" // this is an update to the table + "mse|4|99|fooapp:9999,barapp:9999;logger:9999\n" // update just one entry + "updatert|end | 3\n"; // bad count; this update should be rejected + write( fd, rt_stuff, strlen( rt_stuff ) ); + + rt_stuff = + "updatert|start\n" // this is an update to the table + "mse|4|10|fooapp:4561,barapp:4561;logger:9999\n" // update just one entry + "del|2|-1\n" // delete an entry; not there so no action + "del|2|10\n" // delete an entry + "updatert|end | 3\n"; // end table; updates have a count as last field write( fd, rt_stuff, strlen( rt_stuff ) ); + close( fd ); - read_static_rt( ctx, 0 ); + read_static_rt( ctx, 1 ); // force in verbose mode to see stats on tty if failure unlink( "utesting.rt" ); } -- 2.16.6