X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Frmr%2Fcommon%2Fsrc%2Frt_generic_static.c;h=eb887a9bfb74a233e2f68c25318688cb88e251af;hb=refs%2Fchanges%2F15%2F5115%2F2;hp=597347ab83a4d3b1e76bbcfffddde3a9f4b2f86c;hpb=cf4413c47ce274d7fc08c3bcfc8c4de3d465ad4d;p=ric-plt%2Flib%2Frmr.git diff --git a/src/rmr/common/src/rt_generic_static.c b/src/rmr/common/src/rt_generic_static.c index 597347a..eb887a9 100644 --- a/src/rmr/common/src/rt_generic_static.c +++ b/src/rmr/common/src/rt_generic_static.c @@ -4,7 +4,7 @@ Copyright (c) 2019-2020 Nokia Copyright (c) 2018-2020 AT&T Intellectual Property. - Licensed under the Apache License, Version 2.0 (the "License"); + Licensed under the Apache License, Version 2.0 (the "License") ; you may not use this file except in compliance with the License. You may obtain a copy of the License at @@ -45,9 +45,12 @@ #include #include #include +#include #include // needed for route manager messages +#define ALL 1 +#define SOME 0 /* Passed to a symtab foreach callback to construct a list of pointers from @@ -65,6 +68,10 @@ typedef struct thing_list { /* Dump some stats for an endpoint in the RT. This is generally called to verify endpoints after a table load/change. + + This is called by the for-each mechanism of the symtab and the prototype is + fixed; we don't really use some of the parms, but have dummy references to + keep sonar from complaining. 
*/ static void ep_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) { int* counter; @@ -76,6 +83,8 @@ static void ep_stats( void* st, void* entry, char const* name, void* thing, void if( (counter = (int *) vcounter) != NULL ) { (*counter)++; + } else { + rmr_vlog( RMR_VL_DEBUG, "ep_stas: nil counter %p %p %p", st, entry, name ); // dummy refs } rmr_vlog_force( RMR_VL_DEBUG, "rt endpoint: target=%s open=%d\n", ep->name, ep->open ); @@ -84,6 +93,8 @@ static void ep_stats( void* st, void* entry, char const* name, void* thing, void /* Called to count meid entries in the table. The meid points to an 'owning' endpoint so we can list what we find + + See note in ep_stats about dummy refs. */ static void meid_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) { int* counter; @@ -95,6 +106,8 @@ static void meid_stats( void* st, void* entry, char const* name, void* thing, vo if( (counter = (int *) vcounter) != NULL ) { (*counter)++; + } else { + rmr_vlog( RMR_VL_DEBUG, "meid_stas: nil counter %p %p %p", st, entry, name ); // dummy refs } rmr_vlog_force( RMR_VL_DEBUG, "meid=%s owner=%s open=%d\n", name, ep->name, ep->open ); @@ -103,12 +116,15 @@ static void meid_stats( void* st, void* entry, char const* name, void* thing, vo /* Dump counts for an endpoint in the RT. The vid parm is assumed to point to the 'source' information and is added to each message. + + See note above about dummy references. 
*/ static void ep_counts( void* st, void* entry, char const* name, void* thing, void* vid ) { endpoint_t* ep; char* id; if( (ep = (endpoint_t *) thing) == NULL ) { + rmr_vlog( RMR_VL_DEBUG, "ep_counts: nil thing %p %p %p", st, entry, name ); // dummy refs return; } @@ -132,11 +148,12 @@ static void ep_counts( void* st, void* entry, char const* name, void* thing, voi */ static void rte_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) { int* counter; - rtable_ent_t* rte; // thing is really an rte + rtable_ent_t const* rte; // thing is really an rte int mtype; int sid; if( (rte = (rtable_ent_t *) thing) == NULL ) { + rmr_vlog( RMR_VL_DEBUG, "rte_stats: nil thing %p %p %p", st, entry, name ); // dummy refs return; } @@ -165,7 +182,7 @@ static void rt_stats( route_table_t* rt ) { *counter = 0; rmr_vlog_force( RMR_VL_DEBUG, "route table stats:\n" ); rmr_vlog_force( RMR_VL_DEBUG, "route table endpoints:\n" ); - rmr_sym_foreach_class( rt->hash, RT_NAME_SPACE, ep_stats, counter ); // run endpoints (names) in the active table + rmr_sym_foreach_class( rt->ephash, RT_NAME_SPACE, ep_stats, counter ); // run endpoints (names) in the active table rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d known endpoints\n", *counter ); rmr_vlog_force( RMR_VL_DEBUG, "route table entries:\n" ); @@ -194,6 +211,18 @@ static void rt_epcounts( route_table_t* rt, char* id ) { rmr_sym_foreach_class( rt->hash, 1, ep_counts, id ); // run endpoints in the active table } + +static void dump_tables( uta_ctx_t *ctx ) { + if( ctx->old_rtable != NULL ) { + rmr_vlog_force( RMR_VL_DEBUG, "old route table: (ref_count=%d)\n", ctx->old_rtable->ref_count ); + rt_stats( ctx->old_rtable ); + } else { + rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" ); + } + rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" ); + rt_stats( ctx->rtable ); +} + // ------------ route manager communication ------------------------------------------------- /* Send a request for a table update to the 
route manager. Updates come in @@ -217,10 +246,10 @@ static int send_update_req( uta_ctx_t* pctx, uta_ctx_t* ctx ) { if( smsg != NULL ) { smsg->mtype = RMRRM_REQ_TABLE; smsg->sub_id = 0; - snprintf( smsg->payload, 1024, "%s ts=%ld\n", ctx->my_name, (long) time( NULL ) ); + snprintf( smsg->payload, 1024, "%s ts=%ld\n", ctx->my_name, time( NULL ) ); rmr_vlog( RMR_VL_INFO, "rmr_rtc: requesting table: (%s) whid=%d\n", smsg->payload, ctx->rtg_whid ); smsg->len = strlen( smsg->payload ) + 1; - + smsg = rmr_wh_send_msg( pctx, ctx->rtg_whid, smsg ); if( (state = smsg->state) != RMR_OK ) { rmr_vlog( RMR_VL_INFO, "rmr_rtc: send failed: %d whid=%d\n", smsg->state, ctx->rtg_whid ); @@ -236,7 +265,7 @@ static int send_update_req( uta_ctx_t* pctx, uta_ctx_t* ctx ) { /* Send an ack to the route table manager for a table ID that we are - processing. State is 1 for OK, and 0 for failed. Reason might + processing. State is 1 for OK, and 0 for failed. Reason might be populated if we know why there was a failure. Context should be the PRIVATE context that we use for messages @@ -250,7 +279,7 @@ static int send_update_req( uta_ctx_t* pctx, uta_ctx_t* ctx ) { static void send_rt_ack( uta_ctx_t* ctx, rmr_mbuf_t* smsg, char* table_id, int state, char* reason ) { int use_rts = 1; int payload_size = 1024; - + if( ctx == NULL || ctx->rtg_whid < 0 ) { return; } @@ -269,12 +298,12 @@ static void send_rt_ack( uta_ctx_t* ctx, rmr_mbuf_t* smsg, char* table_id, int s if( smsg != NULL ) { smsg->mtype = RMRRM_TABLE_STATE; smsg->sub_id = -1; - snprintf( smsg->payload, payload_size-1, "%s %s %s\n", state == RMR_OK ? "OK" : "ERR", + snprintf( smsg->payload, payload_size-1, "%s %s %s\n", state == RMR_OK ? "OK" : "ERR", table_id == NULL ? "" : table_id, reason == NULL ? 
"" : reason ); smsg->len = strlen( smsg->payload ) + 1; - - rmr_vlog( RMR_VL_INFO, "rmr_rtc: sending table state: (%s) state=%d whid=%d\n", smsg->payload, smsg->state, ctx->rtg_whid ); + + rmr_vlog( RMR_VL_INFO, "rmr_rtc: sending table state: (%s) state=%d whid=%d table=%s\n", smsg->payload, state, ctx->rtg_whid, table_id ); if( use_rts ) { smsg = rmr_rts_msg( ctx, smsg ); } else { @@ -292,13 +321,13 @@ static void send_rt_ack( uta_ctx_t* ctx, rmr_mbuf_t* smsg, char* table_id, int s } } -// ------------------------------------------------------------------------------------------------ +// ---- utility ----------------------------------------------------------------------------------- /* Little diddy to trim whitespace and trailing comments. Like shell, trailing comments must be at the start of a word (i.e. must be immediatly preceeded by whitespace). */ static char* clip( char* buf ) { - char* tok; + char* tok; while( *buf && isspace( *buf ) ) { // skip leading whitespace buf++; @@ -329,30 +358,64 @@ static char* clip( char* buf ) { */ static char* ensure_nlterm( char* buf ) { char* nb = NULL; - int len = 1; + int len = 0; - nb = buf; - if( buf == NULL || (len = strlen( buf )) < 2 ) { - if( (nb = (char *) malloc( sizeof( char ) * 2 )) != NULL ) { - *nb = '\n'; - *(nb+1) = 0; - } - } else { - if( buf[len-1] != '\n' ) { - rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" ); - if( (nb = (char *) malloc( sizeof( char ) * (len + 2) )) != NULL ) { - memcpy( nb, buf, len ); - *(nb+len) = '\n'; // insert \n and nil into the two extra bytes we allocated - *(nb+len+1) = 0; - } + if( buf != NULL ) { + len = strlen( buf ); + } - free( buf ); - } + nb = buf; // default to returning original as is + switch( len ) { + case 0: + nb = strdup( "\n" ); + break; + + case 1: + if( *buf != '\n' ) { // not a newline; realloc + rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final 
\\n?)\n" ); + nb = strdup( " \n" ); + *nb = *buf; + free( buf ); + } + break; + + default: + if( buf[len-1] != '\n' ) { // not newline terminated, realloc + rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" ); + if( (nb = (char *) malloc( sizeof( char ) * (len + 2) )) != NULL ) { + memcpy( nb, buf, len ); + *(nb+len) = '\n'; // insert \n and nil into the two extra bytes we allocated + *(nb+len+1) = 0; + free( buf ); + } + } + break; } return nb; } +/* + Roll the new table into the active and the active into the old table. We + must have the lock on the active table to do this. It's possible that there + is no active table (first load), so we have to account for that (no locking). +*/ +static void roll_tables( uta_ctx_t* ctx ) { + + if( ctx->rtable != NULL ) { // initially there isn't one, so must check! + pthread_mutex_lock( ctx->rtgate ); // must hold lock to move to active + ctx->old_rtable = ctx->rtable; // currently active becomes old and allowed to 'drain' + ctx->rtable = ctx->new_rtable; // one we've been adding to becomes active + pthread_mutex_unlock( ctx->rtgate ); + } else { + ctx->old_rtable = NULL; // ensure there isn't an old reference + ctx->rtable = ctx->new_rtable; // make new the active one + } + + ctx->new_rtable = NULL; +} + +// ------------ entry update functions --------------------------------------------------------------- /* Given a message type create a route table entry and add to the hash keyed on the message type. Once in the hash, endpoints can be added with uta_add_ep. 
Size @@ -417,19 +480,21 @@ static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups */ static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* rr_field, int vlevel ) { rtable_ent_t* rte; // route table entry added - char* tok; + char const* tok; int ntoks; uint64_t key = 0; // the symtab key will be mtype or sub_id+mtype - char* tokens[128]; - char* gtokens[64]; + char* tokens[128]; + char* gtokens[64]; int i; int ngtoks; // number of tokens in the group list int grp; // index into group list + int cgidx; // contiguous group index (prevents the addition of a contiguous group without ep) + int has_ep = FALSE; // indicates if an endpoint was added in a given round robin group ts_field = clip( ts_field ); // ditch extra whitespace and trailing comments rr_field = clip( rr_field ); - if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all) + if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all) (uta_has_str( ts_field, ctx->my_name, ',', 127) >= 0) || // our name is in the list has_myip( ts_field, ctx->ip_list, ',', 127 ) ) { // the list has one of our IP addresses @@ -444,20 +509,25 @@ static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* r rte = uta_add_rte( ctx->new_rtable, key, ngtoks ); // get/create entry for this key rte->mtype = atoi( ts_field ); // capture mtype for debugging - for( grp = 0; grp < ngtoks; grp++ ) { - if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) { // remove any referneces to our ip addrs + for( grp = 0, cgidx = 0; grp < ngtoks; grp++ ) { + if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) { // remove any references to our ip addrs for( i = 0; i < ntoks; i++ ) { if( strcmp( tokens[i], ctx->my_name ) != 0 ) { // don't add if it is us -- cannot send to ourself if( DEBUG > 1 || (vlevel > 1)) rmr_vlog_force( RMR_VL_DEBUG, "add 
endpoint ts=%s %s\n", ts_field, tokens[i] ); - uta_add_ep( ctx->new_rtable, rte, tokens[i], grp ); + uta_add_ep( ctx->new_rtable, rte, tokens[i], cgidx ); + has_ep = TRUE; } } + if( has_ep ) { + cgidx++; // only increment to the next contiguous group if the current one has at least one endpoint + has_ep = FALSE; + } } } } } else { if( DEBUG || (vlevel > 2) ) { - rmr_vlog_force( RMR_VL_DEBUG, "entry not included, sender not matched: %s\n", tokens[1] ); + rmr_vlog_force( RMR_VL_DEBUG, "build entry: ts_entry not of form msg-type,sender: %s\n", ts_field ); } } } @@ -471,10 +541,10 @@ static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* r */ static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vlevel ) { rtable_ent_t* rte; // route table entry to be 'deleted' - char* tok; + char const* tok; int ntoks; uint64_t key = 0; // the symtab key will be mtype or sub_id+mtype - char* tokens[128]; + char* tokens[128]; if( ctx == NULL || ctx->new_rtable == NULL || ctx->new_rtable->hash == NULL ) { return; @@ -482,7 +552,7 @@ static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vle ts_field = clip( ts_field ); // ditch extra whitespace and trailing comments - if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all) + if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all) (uta_has_str( ts_field, ctx->my_name, ',', 127) >= 0) || // our name is in the list has_myip( ts_field, ctx->ip_list, ',', 127 ) ) { // the list has one of our IP addresses @@ -504,6 +574,8 @@ static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vle } } +// -------------------------- parse functions -------------------------------------------------- + /* Given the tokens from an mme_ar (meid add/replace) entry, add the entries. 
the 'owner' which should be the dns name or IP address of an enpoint @@ -516,9 +588,9 @@ static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vle to send messages to. */ static void parse_meid_ar( route_table_t* rtab, char* owner, char* meid_list, int vlevel ) { - char* tok; + char const* tok; int ntoks; - char* tokens[128]; + char* tokens[128]; int i; int state; endpoint_t* ep; // endpoint struct for the owner @@ -547,9 +619,9 @@ static void parse_meid_ar( route_table_t* rtab, char* owner, char* meid_list, in This function assumes the caller has vetted the pointers as needed. */ static void parse_meid_del( route_table_t* rtab, char* meid_list, int vlevel ) { - char* tok; + char const* tok; int ntoks; - char* tokens[128]; + char* tokens[128]; int i; if( rtab->hash == NULL ) { @@ -590,25 +662,29 @@ static void meid_parser( uta_ctx_t* ctx, uta_ctx_t* pctx, rmr_mbuf_t* mbuf, char if( ctx->new_rtable != NULL ) { // one in progress? this forces it out if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "meid map start: dropping incomplete table\n" ); uta_rt_drop( ctx->new_rtable ); - send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" ); // nack the one that was pending as end never made it + ctx->new_rtable = NULL; + send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" ); // nack the one that was pending as and never made it } if( ctx->table_id != NULL ) { free( ctx->table_id ); } - if( ntoks >2 ) { + if( ntoks > 2 ) { ctx->table_id = strdup( clip( tokens[2] ) ); } else { ctx->table_id = NULL; } - ctx->new_rtable = uta_rt_clone_all( ctx->rtable ); // start with a clone of everything (mtype, endpoint refs and meid) + + ctx->new_rtable = prep_new_rt( ctx, ALL ); // start with a clone of everything (mtype, endpoint refs and meid) ctx->new_rtable->mupdates = 0; + if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "meid_parse: meid map start found\n" ); } else { if( strcmp( tokens[1], "end" ) == 0 ) 
{ // wrap up the table we were building if( ntoks > 2 ) { // meid_map | end | |??? given if( ctx->new_rtable->mupdates != atoi( tokens[2] ) ) { // count they added didn't match what we received - rmr_vlog( RMR_VL_ERR, "meid_parse: meid map update had wrong number of records: received %d expected %s\n", ctx->new_rtable->mupdates, tokens[2] ); + rmr_vlog( RMR_VL_ERR, "meid_parse: meid map update had wrong number of records: received %d expected %s\n", + ctx->new_rtable->mupdates, tokens[2] ); snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates ); send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf ); uta_rt_drop( ctx->new_rtable ); @@ -620,16 +696,17 @@ static void meid_parser( uta_ctx_t* ctx, uta_ctx_t* pctx, rmr_mbuf_t* mbuf, char } if( ctx->new_rtable ) { - uta_rt_drop( ctx->old_rtable ); // time to drop one that was previously replaced - ctx->old_rtable = ctx->rtable; // currently active becomes old and allowed to 'drain' - ctx->rtable = ctx->new_rtable; // one we've been adding to becomes active - ctx->new_rtable = NULL; + roll_tables( ctx ); // roll active to old, and new to active with proper locking if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "end of meid map noticed\n" ); send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL ); if( vlevel > 0 ) { - rmr_vlog_force( RMR_VL_DEBUG, "old route table:\n" ); - rt_stats( ctx->old_rtable ); + if( ctx->old_rtable != NULL ) { + rmr_vlog_force( RMR_VL_DEBUG, "old route table: (ref_count=%d)\n", ctx->old_rtable->ref_count ); + rt_stats( ctx->old_rtable ); + } else { + rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" ); + } rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" ); rt_stats( ctx->rtable ); } @@ -641,9 +718,9 @@ static void meid_parser( uta_ctx_t* ctx, uta_ctx_t* pctx, rmr_mbuf_t* mbuf, char } return; - } + } - if( ! ctx->new_rtable ) { // for any other mmap entries, there must be a table in progress or we punt + if( ! 
ctx->new_rtable ) { // for any other mmap entries, there must be a table in progress or we punt if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid update/delte (%s) encountered, but table update not started\n", tokens[0] ); return; } @@ -655,15 +732,13 @@ static void meid_parser( uta_ctx_t* ctx, uta_ctx_t* pctx, rmr_mbuf_t* mbuf, char } parse_meid_ar( ctx->new_rtable, tokens[1], tokens[2], vlevel ); ctx->new_rtable->mupdates++; + return; } - if( strcmp( tokens[0], "mme_del" ) == 0 ) { - if( ntoks < 2 ) { - rmr_vlog( RMR_VL_ERR, "meid_parse: mme_del record didn't have enough tokens\n" ); - return; - } + if( strcmp( tokens[0], "mme_del" ) == 0 ) { // ntoks < 2 already validated parse_meid_del( ctx->new_rtable, tokens[1], vlevel ); ctx->new_rtable->mupdates++; + return; } } @@ -719,7 +794,7 @@ static void parse_rt_rec( uta_ctx_t* ctx, uta_ctx_t* pctx, char* buf, int vleve int ntoks; // number of tokens found in something int ngtoks; int grp; // group number - rtable_ent_t* rte; // route table entry added + rtable_ent_t const* rte; // route table entry added char* tokens[128]; char* tok; // pointer into a token or string char wbuf[1024]; @@ -734,6 +809,7 @@ static void parse_rt_rec( uta_ctx_t* ctx, uta_ctx_t* pctx, char* buf, int vleve for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- ); // trim trailing spaces too *(tok+1) = 0; + memset( tokens, 0, sizeof( tokens ) ); if( (ntoks = uta_tokenise( buf, tokens, 128, '|' )) > 0 ) { tokens[0] = clip( tokens[0] ); switch( *(tokens[0]) ) { @@ -772,17 +848,10 @@ static void parse_rt_rec( uta_ctx_t* ctx, uta_ctx_t* pctx, char* buf, int vleve } if( ctx->new_rtable ) { - uta_rt_drop( ctx->old_rtable ); // time to drop one that was previously replaced - ctx->old_rtable = ctx->rtable; // currently active becomes old and allowed to 'drain' - ctx->rtable = ctx->new_rtable; // one we've been adding to becomes active - ctx->new_rtable = NULL; - if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog( RMR_VL_DEBUG, "end of route table 
noticed\n" ); - - if( vlevel > 0 ) { - rmr_vlog_force( RMR_VL_DEBUG, "old route table:\n" ); - rt_stats( ctx->old_rtable ); - rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" ); - rt_stats( ctx->rtable ); + roll_tables( ctx ); // roll active to old, and new to active with proper locking + if( DEBUG > 1 || (vlevel > 1) ) { + rmr_vlog( RMR_VL_DEBUG, "end of route table noticed\n" ); + dump_tables( ctx ); } send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL ); @@ -797,6 +866,7 @@ static void parse_rt_rec( uta_ctx_t* ctx, uta_ctx_t* pctx, char* buf, int vleve if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" ); uta_rt_drop( ctx->new_rtable ); + ctx->new_rtable = NULL; } if( ctx->table_id != NULL ) { @@ -808,9 +878,9 @@ static void parse_rt_rec( uta_ctx_t* ctx, uta_ctx_t* pctx, char* buf, int vleve ctx->table_id = NULL; } - ctx->new_rtable = NULL; - ctx->new_rtable = uta_rt_clone( ctx->rtable ); // create by cloning endpoint and meidtentries from active table + ctx->new_rtable = prep_new_rt( ctx, SOME ); // wait for old table to drain and shift it back to new ctx->new_rtable->updates = 0; // init count of entries received + if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "start of route table noticed\n" ); } break; @@ -857,6 +927,7 @@ static void parse_rt_rec( uta_ctx_t* ctx, uta_ctx_t* pctx, char* buf, int vleve if( ctx->new_rtable->updates != atoi( tokens[2] ) ) { // count they added didn't match what we received rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n", ctx->new_rtable->updates, tokens[2] ); + send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf ); uta_rt_drop( ctx->new_rtable ); ctx->new_rtable = NULL; break; @@ -864,18 +935,14 @@ static void parse_rt_rec( uta_ctx_t* ctx, uta_ctx_t* pctx, char* buf, int vleve } if( ctx->new_rtable ) { - uta_rt_drop( ctx->old_rtable ); // time to drop one that was previously replaced - ctx->old_rtable = 
ctx->rtable; // currently active becomes old and allowed to 'drain' - ctx->rtable = ctx->new_rtable; // one we've been adding to becomes active - ctx->new_rtable = NULL; - if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed\n" ); - - if( vlevel > 0 ) { - rmr_vlog_force( RMR_VL_DEBUG, "old route table:\n" ); - rt_stats( ctx->old_rtable ); - rmr_vlog_force( RMR_VL_DEBUG, "updated route table:\n" ); - rt_stats( ctx->rtable ); + roll_tables( ctx ); // roll active to old, and new to active with proper locking + if( DEBUG > 1 || (vlevel > 1) ) { + rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed\n" ); + dump_tables( ctx ); } + + send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL ); + ctx->rtable_ready = 1; // route based sends can now happen } else { if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed, but one was not started!\n" ); ctx->new_rtable = NULL; @@ -883,18 +950,21 @@ static void parse_rt_rec( uta_ctx_t* ctx, uta_ctx_t* pctx, char* buf, int vleve } else { // start a new table. if( ctx->new_rtable != NULL ) { // one in progress? 
this forces it out if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" ); + send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" ); // nack the one that was pending as end never made it uta_rt_drop( ctx->new_rtable ); + ctx->new_rtable = NULL; } - if( ntoks >2 ) { + if( ntoks > 2 ) { if( ctx->table_id != NULL ) { free( ctx->table_id ); } ctx->table_id = strdup( clip( tokens[2] ) ); } - ctx->new_rtable = uta_rt_clone_all( ctx->rtable ); // start with a clone of everything (endpts and entries) - ctx->new_rtable->updates = 0; // init count of updates received + ctx->new_rtable = prep_new_rt( ctx, ALL ); // start with a copy of everything in the live table + ctx->new_rtable->updates = 0; // init count of updates received + if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "start of rt update noticed\n" ); } break; @@ -972,6 +1042,7 @@ static void collect_things( void* st, void* entry, char const* name, void* thing } if( thing == NULL ) { + rmr_vlog_force( RMR_VL_DEBUG, "collect things given nil thing: %p %p %p\n", st, entry, name ); // dummy ref for sonar return; } @@ -1001,6 +1072,7 @@ static void del_rte( void* st, void* entry, char const* name, void* thing, void* int i; if( (rte = (rtable_ent_t *) thing) == NULL ) { + rmr_vlog_force( RMR_VL_DEBUG, "delrte given nil table: %p %p %p\n", st, entry, name ); // dummy ref for sonar return; } @@ -1013,7 +1085,9 @@ static void del_rte( void* st, void* entry, char const* name, void* thing, void* for( i = 0; i < rte->nrrgroups; i++ ) { if( rte->rrgroups[i] ) { free( rte->rrgroups[i]->epts ); // ditch list of endpoint pointers (end points are reused; don't trash them) + free( rte->rrgroups[i] ); // but must free the rrg itself too } + } free( rte->rrgroups ); @@ -1032,7 +1106,7 @@ static void del_rte( void* st, void* entry, char const* name, void* thing, void* an empty buffer, as opposed to a nil, so the caller can generate defaults or error if an 
empty/missing file isn't tolerated. */ -static char* uta_fib( char* fname ) { +static char* uta_fib( char const* fname ) { struct stat stats; off_t fsize = 8192; // size of the file off_t nread; // number of bytes read @@ -1048,7 +1122,7 @@ static char* uta_fib( char* fname ) { fsize = stats.st_size; // stat ok, save the file size } } else { - fsize = 8192; // stat failed, we'll leave the file open and try to read a default max of 8k + fsize = 8192; // stat failed, we'll leave the file open and try to read a default max of 8k } } @@ -1083,21 +1157,31 @@ static char* uta_fib( char* fname ) { return buf; } +// --------------------- initialisation/creation --------------------------------------------- /* Create and initialise a route table; Returns a pointer to the table struct. */ -static route_table_t* uta_rt_init( ) { +static route_table_t* uta_rt_init( uta_ctx_t* ctx ) { route_table_t* rt; + if( ctx == NULL ) { + return NULL; + } if( (rt = (route_table_t *) malloc( sizeof( route_table_t ) )) == NULL ) { return NULL; } + memset( rt, 0, sizeof( *rt ) ); + if( (rt->hash = rmr_sym_alloc( RT_SIZE )) == NULL ) { free( rt ); return NULL; } + rt->gate = ctx->rtgate; // single mutex needed for all route tables + rt->ephash = ctx->ephash; // all route tables share a common endpoint hash + pthread_mutex_init( rt->gate, NULL ); + return rt; } @@ -1107,18 +1191,24 @@ static route_table_t* uta_rt_init( ) { Space is the space in the old table to copy. Space 0 uses an integer key and references rte structs. All other spaces use a string key and reference endpoints. 
*/ -static route_table_t* rt_clone_space( route_table_t* srt, route_table_t* nrt, int space ) { - endpoint_t* ep; // an endpoint - rtable_ent_t* rte; // a route table entry +static route_table_t* rt_clone_space( uta_ctx_t* ctx, route_table_t* srt, route_table_t* nrt, int space ) { + endpoint_t* ep; // an endpoint (ignore sonar complaint about const*) + rtable_ent_t* rte; // a route table entry (ignore sonar complaint about const*) void* sst; // source symtab void* nst; // new symtab thing_list_t things; // things from the space to copy int i; int free_on_err = 0; + if( ctx == NULL ) { + return NULL; + } if( nrt == NULL ) { // make a new table if needed free_on_err = 1; - nrt = uta_rt_init(); + nrt = uta_rt_init( ctx ); + if( nrt == NULL ) { + return NULL; + } } if( srt == NULL ) { // source was nil, just give back the new table @@ -1129,15 +1219,20 @@ static route_table_t* rt_clone_space( route_table_t* srt, route_table_t* nrt, in things.nused = 0; things.things = (void **) malloc( sizeof( void * ) * things.nalloc ); things.names = (const char **) malloc( sizeof( char * ) * things.nalloc ); - if( things.things == NULL ) { + if( things.things == NULL || things.names == NULL ){ + if( things.things != NULL) { free( things.things ); } + if( things.names != NULL) { free( things.names ); } + if( free_on_err ) { - free( nrt->hash ); + rmr_sym_free( nrt->hash ); free( nrt ); nrt = NULL; } return nrt; } + memset( things.things, 0, sizeof( sizeof( void * ) * things.nalloc ) ); + memset( things.names, 0, sizeof( char * ) * things.nalloc ); sst = srt->hash; // convenience pointers (src symtab) nst = nrt->hash; @@ -1162,62 +1257,92 @@ static route_table_t* rt_clone_space( route_table_t* srt, route_table_t* nrt, in } /* - Creates a new route table and then clones the parts of the table which we must keep with each newrt|start. - The endpoint and meid entries in the hash must be preserved. + Given a destination route table (drt), clone from the source (srt) into it. 
+ If drt is nil, alloc a new one. If srt is nil, then nothing is done (except to + allocate the drt if that was nil too). If all is true (1), then we will clone both + the MT and the ME spaces; otherwise only the ME space is cloned. */ -static route_table_t* uta_rt_clone( route_table_t* srt ) { +static route_table_t* uta_rt_clone( uta_ctx_t* ctx, route_table_t* srt, route_table_t* drt, int all ) { endpoint_t* ep; // an endpoint rtable_ent_t* rte; // a route table entry - route_table_t* nrt = NULL; // new route table int i; + if( ctx == NULL ) { + return NULL; + } + if( drt == NULL ) { + drt = uta_rt_init( ctx ); + } if( srt == NULL ) { - return uta_rt_init(); // no source to clone, just return an empty table + return drt; } - nrt = rt_clone_space( srt, nrt, RT_NAME_SPACE ); // allocate a new one, add endpoint refs - rt_clone_space( srt, nrt, RT_ME_SPACE ); // add meid refs to new + drt->ephash = ctx->ephash; // all rts reference the same EP symtab + rt_clone_space( ctx, srt, drt, RT_ME_SPACE ); + if( all ) { + rt_clone_space( ctx, srt, drt, RT_MT_SPACE ); + } - return nrt; + return drt; } /* - Creates a new route table and then clones _all_ of the given route table (references - both endpoints AND the route table entries. Needed to support a partial update where - some route table entries will not be deleted if not explicitly in the update and when - we are adding/replacing meid references. + Prepares the "new" route table for populating. If the old_rtable is not nil, then + we wait for its use count to reach 0. Then the table is cleared, and moved on the + context to be referenced by the new pointer; the old pointer is set to nil. + + If the old table doesn't exist, then a new table is created and the new pointer is + set to reference it. 
*/ -static route_table_t* uta_rt_clone_all( route_table_t* srt ) { - endpoint_t* ep; // an endpoint - rtable_ent_t* rte; // a route table entry - route_table_t* nrt = NULL; // new route table - int i; +static route_table_t* prep_new_rt( uta_ctx_t* ctx, int all ) { + int counter = 0; + route_table_t* rt; - if( srt == NULL ) { - return uta_rt_init(); // no source to clone, just return an empty table + if( ctx == NULL ) { + return NULL; } - nrt = rt_clone_space( srt, nrt, RT_MT_SPACE ); // create new, clone all spaces to it - rt_clone_space( srt, nrt, RT_NAME_SPACE ); - rt_clone_space( srt, nrt, RT_ME_SPACE ); + if( (rt = ctx->old_rtable) != NULL ) { + ctx->old_rtable = NULL; + while( rt->ref_count > 0 ) { // wait for all who are using to stop + if( counter++ > 1000 ) { + rmr_vlog( RMR_VL_WARN, "rt_prep_newrt: internal mishap, ref count on table seems wedged" ); + break; + } - return nrt; + usleep( 1000 ); // small sleep to yield the processor if that is needed + } + + if( rt->hash != NULL ) { + rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL ); // deref and drop if needed + rmr_sym_clear( rt->hash ); // clear all entries from the old table + } + } else { + rt = NULL; + } + + rt = uta_rt_clone( ctx, ctx->rtable, rt, all ); // also sets the ephash pointer + rt->ref_count = 0; // take no chances; ensure it's 0! + + return rt; } + /* Given a name, find the endpoint struct in the provided route table. */ static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name ) { - if( rt == NULL || rt->hash == NULL || ep_name == NULL || *ep_name == 0 ) { + if( rt == NULL || rt->ephash == NULL || ep_name == NULL || *ep_name == 0 ) { return NULL; } - return rmr_sym_get( rt->hash, ep_name, 1 ); + return rmr_sym_get( rt->ephash, ep_name, 1 ); } /* Drop the given route table. Purge all type 0 entries, then drop the symtab itself. + Does NOT destroy the gate as it's a common gate for ALL route tables. 
*/ static void uta_rt_drop( route_table_t* rt ) { if( rt == NULL ) { @@ -1243,7 +1368,7 @@ static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) { return NULL; } - if( (ep = uta_get_ep( rt, ep_name )) == NULL ) { // not there yet, make + if( (ep = uta_get_ep( rt, ep_name )) == NULL ) { // not there yet, make if( (ep = (endpoint_t *) malloc( sizeof( *ep ) )) == NULL ) { rmr_vlog( RMR_VL_WARN, "rt_ensure: malloc failed for endpoint creation: %s\n", ep_name ); errno = ENOMEM; @@ -1257,7 +1382,7 @@ static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) { pthread_mutex_init( &ep->gate, NULL ); // init with default attrs memset( &ep->scounts[0], 0, sizeof( ep->scounts ) ); - rmr_sym_put( rt->hash, ep_name, 1, ep ); + rmr_sym_put( rt->ephash, ep_name, 1, ep ); } return ep; @@ -1284,14 +1409,60 @@ static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) { Given a route table and meid string, find the owner (if known). Returns a pointer to the endpoint struct or nil. */ -static inline endpoint_t* get_meid_owner( route_table_t *rt, char* meid ) { - endpoint_t* ep; // the ep we found in the hash +static inline endpoint_t* get_meid_owner( route_table_t *rt, char const* meid ) { + endpoint_t const* ep; // the ep we found in the hash if( rt == NULL || rt->hash == NULL || meid == NULL || *meid == 0 ) { return NULL; } - return (endpoint_t *) rmr_sym_get( rt->hash, meid, RT_ME_SPACE ); + return (endpoint_t *) rmr_sym_get( rt->hash, meid, RT_ME_SPACE ); +} + +/* + This returns a pointer to the currently active route table and ups + the reference count so that the route table is not freed while it + is being used. The caller MUST call release_rt() when finished + with the pointer. + + Care must be taken: the ctx->rtable pointer _could_ change during the time + between the release of the lock and the return. 
Therefore we MUST grab + the current pointer when we have the lock so that if it does we don't + return a pointer to the wrong table. + + This will return NULL if there is no active table. +*/ +static inline route_table_t* get_rt( uta_ctx_t* ctx ) { + route_table_t* rrt; // return value + + if( ctx == NULL || ctx->rtable == NULL ) { + return NULL; + } + + pthread_mutex_lock( ctx->rtgate ); // must hold lock to bump use + rrt = ctx->rtable; // must stash the pointer while we hold lock + rrt->ref_count++; + pthread_mutex_unlock( ctx->rtgate ); + + return rrt; // pointer we upped the count with } +/* + This will "release" the route table by reducing the use counter + in the table. The table may not be freed until the counter reaches + 0, so it's imparative that the pointer be "released" when it is + fetched by get_rt(). Once the caller has released the table it + may not safely use the pointer that it had. +*/ +static inline void release_rt( uta_ctx_t* ctx, route_table_t* rt ) { + if( ctx == NULL || rt == NULL ) { + return; + } + + pthread_mutex_lock( ctx->rtgate ); // must hold lock + if( rt->ref_count > 0 ) { // something smells if it's already 0, don't do antyhing if it is + rt->ref_count--; + } + pthread_mutex_unlock( ctx->rtgate ); +} #endif