X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Fcommon%2Fsrc%2Frt_generic_static.c;h=ef38b8a713d613ba5a9beb4f7a46a29c068e66b8;hb=refs%2Fchanges%2F51%2F151%2F1;hp=4dd93442e379d75c728061416f63deb83af500d5;hpb=8790bf0c4f4f08fd05853afa67e211112b344a42;p=ric-plt%2Flib%2Frmr.git diff --git a/src/common/src/rt_generic_static.c b/src/common/src/rt_generic_static.c index 4dd9344..ef38b8a 100644 --- a/src/common/src/rt_generic_static.c +++ b/src/common/src/rt_generic_static.c @@ -44,6 +44,7 @@ #include #include #include +#include /* @@ -56,14 +57,45 @@ typedef struct thing_list { void** things; } thing_list_t; +// ------------------------------------------------------------------------------------------------ + + +/* + Little diddy to trim whitespace and trailing comments. Like shell, trailing comments + must be at the start of a word (i.e. must be immediatly preceeded by whitespace). +*/ +static char* clip( char* buf ) { + char* tok; + + while( *buf && isspace( *buf ) ) { // skip leading whitespace + buf++; + } + + if( (tok = strchr( buf, '#' )) != NULL ) { + if( tok == buf ) { + return buf; // just push back; leading comment sym handled there + } + + if( isspace( *(tok-1) ) ) { + *tok = 0; + } + } + + for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- ); // trim trailing spaces too + *(tok+1) = 0; + + return buf; +} + /* Given a message type create a route table entry and add to the hash keyed on the message type. Once in the hash, endpoints can be added with uta_add_ep. Size is the number of group slots to allocate in the entry. */ -static rtable_ent_t* uta_add_rte( route_table_t* rt, int mtype, int nrrgroups ) { +static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups ) { rtable_ent_t* rte; + rtable_ent_t* old_rte; // entry which was already in the table for the key if( rt == NULL ) { return NULL; @@ -74,6 +106,7 @@ static rtable_ent_t* uta_add_rte( route_table_t* rt, int mtype, int nrrgroups ) return NULL; } memset( rte, 0, sizeof( *rte ) ); + rte->refs = 1; if( nrrgroups <= 0 ) { nrrgroups = 10; @@ -87,12 +120,71 @@ static rtable_ent_t* uta_add_rte( route_table_t* rt, int mtype, int nrrgroups ) memset( rte->rrgroups, 0, sizeof( rrgroup_t *) * nrrgroups ); rte->nrrgroups = nrrgroups; - rmr_sym_map( rt->hash, mtype, rte ); // add to hash using numeric mtype as key + if( (old_rte = rmr_sym_pull( rt->hash, key )) != NULL ) { + del_rte( NULL, NULL, NULL, old_rte, NULL ); // dec the ref counter and trash if unreferenced + } + + rmr_sym_map( rt->hash, key, rte ); // add to hash using numeric mtype as key - if( DEBUG ) fprintf( stderr, "[DBUG] route table entry created: mt=%d groups=%d\n", mtype, nrrgroups ); + if( DEBUG ) fprintf( stderr, "[DBUG] route table entry created: k=%lu groups=%d\n", key, nrrgroups ); return rte; } +/* + This accepts partially parsed information from a record sent by route manager or read from + a file such that: + ts_field is the msg-type,sender field + subid is the integer subscription id + rr_field is the endpoint information for round robening message over + + If all goes well, this will add an RTE to the table under construction. + + The ts_field is checked to see if we should ingest this record. We ingest if one of + these is true: + there is no sender info (a generic entry for all) + there is sender and our host:port matches one of the senders + the sender info is an IP address that matches one of our IP addresses +*/ +static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* rr_field, int vlevel ) { + rtable_ent_t* rte; // route table entry added + char* tok; + int ntoks; + uint64_t key = 0; // the symtab key will be mtype or sub_id+mtype + char* tokens[128]; + char* gtokens[64]; + int i; + int ngtoks; // number of tokens in the group list + int grp; // index into group list + + ts_field = clip( ts_field ); // ditch extra whitespace and trailing comments + rr_field = clip( rr_field ); + + if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all) + (uta_has_str( ts_field, ctx->my_name, ',', 127) >= 0) || // our name is in the list + has_myip( ts_field, ctx->ip_list, ',', 127 ) ) { // the list has one of our IP addresses + + key = build_rt_key( subid, atoi( ts_field ) ); + + if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] create rte for mtype=%s subid=%d key=%lu\n", ts_field, subid, key ); + + if( (ngtoks = uta_tokenise( rr_field, gtokens, 64, ';' )) > 0 ) { // split round robin groups + rte = uta_add_rte( ctx->new_rtable, key, ngtoks ); // get/create entry for this key + + for( grp = 0; grp < ngtoks; grp++ ) { + if( (ntoks = uta_tokenise( gtokens[grp], tokens, 64, ',' )) > 0 ) { + for( i = 0; i < ntoks; i++ ) { + if( DEBUG > 1 || (vlevel > 1)) fprintf( stderr, "[DBUG] add endpoint %s\n", ts_field ); + uta_add_ep( ctx->new_rtable, rte, tokens[i], grp ); + } + } + } + } + } else { + if( DEBUG || (vlevel > 2) ) + fprintf( stderr, "entry not included, sender not matched: %s\n", tokens[1] ); + } +} + /* Parse a single record recevied from the route table generator, or read from a static route table file. Start records cause a new table to @@ -100,6 +192,11 @@ static rtable_ent_t* uta_add_rte( route_table_t* rt, int mtype, int nrrgroups ) entry records are added to the currenly 'in progress' table, and an end record causes the in progress table to be finalised and the currently active table is replaced. + + We expect one of several types: + newrt|{start|end} + rte|[,sender]|[;,...] + mse|[,sender]||[;,...] */ static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) { int i; @@ -129,6 +226,7 @@ static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) { break; case 'n': // newrt|{start|end} + tokens[1] = clip( tokens[1] ); if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building if( ctx->new_rtable ) { uta_rt_drop( ctx->old_rtable ); // time to drop one that was previously replaced @@ -155,33 +253,29 @@ static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) { } break; + case 'm': // assume mse entry + if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore siliently + break; + } + + if( ntoks < 4 ) { + if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: mse record had too few fields: %d instead of 4\n", ntoks ); + break; + } + + build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel ); + break; + case 'r': // assume rt entry if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore siliently break; } - if( ((tok = strchr( tokens[1], ',' )) == NULL ) || // no sender names - (uta_has_str( tokens[1], ctx->my_name, ',', 127) >= 0) || // our name isn't in the list - has_myip( tokens[1], ctx->ip_list, ',', 127 ) ) { // the list has one of our IP addresses - - if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] create rte for mtype=%s\n", tokens[1] ); - - if( (ngtoks = uta_tokenise( tokens[2], gtokens, 64, ';' )) > 0 ) { // split last field by groups first - rte = uta_add_rte( ctx->new_rtable, atoi( tokens[1] ), ngtoks ); // get/create entry for message type - for( grp = 0; grp < ngtoks; grp++ ) { - if( (ntoks = uta_tokenise( gtokens[grp], tokens, 64, ',' )) > 0 ) { - for( i = 0; i < ntoks; i++ ) { - if( DEBUG > 1 || (vlevel > 1)) fprintf( stderr, "[DBUG] add endpoint %s\n", tokens[i] ); - uta_add_ep( ctx->new_rtable, rte, tokens[i], grp ); - } - } - } - } + if( ntoks > 3 ) { // assume new entry with subid last + build_entry( ctx, tokens[1], atoi( tokens[3] ), tokens[2], vlevel ); } else { - if( DEBUG || (vlevel > 2) ) - fprintf( stderr, "entry not included, sender not matched: %s\n", tokens[1] ); + build_entry( ctx, tokens[1], UNSET_SUBID, tokens[2], vlevel ); // old school entry has no sub id } - break; default: @@ -253,6 +347,18 @@ static void collect_things( void* st, void* entry, char const* name, void* thing Called to delete a route table entry struct. We delete the array of endpoint pointers, but NOT the endpoints referenced as those are referenced from multiple entries. + + Route table entries can be concurrently referenced by multiple symtabs, so + the actual delete happens only if decrementing the rte's ref count takes it + to 0. Thus, it is safe to call this function across a symtab when cleaning up + the symtab, or overlaying an entry. + + This function uses ONLY the pointer to the rte (thing) and ignores the other + information that symtab foreach function passes (st, entry, and data) which + means that it _can_ safetly be used outside of the foreach setting. If + the function is changed to depend on any of these three, then a stand-alone + rte_cleanup() function should be added and referenced by this, and refererences + to this outside of the foreach world should be changed. */ static void del_rte( void* st, void* entry, char const* name, void* thing, void* data ) { rtable_ent_t* rte; @@ -262,6 +368,11 @@ static void del_rte( void* st, void* entry, char const* name, void* thing, void* return; } + rte->refs--; + if( rte->refs > 0 ) { // something still referencing, so it lives + return; + } + if( rte->rrgroups ) { // clean up the round robin groups for( i = 0; i < rte->nrrgroups; i++ ) { if( rte->rrgroups[i] ) { @@ -364,7 +475,7 @@ static route_table_t* uta_rt_init( ) { static route_table_t* uta_rt_clone( route_table_t* srt ) { endpoint_t* ep; // an endpoint route_table_t* nrt; // new route table - route_table_t* art; // active route table + //route_table_t* art; // active route table void* sst; // source symtab void* nst; // new symtab thing_list_t things; @@ -406,6 +517,75 @@ static route_table_t* uta_rt_clone( route_table_t* srt ) { return nrt; } +/* + Clones _all_ of the given route table (references both endpoints AND the route table + entries. Needed to support a partial update where some route table entries will not + be deleted if not explicitly in the update. +*/ +static route_table_t* uta_rt_clone_all( route_table_t* srt ) { + endpoint_t* ep; // an endpoint + rtable_ent_t* rte; // a route table entry + route_table_t* nrt; // new route table + //route_table_t* art; // active route table + void* sst; // source symtab + void* nst; // new symtab + thing_list_t things0; // things from space 0 (table entries) + thing_list_t things1; // things from space 1 (end points) + int i; + + if( srt == NULL ) { + return NULL; + } + + if( (nrt = (route_table_t *) malloc( sizeof( *nrt ) )) == NULL ) { + return NULL; + } + + if( (nrt->hash = rmr_sym_alloc( 509 )) == NULL ) { // modest size, prime + free( nrt ); + return NULL; + } + + things0.nalloc = 2048; + things0.nused = 0; + things0.things = (void **) malloc( sizeof( void * ) * things0.nalloc ); + if( things0.things == NULL ) { + free( nrt->hash ); + free( nrt ); + return NULL; + } + + things1.nalloc = 2048; + things1.nused = 0; + things1.things = (void **) malloc( sizeof( void * ) * things1.nalloc ); + if( things1.things == NULL ) { + free( nrt->hash ); + free( nrt ); + return NULL; + } + + sst = srt->hash; // convenience pointers (src symtab) + nst = nrt->hash; + + rmr_sym_foreach_class( sst, 0, collect_things, &things0 ); // collect the rtes + rmr_sym_foreach_class( sst, 1, collect_things, &things1 ); // collect the named endpoints in the active table + + for( i = 0; i < things0.nused; i++ ) { + rte = (rtable_ent_t *) things0.things[i]; + rte->refs++; // rtes can be removed, so we track references + rmr_sym_map( nst, rte->key, rte ); // add to hash using numeric mtype/sub-id as key (default to space 0) + } + + for( i = 0; i < things1.nused; i++ ) { + ep = (endpoint_t *) things1.things[i]; + rmr_sym_put( nst, ep->name, 1, ep ); // slam this one into the new table + } + + free( things0.things ); + free( things1.things ); + return nrt; +} + /* Given a name, find the endpoint struct in the provided route table. */ @@ -463,4 +643,21 @@ static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) { } +/* + Given a session id and message type build a key that can be used to look up the rte in the route + table hash. Sub_id is expected to be -1 if there is no session id associated with the entry. +*/ +static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) { + uint64_t key; + + if( sub_id == UNSET_SUBID ) { + key = 0xffffffff00000000 | mtype; + } else { + key = (((uint64_t) sub_id) << 32) | (mtype & 0xffffffff); + } + + return key; +} + + #endif