X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Frmr%2Fcommon%2Fsrc%2Frt_generic_static.c;h=c196a9adceb32e0c90379704295886523f3be540;hb=9c923bcc9322c22220b574671c7b46f10008c614;hp=5a2b1fa9a56ee67cd096218bbb02740991ebd850;hpb=0b79fc264eea2591ad6f645d0c90cc378ea5603b;p=ric-plt%2Flib%2Frmr.git diff --git a/src/rmr/common/src/rt_generic_static.c b/src/rmr/common/src/rt_generic_static.c index 5a2b1fa..c196a9a 100644 --- a/src/rmr/common/src/rt_generic_static.c +++ b/src/rmr/common/src/rt_generic_static.c @@ -1,8 +1,8 @@ // :vi sw=4 ts=4 noet: /* ================================================================================== - Copyright (c) 2019 Nokia - Copyright (c) 2018-2019 AT&T Intellectual Property. + Copyright (c) 2019-2020 Nokia + Copyright (c) 2018-2020 AT&T Intellectual Property. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -46,6 +46,8 @@ #include #include +#include // needed for route manager messages + /* Passed to a symtab foreach callback to construct a list of pointers from @@ -76,7 +78,7 @@ static void ep_stats( void* st, void* entry, char const* name, void* thing, void (*counter)++; } - fprintf( stderr, "[DBUG] RMR rt endpoint: target=%s open=%d\n", ep->name, ep->open ); + rmr_vlog_force( RMR_VL_DEBUG, "rt endpoint: target=%s open=%d\n", ep->name, ep->open ); } /* @@ -95,7 +97,7 @@ static void meid_stats( void* st, void* entry, char const* name, void* thing, vo (*counter)++; } - fprintf( stderr, "[DBUG] RMR meid=%s owner=%s open=%d\n", name, ep->name, ep->open ); + rmr_vlog_force( RMR_VL_DEBUG, "meid=%s owner=%s open=%d\n", name, ep->name, ep->open ); } /* @@ -114,7 +116,7 @@ static void ep_counts( void* st, void* entry, char const* name, void* thing, voi id = "missing"; } - fprintf( stderr, "[INFO] RMR sends: ts=%lld src=%s target=%s open=%d succ=%lld fail=%lld (hard=%lld soft=%lld)\n", + rmr_vlog_force( RMR_VL_INFO, "sends: ts=%lld src=%s target=%s open=%d succ=%lld fail=%lld (hard=%lld soft=%lld)\n", (long long) time( NULL ), id, ep->name, @@ -145,7 +147,7 @@ static void rte_stats( void* st, void* entry, char const* name, void* thing, voi mtype = rte->key & 0xffff; sid = (int) (rte->key >> 32); - fprintf( stderr, "[DBUG] RMR rte: key=%016lx mtype=%4d sid=%4d nrrg=%2d refs=%d\n", rte->key, mtype, sid, rte->nrrgroups, rte->refs ); + rmr_vlog_force( RMR_VL_DEBUG, "rte: key=%016lx mtype=%4d sid=%4d nrrg=%2d refs=%d\n", rte->key, mtype, sid, rte->nrrgroups, rte->refs ); } /* @@ -155,26 +157,26 @@ static void rt_stats( route_table_t* rt ) { int* counter; if( rt == NULL ) { - fprintf( stderr, "[DBUG] rtstats: nil table\n" ); + rmr_vlog_force( RMR_VL_DEBUG, "rtstats: nil table\n" ); return; } counter = (int *) malloc( sizeof( int ) ); *counter = 0; - fprintf( stderr, "[DBUG] RMR route table stats:\n" ); - fprintf( stderr, "[DBUG] RMR route table endpoints:\n" ); + rmr_vlog_force( RMR_VL_DEBUG, "route table stats:\n" ); + rmr_vlog_force( RMR_VL_DEBUG, "route table endpoints:\n" ); rmr_sym_foreach_class( rt->hash, RT_NAME_SPACE, ep_stats, counter ); // run endpoints (names) in the active table - fprintf( stderr, "[DBUG] RMR rtable: %d known endpoints\n", *counter ); + rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d known endpoints\n", *counter ); - fprintf( stderr, "[DBUG] RMR route table entries:\n" ); + rmr_vlog_force( RMR_VL_DEBUG, "route table entries:\n" ); *counter = 0; rmr_sym_foreach_class( rt->hash, RT_MT_SPACE, rte_stats, counter ); // run message type entries - fprintf( stderr, "[DBUG] RMR rtable: %d mt entries in table\n", *counter ); + rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d mt entries in table\n", *counter ); - fprintf( stderr, "[DBUG] RMR route table meid map:\n" ); + rmr_vlog_force( RMR_VL_DEBUG, "route table meid map:\n" ); *counter = 0; rmr_sym_foreach_class( rt->hash, RT_ME_SPACE, meid_stats, counter ); // run meid space - fprintf( stderr, "[DBUG] RMR rtable: %d meids in map\n", *counter ); + rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d meids in map\n", *counter ); free( counter ); } @@ -185,13 +187,110 @@ static void rt_stats( route_table_t* rt ) { */ static void rt_epcounts( route_table_t* rt, char* id ) { if( rt == NULL ) { - fprintf( stderr, "[INFO] RMR endpoint: no counts: empty table\n" ); + rmr_vlog_force( RMR_VL_INFO, "endpoint: no counts: empty table\n" ); return; } rmr_sym_foreach_class( rt->hash, 1, ep_counts, id ); // run endpoints in the active table } +// ------------ route manager communication ------------------------------------------------- +/* + Send a request for a table update to the route manager. Updates come in + async, so send and go. + + pctx is the private context for the thread; ctx is the application context + that we need to be able to send the application ID in case rt mgr needs to + use it to idenfity us. + + Returns 0 if we were not able to send a request. +*/ +static int send_update_req( uta_ctx_t* pctx, uta_ctx_t* ctx ) { + rmr_mbuf_t* smsg; + int state = 0; + + if( ctx->rtg_whid < 0 ) { + return state; + } + + smsg = rmr_alloc_msg( pctx, 1024 ); + if( smsg != NULL ) { + smsg->mtype = RMRRM_REQ_TABLE; + smsg->sub_id = 0; + snprintf( smsg->payload, 1024, "%s ts=%ld\n", ctx->my_name, (long) time( NULL ) ); + rmr_vlog( RMR_VL_INFO, "rmr_rtc: requesting table: (%s) whid=%d\n", smsg->payload, ctx->rtg_whid ); + smsg->len = strlen( smsg->payload ) + 1; + + smsg = rmr_wh_send_msg( pctx, ctx->rtg_whid, smsg ); + if( (state = smsg->state) != RMR_OK ) { + rmr_vlog( RMR_VL_INFO, "rmr_rtc: send failed: %d whid=%d\n", smsg->state, ctx->rtg_whid ); + rmr_wh_close( ctx, ctx->rtg_whid ); // send failed, assume connection lost + ctx->rtg_whid = -1; + } + + rmr_free_msg( smsg ); + } + + return state; +} + +/* + Send an ack to the route table manager for a table ID that we are + processing. State is 1 for OK, and 0 for failed. Reason might + be populated if we know why there was a failure. + + Context should be the PRIVATE context that we use for messages + to route manger and NOT the user's context. + + If a message buffere is passed we use that and use return to sender + assuming that this might be a response to a call and that is needed + to send back to the proper calling thread. If msg is nil, we allocate + and use it. +*/ +static void send_rt_ack( uta_ctx_t* ctx, rmr_mbuf_t* smsg, char* table_id, int state, char* reason ) { + int use_rts = 1; + int payload_size = 1024; + + if( ctx == NULL || ctx->rtg_whid < 0 ) { + return; + } + + if( ctx->flags & CFL_NO_RTACK ) { // don't ack if reading from file etc + return; + } + + if( smsg != NULL ) { + smsg = rmr_realloc_payload( smsg, payload_size, FALSE, FALSE ); // ensure it's large enough to send a response + } else { + use_rts = 0; + smsg = rmr_alloc_msg( ctx, payload_size ); + } + + if( smsg != NULL ) { + smsg->mtype = RMRRM_TABLE_STATE; + smsg->sub_id = -1; + snprintf( smsg->payload, payload_size-1, "%s %s %s\n", state == RMR_OK ? "OK" : "ERR", + table_id == NULL ? "" : table_id, reason == NULL ? "" : reason ); + + smsg->len = strlen( smsg->payload ) + 1; + + rmr_vlog( RMR_VL_INFO, "rmr_rtc: sending table state: (%s) state=%d whid=%d\n", smsg->payload, state, ctx->rtg_whid ); + if( use_rts ) { + smsg = rmr_rts_msg( ctx, smsg ); + } else { + smsg = rmr_wh_send_msg( ctx, ctx->rtg_whid, smsg ); + } + if( (state = smsg->state) != RMR_OK ) { + rmr_vlog( RMR_VL_WARN, "unable to send table state: %d\n", smsg->state ); + rmr_wh_close( ctx, ctx->rtg_whid ); // send failed, assume connection lost + ctx->rtg_whid = -1; + } + + if( ! use_rts ) { + rmr_free_msg( smsg ); // if not our message we must free the leftovers + } + } +} // ------------------------------------------------------------------------------------------------ /* @@ -199,7 +298,7 @@ static void rt_epcounts( route_table_t* rt, char* id ) { must be at the start of a word (i.e. must be immediatly preceeded by whitespace). */ static char* clip( char* buf ) { - char* tok; + char* tok; while( *buf && isspace( *buf ) ) { // skip leading whitespace buf++; @@ -240,7 +339,7 @@ static char* ensure_nlterm( char* buf ) { } } else { if( buf[len-1] != '\n' ) { - fprintf( stderr, "[WRN] rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" ); + rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" ); if( (nb = (char *) malloc( sizeof( char ) * (len + 2) )) != NULL ) { memcpy( nb, buf, len ); *(nb+len) = '\n'; // insert \n and nil into the two extra bytes we allocated @@ -268,7 +367,7 @@ static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups } if( (rte = (rtable_ent_t *) malloc( sizeof( *rte ) )) == NULL ) { - fprintf( stderr, "[ERR] rmr_add_rte: malloc failed for entry\n" ); + rmr_vlog( RMR_VL_ERR, "rmr_add_rte: malloc failed for entry\n" ); return NULL; } memset( rte, 0, sizeof( *rte ) ); @@ -281,7 +380,6 @@ static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups if( nrrgroups ) { if( (rte->rrgroups = (rrgroup_t **) malloc( sizeof( rrgroup_t * ) * nrrgroups )) == NULL ) { - fprintf( stderr, "rmr_add_rte: malloc failed for rrgroup array\n" ); free( rte ); return NULL; } @@ -298,7 +396,7 @@ static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups rmr_sym_map( rt->hash, key, rte ); // add to hash using numeric mtype as key - if( DEBUG ) fprintf( stderr, "[DBUG] route table entry created: k=%llx groups=%d\n", (long long) key, nrrgroups ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "route table entry created: k=%llx groups=%d\n", (long long) key, nrrgroups ); return rte; } @@ -322,8 +420,8 @@ static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* r char* tok; int ntoks; uint64_t key = 0; // the symtab key will be mtype or sub_id+mtype - char* tokens[128]; - char* gtokens[64]; + char* tokens[128]; + char* gtokens[64]; int i; int ngtoks; // number of tokens in the group list int grp; // index into group list @@ -331,25 +429,26 @@ static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* r ts_field = clip( ts_field ); // ditch extra whitespace and trailing comments rr_field = clip( rr_field ); - if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all) + if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all) (uta_has_str( ts_field, ctx->my_name, ',', 127) >= 0) || // our name is in the list has_myip( ts_field, ctx->ip_list, ',', 127 ) ) { // the list has one of our IP addresses key = build_rt_key( subid, atoi( ts_field ) ); - if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] create rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key ); + if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "create rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key ); if( (ngtoks = uta_tokenise( rr_field, gtokens, 64, ';' )) > 0 ) { // split round robin groups if( strcmp( gtokens[0], "%meid" ) == 0 ) { ngtoks = 0; // special indicator that uses meid to find endpoint, no rrobin } rte = uta_add_rte( ctx->new_rtable, key, ngtoks ); // get/create entry for this key + rte->mtype = atoi( ts_field ); // capture mtype for debugging for( grp = 0; grp < ngtoks; grp++ ) { if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) { // remove any referneces to our ip addrs for( i = 0; i < ntoks; i++ ) { if( strcmp( tokens[i], ctx->my_name ) != 0 ) { // don't add if it is us -- cannot send to ourself - if( DEBUG > 1 || (vlevel > 1)) fprintf( stderr, "[DBUG] add endpoint ts=%s %s\n", ts_field, tokens[i] ); + if( DEBUG > 1 || (vlevel > 1)) rmr_vlog_force( RMR_VL_DEBUG, "add endpoint ts=%s %s\n", ts_field, tokens[i] ); uta_add_ep( ctx->new_rtable, rte, tokens[i], grp ); } } @@ -358,7 +457,7 @@ static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* r } } else { if( DEBUG || (vlevel > 2) ) { - fprintf( stderr, "entry not included, sender not matched: %s\n", tokens[1] ); + rmr_vlog_force( RMR_VL_DEBUG, "entry not included, sender not matched: %s\n", tokens[1] ); } } } @@ -375,7 +474,7 @@ static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vle char* tok; int ntoks; uint64_t key = 0; // the symtab key will be mtype or sub_id+mtype - char* tokens[128]; + char* tokens[128]; if( ctx == NULL || ctx->new_rtable == NULL || ctx->new_rtable->hash == NULL ) { return; @@ -383,7 +482,7 @@ static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vle ts_field = clip( ts_field ); // ditch extra whitespace and trailing comments - if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all) + if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all) (uta_has_str( ts_field, ctx->my_name, ',', 127) >= 0) || // our name is in the list has_myip( ts_field, ctx->ip_list, ',', 127 ) ) { // the list has one of our IP addresses @@ -391,17 +490,17 @@ static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vle rte = rmr_sym_pull( ctx->new_rtable->hash, key ); // get it if( rte != NULL ) { if( DEBUG || (vlevel > 1) ) { - fprintf( stderr, "[DBUG] delete rte for mtype=%s subid=%d key=%08lx\n", ts_field, subid, key ); + rmr_vlog_force( RMR_VL_DEBUG, "delete rte for mtype=%s subid=%d key=%08lx\n", ts_field, subid, key ); } rmr_sym_ndel( ctx->new_rtable->hash, key ); // clear from the new table del_rte( NULL, NULL, NULL, rte, NULL ); // clean up the memory: reduce ref and free if ref == 0 } else { if( DEBUG || (vlevel > 1) ) { - fprintf( stderr, "[DBUG] delete could not find rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key ); + rmr_vlog_force( RMR_VL_DEBUG, "delete could not find rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key ); } } } else { - if( DEBUG ) fprintf( stderr, "[DBUG] delete rte skipped: %s\n", ts_field ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "delete rte skipped: %s\n", ts_field ); } } @@ -419,7 +518,7 @@ static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vle static void parse_meid_ar( route_table_t* rtab, char* owner, char* meid_list, int vlevel ) { char* tok; int ntoks; - char* tokens[128]; + char* tokens[128]; int i; int state; endpoint_t* ep; // endpoint struct for the owner @@ -431,10 +530,9 @@ static void parse_meid_ar( route_table_t* rtab, char* owner, char* meid_list, in for( i = 0; i < ntoks; i++ ) { if( (ep = rt_ensure_ep( rtab, owner )) != NULL ) { state = rmr_sym_put( rtab->hash, tokens[i], RT_ME_SPACE, ep ); // slam this one in if new; replace if there - if( DEBUG || (vlevel > 1) ) fprintf( stderr, "[DBUG] parse_meid_ar: add/replace meid: %s owned by: %s state=%d\n", tokens[i], owner, state ); -fprintf( stderr, "[DBUG] parse_meid_ar: add/replace meid: %s owned by: %s state=%d\n", tokens[i], owner, state ); + if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "parse_meid_ar: add/replace meid: %s owned by: %s state=%d\n", tokens[i], owner, state ); } else { - fprintf( stderr, "[WRN] rmr parse_meid_ar: unable to create an endpoint for owner: %s", owner ); + rmr_vlog( RMR_VL_WARN, "rmr parse_meid_ar: unable to create an endpoint for owner: %s", owner ); } } } @@ -451,7 +549,7 @@ fprintf( stderr, "[DBUG] parse_meid_ar: add/replace meid: %s owned by: %s state= static void parse_meid_del( route_table_t* rtab, char* meid_list, int vlevel ) { char* tok; int ntoks; - char* tokens[128]; + char* tokens[128]; int i; if( rtab->hash == NULL ) { @@ -463,21 +561,26 @@ static void parse_meid_del( route_table_t* rtab, char* meid_list, int vlevel ) { ntoks = uta_tokenise( meid_list, tokens, 128, ' ' ); for( i = 0; i < ntoks; i++ ) { rmr_sym_del( rtab->hash, tokens[i], RT_ME_SPACE ); // and it only took my little finger to blow it away! - if( DEBUG || (vlevel > 1) ) fprintf( stderr, "[DBUG] parse_meid_del: meid deleted: %s\n", tokens[i] ); + if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "parse_meid_del: meid deleted: %s\n", tokens[i] ); } } /* Parse a partially parsed meid record. Tokens[0] should be one of: meid_map, mme_ar, mme_del. + + pctx is the private context needed to return an ack/nack using the provided + message buffer with the route managers address info. */ -static void meid_parser( uta_ctx_t* ctx, char** tokens, int ntoks, int vlevel ) { +static void meid_parser( uta_ctx_t* ctx, uta_ctx_t* pctx, rmr_mbuf_t* mbuf, char** tokens, int ntoks, int vlevel ) { + char wbuf[1024]; + if( tokens == NULL || ntoks < 1 ) { return; // silent but should never happen } if( ntoks < 2 ) { // must have at least two for any valid request record - fprintf( stderr, "[ERR] meid_parse: not enough tokens on %s record\n", tokens[0] ); + rmr_vlog( RMR_VL_ERR, "meid_parse: not enough tokens on %s record\n", tokens[0] ); return; } @@ -485,24 +588,35 @@ static void meid_parser( uta_ctx_t* ctx, char** tokens, int ntoks, int vlevel ) tokens[1] = clip( tokens[1] ); if( *(tokens[1]) == 's' ) { if( ctx->new_rtable != NULL ) { // one in progress? this forces it out - if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] meid map start: dropping incomplete table\n" ); + if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "meid map start: dropping incomplete table\n" ); uta_rt_drop( ctx->new_rtable ); + send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" ); // nack the one that was pending as end never made it } + if( ctx->table_id != NULL ) { + free( ctx->table_id ); + } + if( ntoks >2 ) { + ctx->table_id = strdup( clip( tokens[2] ) ); + } else { + ctx->table_id = NULL; + } ctx->new_rtable = uta_rt_clone_all( ctx->rtable ); // start with a clone of everything (mtype, endpoint refs and meid) ctx->new_rtable->mupdates = 0; - if( DEBUG || (vlevel > 1) ) fprintf( stderr, "[DBUG] meid_parse: meid map start found\n" ); + if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "meid_parse: meid map start found\n" ); } else { if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building if( ntoks > 2 ) { // meid_map | end | |??? given if( ctx->new_rtable->mupdates != atoi( tokens[2] ) ) { // count they added didn't match what we received - fprintf( stderr, "[ERR] meid_parse: meid map update had wrong number of records: received %d expected %s\n", ctx->new_rtable->mupdates, tokens[2] ); + rmr_vlog( RMR_VL_ERR, "meid_parse: meid map update had wrong number of records: received %d expected %s\n", ctx->new_rtable->mupdates, tokens[2] ); + snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates ); + send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf ); uta_rt_drop( ctx->new_rtable ); ctx->new_rtable = NULL; return; } - if( DEBUG ) fprintf( stderr, "[DBUG] meid_parse: meid map update ended; found expected number of entries: %s\n", tokens[2] ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid_parse: meid map update ended; found expected number of entries: %s\n", tokens[2] ); } if( ctx->new_rtable ) { @@ -510,32 +624,33 @@ static void meid_parser( uta_ctx_t* ctx, char** tokens, int ntoks, int vlevel ) ctx->old_rtable = ctx->rtable; // currently active becomes old and allowed to 'drain' ctx->rtable = ctx->new_rtable; // one we've been adding to becomes active ctx->new_rtable = NULL; - if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] end of meid map noticed\n" ); + if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "end of meid map noticed\n" ); + send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL ); if( vlevel > 0 ) { - fprintf( stderr, "[DBUG] old route table:\n" ); + rmr_vlog_force( RMR_VL_DEBUG, "old route table:\n" ); rt_stats( ctx->old_rtable ); - fprintf( stderr, "[DBUG] new route table:\n" ); + rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" ); rt_stats( ctx->rtable ); } } else { - if( DEBUG ) fprintf( stderr, "[DBUG] end of meid map noticed, but one was not started!\n" ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "end of meid map noticed, but one was not started!\n" ); ctx->new_rtable = NULL; } } } return; - } + } - if( ! ctx->new_rtable ) { // for any other mmap entries, there must be a table in progress or we punt - if( DEBUG ) fprintf( stderr, "[DBUG] meid update/delte (%s) encountered, but table update not started\n", tokens[0] ); + if( ! ctx->new_rtable ) { // for any other mmap entries, there must be a table in progress or we punt + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid update/delte (%s) encountered, but table update not started\n", tokens[0] ); return; } if( strcmp( tokens[0], "mme_ar" ) == 0 ) { if( ntoks < 3 || tokens[1] == NULL || tokens[2] == NULL ) { - fprintf( stderr, "[ERR] meid_parse: mme_ar record didn't have enough tokens found %d\n", ntoks ); + rmr_vlog( RMR_VL_ERR, "meid_parse: mme_ar record didn't have enough tokens found %d\n", ntoks ); return; } parse_meid_ar( ctx->new_rtable, tokens[1], tokens[2], vlevel ); @@ -544,7 +659,7 @@ static void meid_parser( uta_ctx_t* ctx, char** tokens, int ntoks, int vlevel ) if( strcmp( tokens[0], "mme_del" ) == 0 ) { if( ntoks < 2 ) { - fprintf( stderr, "[ERR] meid_parse: mme_del record didn't have enough tokens\n" ); + rmr_vlog( RMR_VL_ERR, "meid_parse: mme_del record didn't have enough tokens\n" ); return; } parse_meid_del( ctx->new_rtable, tokens[1], vlevel ); @@ -574,20 +689,32 @@ static void meid_parser( uta_ctx_t* ctx, char** tokens, int ntoks, int vlevel ) For a RT update, we expect: - newrt|{start|end [hold]} + newrt | start | + newrt | end | rte|[,sender]|[;,...] mse|[,sender]||[;,...] mse| [,sender] | | %meid For a meid map update we expect: - meid_map | start + meid_map | start | meid_map | end | | mme_ar | | ... mme_del | ... + + The pctx is our private context that must be used to send acks/status + messages back to the route manager. The regular ctx is the ctx that + the user has been given and thus that's where we have to hang the route + table we're working with. + + If mbuf is given, and we need to ack, then we ack using the mbuf and a + return to sender call (allows route manager to use wh_call() to send + an update and rts is required to get that back to the right thread). + If mbuf is nil, then one will be allocated (in ack) and a normal wh_send + will be used. */ -static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) { +static void parse_rt_rec( uta_ctx_t* ctx, uta_ctx_t* pctx, char* buf, int vlevel, rmr_mbuf_t* mbuf ) { int i; int ntoks; // number of tokens found in something int ngtoks; @@ -595,6 +722,7 @@ static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) { rtable_ent_t* rte; // route table entry added char* tokens[128]; char* tok; // pointer into a token or string + char wbuf[1024]; if( ! buf ) { return; @@ -606,6 +734,7 @@ static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) { for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- ); // trim trailing spaces too *(tok+1) = 0; + memset( tokens, 0, sizeof( tokens ) ); if( (ntoks = uta_tokenise( buf, tokens, 128, '|' )) > 0 ) { tokens[0] = clip( tokens[0] ); switch( *(tokens[0]) ) { @@ -620,7 +749,7 @@ static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) { } if( ntoks < 3 ) { - if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: del record had too few fields: %d instead of 3\n", ntoks ); + if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: del record had too few fields: %d instead of 3\n", ntoks ); break; } @@ -631,50 +760,77 @@ static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) { case 'n': // newrt|{start|end} tokens[1] = clip( tokens[1] ); if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building + if( ntoks >2 ) { + if( ctx->new_rtable->updates != atoi( tokens[2] ) ) { // count they added didn't match what we received + rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n", + ctx->new_rtable->updates, tokens[2] ); + snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates ); + send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf ); + uta_rt_drop( ctx->new_rtable ); + ctx->new_rtable = NULL; + break; + } + } + if( ctx->new_rtable ) { uta_rt_drop( ctx->old_rtable ); // time to drop one that was previously replaced ctx->old_rtable = ctx->rtable; // currently active becomes old and allowed to 'drain' ctx->rtable = ctx->new_rtable; // one we've been adding to becomes active ctx->new_rtable = NULL; - if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] end of route table noticed\n" ); + if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog( RMR_VL_DEBUG, "end of route table noticed\n" ); if( vlevel > 0 ) { - fprintf( stderr, "[DBUG] old route table:\n" ); + rmr_vlog_force( RMR_VL_DEBUG, "old route table:\n" ); rt_stats( ctx->old_rtable ); - fprintf( stderr, "[DBUG] new route table:\n" ); + rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" ); rt_stats( ctx->rtable ); } + + send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL ); + ctx->rtable_ready = 1; // route based sends can now happen } else { - if( DEBUG > 1 ) fprintf( stderr, "[DBUG] end of route table noticed, but one was not started!\n" ); + if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of route table noticed, but one was not started!\n" ); ctx->new_rtable = NULL; } - } else { // start a new table. - if( ctx->new_rtable != NULL ) { // one in progress? this forces it out - if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] new table; dropping incomplete table\n" ); + } else { // start a new table. + if( ctx->new_rtable != NULL ) { // one in progress? this forces it out + send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" ); // nack the one that was pending as end never made it + + if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" ); uta_rt_drop( ctx->new_rtable ); } + if( ctx->table_id != NULL ) { + free( ctx->table_id ); + } + if( ntoks >2 ) { + ctx->table_id = strdup( clip( tokens[2] ) ); + } else { + ctx->table_id = NULL; + } + ctx->new_rtable = NULL; ctx->new_rtable = uta_rt_clone( ctx->rtable ); // create by cloning endpoint and meidtentries from active table - if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] start of route table noticed\n" ); + ctx->new_rtable->updates = 0; // init count of entries received + if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "start of route table noticed\n" ); } break; - case 'm': // mse entry or one of the meid_ records + case 'm': // mse entry or one of the meid_ records if( strcmp( tokens[0], "mse" ) == 0 ) { if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore siliently break; } if( ntoks < 4 ) { - if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: mse record had too few fields: %d instead of 4\n", ntoks ); + if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: mse record had too few fields: %d instead of 4\n", ntoks ); break; } build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel ); ctx->new_rtable->updates++; } else { - meid_parser( ctx, tokens, ntoks, vlevel ); + meid_parser( ctx, pctx, mbuf, tokens, ntoks, vlevel ); } break; @@ -700,7 +856,7 @@ static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) { if( ntoks >2 ) { if( ctx->new_rtable->updates != atoi( tokens[2] ) ) { // count they added didn't match what we received - fprintf( stderr, "[ERR] rmr_rtc: RT update had wrong number of records: received %d expected %s\n", + rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n", ctx->new_rtable->updates, tokens[2] ); uta_rt_drop( ctx->new_rtable ); ctx->new_rtable = NULL; @@ -713,32 +869,39 @@ static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) { ctx->old_rtable = ctx->rtable; // currently active becomes old and allowed to 'drain' ctx->rtable = ctx->new_rtable; // one we've been adding to becomes active ctx->new_rtable = NULL; - if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] end of rt update noticed\n" ); + if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed\n" ); if( vlevel > 0 ) { - fprintf( stderr, "[DBUG] old route table:\n" ); + rmr_vlog_force( RMR_VL_DEBUG, "old route table:\n" ); rt_stats( ctx->old_rtable ); - fprintf( stderr, "[DBUG] updated route table:\n" ); + rmr_vlog_force( RMR_VL_DEBUG, "updated route table:\n" ); rt_stats( ctx->rtable ); } } else { - if( DEBUG > 1 ) fprintf( stderr, "[DBUG] end of rt update noticed, but one was not started!\n" ); + if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed, but one was not started!\n" ); ctx->new_rtable = NULL; } } else { // start a new table. if( ctx->new_rtable != NULL ) { // one in progress? this forces it out - if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] new table; dropping incomplete table\n" ); + if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" ); uta_rt_drop( ctx->new_rtable ); } + if( ntoks >2 ) { + if( ctx->table_id != NULL ) { + free( ctx->table_id ); + } + ctx->table_id = strdup( clip( tokens[2] ) ); + } + ctx->new_rtable = uta_rt_clone_all( ctx->rtable ); // start with a clone of everything (endpts and entries) ctx->new_rtable->updates = 0; // init count of updates received - if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] start of rt update noticed\n" ); + if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "start of rt update noticed\n" ); } break; default: - if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: unrecognised request: %s\n", tokens[0] ); + if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: unrecognised request: %s\n", tokens[0] ); break; } } @@ -766,32 +929,35 @@ static void read_static_rt( uta_ctx_t* ctx, int vlevel ) { } if( (fbuf = ensure_nlterm( uta_fib( fname ) ) ) == NULL ) { // read file into a single buffer (nil terminated string) - fprintf( stderr, "[WRN] rmr read_static: seed route table could not be opened: %s: %s\n", fname, strerror( errno ) ); + rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table could not be opened: %s: %s\n", fname, strerror( errno ) ); return; } - if( DEBUG ) fprintf( stderr, "[DBUG] rmr: seed route table successfully opened: %s\n", fname ); + if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "seed route table successfully opened: %s\n", fname ); for( eor = fbuf; *eor; eor++ ) { // fix broken systems that use \r or \r\n to terminate records if( *eor == '\r' ) { *eor = '\n'; // will look like a blank line which is ok } } - for( rec = fbuf; rec && *rec; rec = eor+1 ) { + rec = fbuf; + while( rec && *rec ) { rcount++; if( (eor = strchr( rec, '\n' )) != NULL ) { *eor = 0; } else { - fprintf( stderr, "[WRN] rmr read_static: seed route table had malformed records (missing newline): %s\n", fname ); - fprintf( stderr, "[WRN] rmr read_static: seed route table not used: %s\n", fname ); + rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table had malformed records (missing newline): %s\n", fname ); + rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table not used: %s\n", fname ); free( fbuf ); return; } - parse_rt_rec( ctx, rec, vlevel ); + parse_rt_rec( ctx, NULL, rec, vlevel, NULL ); // no pvt context as we can't ack + + rec = eor+1; } - if( DEBUG ) fprintf( stderr, "[DBUG] rmr_read_static: seed route table successfully parsed: %d records\n", rcount ); + if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "rmr_read_static: seed route table successfully parsed: %d records\n", rcount ); free( fbuf ); } @@ -883,7 +1049,7 @@ static char* uta_fib( char* fname ) { fsize = stats.st_size; // stat ok, save the file size } } else { - fsize = 8192; // stat failed, we'll leave the file open and try to read a default max of 8k + fsize = 8192; // stat failed, we'll leave the file open and try to read a default max of 8k } } @@ -927,6 +1093,7 @@ static route_table_t* uta_rt_init( ) { if( (rt = (route_table_t *) malloc( sizeof( route_table_t ) )) == NULL ) { return NULL; } + memset( rt, 0, sizeof( *rt ) ); if( (rt->hash = rmr_sym_alloc( RT_SIZE )) == NULL ) { free( rt ); @@ -963,7 +1130,9 @@ static route_table_t* rt_clone_space( route_table_t* srt, route_table_t* nrt, in things.nalloc = 2048; things.nused = 0; things.things = (void **) malloc( sizeof( void * ) * things.nalloc ); + memset( things.things, 0, sizeof( sizeof( void * ) * things.nalloc ) ); things.names = (const char **) malloc( sizeof( char * ) * things.nalloc ); + memset( things.names, 0, sizeof( char * ) * things.nalloc ); if( things.things == NULL ) { if( free_on_err ) { free( nrt->hash ); @@ -979,7 +1148,7 @@ static route_table_t* rt_clone_space( route_table_t* srt, route_table_t* nrt, in rmr_sym_foreach_class( sst, space, collect_things, &things ); // collect things from this space - if( DEBUG ) fprintf( stderr, "[DBUG] clone space cloned %d things in space %d\n", things.nused, space ); + if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "clone space cloned %d things in space %d\n", things.nused, space ); for( i = 0; i < things.nused; i++ ) { if( space ) { // string key, epoint reference ep = (endpoint_t *) things.things[i]; @@ -1017,9 +1186,9 @@ static route_table_t* uta_rt_clone( route_table_t* srt ) { } /* - Creates a new route table and then clones _all_ of the given route table (references - both endpoints AND the route table entries. Needed to support a partial update where - some route table entries will not be deleted if not explicitly in the update and when + Creates a new route table and then clones _all_ of the given route table (references + both endpoints AND the route table entries. Needed to support a partial update where + some route table entries will not be deleted if not explicitly in the update and when we are adding/replacing meid references. */ static route_table_t* uta_rt_clone_all( route_table_t* srt ) { @@ -1073,18 +1242,19 @@ static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) { endpoint_t* ep; if( !rt || !ep_name || ! *ep_name ) { - fprintf( stderr, "[WRN] rmr: rt_ensure: internal mishap, something undefined rt=%p ep_name=%p\n", rt, ep_name ); + rmr_vlog( RMR_VL_WARN, "rt_ensure: internal mishap, something undefined rt=%p ep_name=%p\n", rt, ep_name ); errno = EINVAL; return NULL; } - if( (ep = uta_get_ep( rt, ep_name )) == NULL ) { // not there yet, make + if( (ep = uta_get_ep( rt, ep_name )) == NULL ) { // not there yet, make if( (ep = (endpoint_t *) malloc( sizeof( *ep ) )) == NULL ) { - fprintf( stderr, "[WRN] rmr: rt_ensure: malloc failed for endpoint creation: %s\n", ep_name ); + rmr_vlog( RMR_VL_WARN, "rt_ensure: malloc failed for endpoint creation: %s\n", ep_name ); errno = ENOMEM; return NULL; } + ep->notify = 1; // show notification on first connection failure ep->open = 0; // not connected ep->addr = uta_h2ip( ep_name ); ep->name = strdup( ep_name ); @@ -1114,5 +1284,18 @@ static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) { return key; } +/* + Given a route table and meid string, find the owner (if known). Returns a pointer to + the endpoint struct or nil. +*/ +static inline endpoint_t* get_meid_owner( route_table_t *rt, char* meid ) { + endpoint_t* ep; // the ep we found in the hash + + if( rt == NULL || rt->hash == NULL || meid == NULL || *meid == 0 ) { + return NULL; + } + + return (endpoint_t *) rmr_sym_get( rt->hash, meid, RT_ME_SPACE ); +} #endif