-// :vi sw=4 ts=4 noet:
+ // :vi sw=4 ts=4 noet2
/*
==================================================================================
- Copyright (c) 2019 Nokia
- Copyright (c) 2018-2019 AT&T Intellectual Property.
+ Copyright (c) 2019-2020 Nokia
+ Copyright (c) 2018-2020 AT&T Intellectual Property.
- Licensed under the Apache License, Version 2.0 (the "License");
+ Licensed under the Apache License, Version 2.0 (the "License") ;
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
#include <sys/stat.h>
#include <unistd.h>
#include <netdb.h>
+#include <pthread.h>
+#include <immintrin.h>
+#include <stdbool.h>
+#include <RIC_message_types.h> // needed for route manager messages
+
+#define ALL 1
+#define SOME 0
/*
Passed to a symtab foreach callback to construct a list of pointers from
a current symtab.
*/
typedef struct thing_list {
+ int error; // if a realloc failed, this will be set
int nalloc;
int nused;
void** things;
+ const char** names; // extend_things() grows this to the same nalloc as things
} thing_list_t;
// ---- debugging/testing -------------------------------------------------------------------------
/*
Dump some stats for an endpoint in the RT. This is generally called to
verify endpoints after a table load/change.
+
+ This is called by the for-each mechanism of the symtab and the prototype is
+ fixed; we don't really use some of the parms, but have dummy references to
+ keep sonar from complaining.
*/
static void ep_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
int* counter;
if( (counter = (int *) vcounter) != NULL ) {
(*counter)++;
+ } else {
+ rmr_vlog( RMR_VL_DEBUG, "ep_stas: nil counter %p %p %p", st, entry, name ); // dummy refs
+ return;
+ }
+
+ rmr_vlog_force( RMR_VL_DEBUG, "rt endpoint: target=%s open=%d\n", ep->name, ep->open );
+}
+
+/*
+ Called to count meid entries in the table. The meid points to an 'owning' endpoint
+ so we can list what we find
+
+ See note in ep_stats about dummy refs.
+*/
+static void meid_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
+ int* counter;
+ endpoint_t* ep;
+
+ if( (ep = (endpoint_t *) thing) == NULL ) {
+ return;
+ }
+
+ if( (counter = (int *) vcounter) != NULL ) {
+ (*counter)++;
+ } else {
+ rmr_vlog( RMR_VL_DEBUG, "meid_stas: nil counter %p %p %p", st, entry, name ); // dummy refs
}
- fprintf( stderr, "[DBUG] RMR sends: target=%s open=%d\n", ep->name, ep->open );
+ rmr_vlog_force( RMR_VL_DEBUG, "meid=%s owner=%s open=%d\n", name, ep->name, ep->open );
}
/*
Dump counts for an endpoint in the RT. The vid parm is assumed to point to
the 'source' information and is added to each message.
+
+ See note above about dummy references.
*/
static void ep_counts( void* st, void* entry, char const* name, void* thing, void* vid ) {
endpoint_t* ep;
char* id;
if( (ep = (endpoint_t *) thing) == NULL ) {
+ rmr_vlog( RMR_VL_DEBUG, "ep_counts: nil thing %p %p %p", st, entry, name ); // dummy refs
return;
}
id = "missing";
}
- fprintf( stderr, "[INFO] RMR sends: ts=%lld src=%s target=%s open=%d succ=%lld fail=%lld (hard=%lld soft=%lld)\n",
- (long long) time( NULL ),
- id,
- ep->name,
- ep->open,
- ep->scounts[EPSC_GOOD],
- ep->scounts[EPSC_FAIL] + ep->scounts[EPSC_TRANS],
- ep->scounts[EPSC_FAIL],
+ rmr_vlog_force( RMR_VL_INFO, "sends: ts=%lld src=%s target=%s open=%d succ=%lld fail=%lld (hard=%lld soft=%lld)\n",
+ (long long) time( NULL ),
+ id,
+ ep->name,
+ ep->open,
+ ep->scounts[EPSC_GOOD],
+ ep->scounts[EPSC_FAIL] + ep->scounts[EPSC_TRANS],
+ ep->scounts[EPSC_FAIL],
ep->scounts[EPSC_TRANS] );
}
*/
static void rte_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
int* counter;
- rtable_ent_t* rte; // thing is really an rte
+ rtable_ent_t const* rte; // thing is really an rte
int mtype;
int sid;
if( (rte = (rtable_ent_t *) thing) == NULL ) {
+ rmr_vlog( RMR_VL_DEBUG, "rte_stats: nil thing %p %p %p", st, entry, name ); // dummy refs
return;
}
mtype = rte->key & 0xffff;
sid = (int) (rte->key >> 32);
- fprintf( stderr, "[DBUG] rte: key=%016lx mtype=%4d sid=%4d nrrg=%2d refs=%d\n", rte->key, mtype, sid, rte->nrrgroups, rte->refs );
+ rmr_vlog_force( RMR_VL_DEBUG, "rte: key=%016lx mtype=%4d sid=%4d nrrg=%2d refs=%d\n", rte->key, mtype, sid, rte->nrrgroups, rte->refs );
}
/*
int* counter;
if( rt == NULL ) {
- fprintf( stderr, "[DBUG] rtstats: nil table\n" );
+ rmr_vlog_force( RMR_VL_DEBUG, "rtstats: nil table\n" );
return;
}
counter = (int *) malloc( sizeof( int ) );
*counter = 0;
- fprintf( stderr, "[DBUG] rtstats:\n" );
- rmr_sym_foreach_class( rt->hash, 1, ep_stats, counter ); // run endpoints in the active table
- fprintf( stderr, "[DBUG] %d endpoints\n", *counter );
+ rmr_vlog_force( RMR_VL_DEBUG, "route table stats:\n" );
+ rmr_vlog_force( RMR_VL_DEBUG, "route table endpoints:\n" );
+ rmr_sym_foreach_class( rt->ephash, RT_NAME_SPACE, ep_stats, counter ); // run endpoints (names) in the active table
+ rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d known endpoints\n", *counter );
+
+ rmr_vlog_force( RMR_VL_DEBUG, "route table entries:\n" );
+ *counter = 0;
+ rmr_sym_foreach_class( rt->hash, RT_MT_SPACE, rte_stats, counter ); // run message type entries
+ rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d mt entries in table\n", *counter );
+ rmr_vlog_force( RMR_VL_DEBUG, "route table meid map:\n" );
*counter = 0;
- rmr_sym_foreach_class( rt->hash, 0, rte_stats, counter ); // run entries
- fprintf( stderr, "[DBUG] %d entries\n", *counter );
+ rmr_sym_foreach_class( rt->hash, RT_ME_SPACE, meid_stats, counter ); // run meid space
+ rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d meids in map\n", *counter );
free( counter );
}
*/
static void rt_epcounts( route_table_t* rt, char* id ) {
if( rt == NULL ) {
- fprintf( stderr, "[INFO] RMR endpoint: no counts: empty table\n" );
+ rmr_vlog_force( RMR_VL_INFO, "endpoint: no counts: empty table\n" );
return;
}
- rmr_sym_foreach_class( rt->hash, 1, ep_counts, id ); // run endpoints in the active table
+ rmr_sym_foreach_class( rt->ephash, 1, ep_counts, id ); // run endpoints in the active table
}
-// ------------------------------------------------------------------------------------------------
+/*
+	Write the stats for the old route table (with its reference count) and for
+	ctx->rtable (logged as the new table) to the log. If there is no old table
+	a note to that effect is logged instead. A nil context is ignored.
+*/
+static void dump_tables( uta_ctx_t *ctx ) {
+	if( ctx == NULL ) {				// nothing that we can report on
+		return;
+	}
+
+	if( ctx->old_rtable != NULL ) {
+		rmr_vlog_force( RMR_VL_DEBUG, "old route table: (ref_count=%d)\n", ctx->old_rtable->ref_count );
+		rt_stats( ctx->old_rtable );
+	} else {
+		rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" );
+	}
+
+	rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
+	rt_stats( ctx->rtable );
+}
+
+// ------------ route manager communication -------------------------------------------------
+/*
+ Send a request for a table update to the route manager. Updates come in
+ async, so send and go.
+
+ pctx is the private context for the thread; ctx is the application context
+ that we need to be able to send the application ID in case rt mgr needs to
+ use it to identify us.
+
+ Returns 0 if no request was sent (no wormhole to the route manager or no
+ message buffer); otherwise the state reported by the send is returned.
+*/
+static int send_update_req( uta_ctx_t* pctx, uta_ctx_t* ctx ) {
+	enum { REQ_PAYLOAD_LEN = 1024 };		// size of the payload allocated and formatted into
+	rmr_mbuf_t*	smsg;
+	int			state = 0;					// returned as is when nothing could be sent
+
+	if( pctx == NULL || ctx == NULL || ctx->rtg_whid < 0 ) {		// no context or no wormhole to the route manager
+		return state;
+	}
+
+	smsg = rmr_alloc_msg( pctx, REQ_PAYLOAD_LEN );
+	if( smsg == NULL ) {
+		return state;
+	}
+
+	smsg->mtype = RMRRM_REQ_TABLE;
+	smsg->sub_id = 0;
+	snprintf( smsg->payload, REQ_PAYLOAD_LEN, "%s ts=%ld\n", ctx->my_name, (long) time( NULL ) );	// time_t need not be long; cast to match %ld
+	rmr_vlog( RMR_VL_INFO, "rmr_rtc: requesting table: (%s) whid=%d\n", smsg->payload, ctx->rtg_whid );
+	smsg->len = strlen( smsg->payload ) + 1;
+
+	smsg = rmr_wh_send_msg( pctx, ctx->rtg_whid, smsg );
+	if( smsg == NULL ) {					// nothing came back; no state to report and nothing to free
+		return state;
+	}
+
+	if( (state = smsg->state) != RMR_OK ) {
+		rmr_vlog( RMR_VL_INFO, "rmr_rtc: send failed: %d whid=%d\n", smsg->state, ctx->rtg_whid );
+		rmr_wh_close( ctx, ctx->rtg_whid );					// send failed, assume connection lost
+		ctx->rtg_whid = -1;
+	}
+
+	rmr_free_msg( smsg );
+	return state;
+}
+
+/*
+ Send an ack to the route table manager for a table ID that we are
+ processing. State is RMR_OK for OK, and anything else for failed. Reason might
+ be populated if we know why there was a failure.
+
+ Context should be the PRIVATE context that we use for messages
+ to route manager and NOT the user's context.
+
+ If a message buffer is passed we use that and use return to sender
+ assuming that this might be a response to a call and that is needed
+ to send back to the proper calling thread. If msg is nil, we allocate
+ one and use it.
+*/
+static void send_rt_ack( uta_ctx_t* ctx, rmr_mbuf_t* smsg, char* table_id, int state, char* reason ) {
+	int			use_rts = 1;				// return to sender unless we must allocate our own buffer
+	int			payload_size = 1024;
+	char const*	tid;						// table id that is safe to format (never nil)
+
+	if( ctx == NULL || ctx->rtg_whid < 0 ) {
+		return;
+	}
+
+	if( ctx->flags & CFL_NO_RTACK ) {		// don't ack if reading from file etc
+		return;
+	}
+
+	if( smsg != NULL ) {
+		smsg = rmr_realloc_payload( smsg, payload_size, FALSE, FALSE );		// ensure it's large enough to send a response
+	} else {
+		use_rts = 0;
+		smsg = rmr_alloc_msg( ctx, payload_size );
+	}
+
+	if( smsg == NULL ) {					// no buffer, nothing we can send
+		return;
+	}
+
+	tid = table_id == NULL ? "<id-missing>" : table_id;		// a nil pointer must never be given to %s
+
+	smsg->mtype = RMRRM_TABLE_STATE;
+	smsg->sub_id = -1;
+	snprintf( smsg->payload, payload_size-1, "%s %s %s\n", state == RMR_OK ? "OK" : "ERR",
+		tid, reason == NULL ? "" : reason );
+
+	smsg->len = strlen( smsg->payload ) + 1;
+
+	rmr_vlog( RMR_VL_INFO, "rmr_rtc: sending table state: (%s) state=%d whid=%d table=%s\n", smsg->payload, state, ctx->rtg_whid, tid );
+	if( use_rts ) {
+		smsg = rmr_rts_msg( ctx, smsg );
+	} else {
+		smsg = rmr_wh_send_msg( ctx, ctx->rtg_whid, smsg );
+	}
+
+	if( smsg == NULL ) {					// nothing came back from the send; no state to examine
+		return;
+	}
+
+	if( (state = smsg->state) != RMR_OK ) {
+		rmr_vlog( RMR_VL_WARN, "unable to send table state: %d\n", smsg->state );
+		rmr_wh_close( ctx, ctx->rtg_whid );					// send failed, assume connection lost
+		ctx->rtg_whid = -1;
+	}
+
+	if( ! use_rts ) {
+		rmr_free_msg( smsg );				// we allocated the buffer, so we must free the leftovers
+	}
+}
+
+// ---- alarm generation --------------------------------------------------------------------------
+
+/*
+ Given the user's context (not the thread private context) look to see if the application isn't
+ working fast enough and we're dropping messages. If the drop counter has changed since the last
+ peeked, and we have not raised an alarm, then we will alarm. If the counter hasn't changed, then we
+ set a timer and if the counter still hasn't changed when it expires we will clear the alarm.
+
+ The private context is what we use to send so as not to interfere with the user flow.
+*/
+static void alarm_if_drops( uta_ctx_t* uctx, uta_ctx_t* pctx ) {
+	static int		alarm_raised = 0;
+	static time_t	ok2clear = 0;		// time that we can clear (time_t so the timestamp is not truncated)
+	static int		lastd = 0;			// the last counter value so we can compute delta
+	static int		prob_id = 0;		// problem ID we assume alarm manager handles dups between processes
+
+	if( uctx == NULL ) {				// no counter to look at
+		return;
+	}
+
+	rmr_vlog( RMR_VL_DEBUG, "checking for drops... raised=%d 0k2clear=%d lastd=%d probid=%d\n", alarm_raised, (int) ok2clear, lastd, prob_id );
+	if( ! alarm_raised ) {
+		if( uctx->dcount - lastd == 0 ) {			// not actively dropping, ok to do nothing
+			return;
+		}
+
+		// lastd is not advanced here; the next call therefore sees a delta, records the count and resets the clear timer
+		alarm_raised = 1;
+		uta_alarm( pctx, ALARM_DROPS | ALARM_RAISE, prob_id, "application running slow; RMR is dropping messages" );
+		rmr_vlog( RMR_VL_INFO, "drop alarm raised" );
+		return;
+	}
+
+	if( uctx->dcount - lastd != 0 ) {				// still dropping or dropping again; we've alarmed so nothing to do
+		lastd = uctx->dcount;
+		ok2clear = 0;								// reset the timer
+		return;
+	}
+
+	if( ok2clear == 0 ) {							// first round where not dropping
+		ok2clear = time( NULL ) + 60;				// we'll clear the alarm in 60s
+		return;
+	}
+
+	if( time( NULL ) > ok2clear ) {					// things still stable after expiry
+		rmr_vlog( RMR_VL_INFO, "drop alarm cleared\n" );
+		alarm_raised = 0;
+		uta_alarm( pctx, ALARM_DROPS | ALARM_CLEAR, prob_id, "RMR message dropping has stopped" );
+		prob_id++;
+	}
+}
+
+// ---- utility -----------------------------------------------------------------------------------
+
+/*
+	Wrapper for isspace() which issues a load fence (_mm_lfence) before the test.
+	Returns non-zero when c is a whitespace character and 0 otherwise.
+
+	Callers hand us plain char values which can be negative where char is signed;
+	isspace() is only defined for EOF and values representable as unsigned char,
+	so the value is converted before it is tested.
+*/
+int isspace_with_fence(int c) {
+	_mm_lfence();
+	return isspace( (unsigned char) c );
+}
+
/*
Little diddy to trim whitespace and trailing comments. Like shell, trailing comments
must be at the start of a word (i.e. must be immediatly preceeded by whitespace).
*/
static char* clip( char* buf ) {
- char* tok;
+ char* tok=NULL;
while( *buf && isspace( *buf ) ) { // skip leading whitespace
buf++;
}
}
- for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- ); // trim trailing spaces too
+ for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace_with_fence( *tok ); tok-- ); // trim trailing spaces too
*(tok+1) = 0;
return buf;
*/
static char* ensure_nlterm( char* buf ) {
char* nb = NULL;
- int len = 1;
-
+	int len = 0;
- nb = buf;
- if( buf == NULL || (len = strlen( buf )) < 2 ) {
- if( (nb = (char *) malloc( sizeof( char ) * 2 )) != NULL ) {
- *nb = '\n';
- *(nb+1) = 0;
- }
- } else {
- if( buf[len-1] != '\n' ) {
- fprintf( stderr, "[WRN] rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
- if( (nb = (char *) malloc( sizeof( char ) * (len + 2) )) != NULL ) {
- memcpy( nb, buf, len );
- *(nb+len) = '\n'; // insert \n and nil into the two extra bytes we allocated
- *(nb+len+1) = 0;
- }
+	// Whenever a replacement buffer is successfully created the original is freed;
+	// if the replacement cannot be allocated nil is returned and buf is left alone.
+	if( buf != NULL ) {
+		len = strlen( buf );
+	}
- free( buf );
- }
+	nb = buf;						// default to returning original as is
+	switch( len ) {
+		case 0:						// nil pointer or empty string; hand back just a newline
+			if( (nb = strdup( "\n" )) != NULL ) {
+				free( buf );		// an empty (non-nil) original would otherwise be leaked
+			}
+			break;
+
+		case 1:
+			if( *buf != '\n' ) {		// not a newline; realloc
+				rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
+				if( (nb = strdup( " \n" )) != NULL ) {		// strdup can fail; don't write through nil
+					*nb = *buf;
+					free( buf );
+				}
+			}
+			break;
+
+		default:
+			if( buf[len-1] != '\n' ) {		// not newline terminated, realloc
+				rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
+				if( (nb = (char *) malloc( sizeof( char ) * (len + 2) )) != NULL ) {
+					memcpy( nb, buf, len );
+					*(nb+len) = '\n';			// insert \n and nil into the two extra bytes we allocated
+					*(nb+len+1) = 0;
+					free( buf );
+				}
+			}
+			break;
}
return nb;
}
+/*
+ Roll the new table into the active and the active into the old table. We
+ must have the lock on the active table to do this. It's possible that there
+ is no active table (first load), so we have to account for that (no locking).
+*/
+static void roll_tables( uta_ctx_t* ctx ) {
+	route_table_t*	incoming;						// the table that has been under construction
+
+	pthread_mutex_lock( ctx->rtgate );				// the swap is made while holding the gate
+
+	incoming = ctx->new_rtable;
+	if( incoming != NULL && ! incoming->error ) {
+		ctx->old_rtable = ctx->rtable;				// active table (nil on the first load) becomes old and may 'drain'
+		ctx->rtable = incoming;						// the one we've been building becomes active
+	} else {
+		rmr_vlog( RMR_VL_WARN, "new route table NOT rolled in: nil pointer or error indicated\n" );
+		ctx->old_rtable = incoming;					// the unusable table is parked in the old slot
+	}
+
+	ctx->new_rtable = NULL;							// in every case nothing remains in progress
+	pthread_mutex_unlock( ctx->rtgate );
+}
+
+/*
+ Given a thing list, extend the array of pointers by 1/2 of the current
+ number allocated. If we cannot realloc an array, then we set the error
+ flag. Unlikely, but will prevent a crash, AND will prevent us from
+ trying to use the table since we couldn't capture everything.
+*/
+static void extend_things( thing_list_t* tl ) {
+	int				old_alloc;			// number of slots currently allocated
+	int				new_alloc;			// number of slots after the extension
+	void**			new_things;
+	const char**	new_names;
+
+	if( tl == NULL ) {
+		return;
+	}
+
+	old_alloc = tl->nalloc > 0 ? tl->nalloc : 0;
+	new_alloc = old_alloc + old_alloc/2;
+	if( new_alloc <= old_alloc ) {							// 0 or 1 slots: half is nothing, so force some growth
+		new_alloc = old_alloc + 1;
+	}
+
+	new_things = (void **) malloc( sizeof( void * ) * new_alloc );		// allocate larger arrays
+	new_names = (const char **) malloc( sizeof( char * ) * new_alloc );
+
+	if( new_things == NULL || new_names == NULL ) {			// if either failed, then set error
+		free( new_things );									// drop whichever did succeed; the list keeps its current arrays
+		free( new_names );
+		tl->error = 1;
+		return;
+	}
+
+	if( old_alloc > 0 && tl->things != NULL ) {
+		memcpy( new_things, tl->things, sizeof( void * ) * old_alloc );
+	}
+	if( old_alloc > 0 && tl->names != NULL ) {
+		memcpy( new_names, tl->names, sizeof( char * ) * old_alloc );
+	}
+
+	free( tl->things );										// the old arrays are released only after both copies are made
+	free( tl->names );
+
+	tl->things = new_things;
+	tl->names = new_names;
+	tl->nalloc = new_alloc;
+}
+
+// ------------ entry update functions ---------------------------------------------------------------
/*
Given a message type create a route table entry and add to the hash keyed on the
message type. Once in the hash, endpoints can be added with uta_add_ep. Size
}
if( (rte = (rtable_ent_t *) malloc( sizeof( *rte ) )) == NULL ) {
- fprintf( stderr, "[ERR] rmr_add_rte: malloc failed for entry\n" );
+ rmr_vlog( RMR_VL_ERR, "rmr_add_rte: malloc failed for entry\n" );
return NULL;
}
memset( rte, 0, sizeof( *rte ) );
rte->refs = 1;
rte->key = key;
- if( nrrgroups <= 0 ) {
+ if( nrrgroups < 0 ) { // zero is allowed as %meid entries have no groups
nrrgroups = 10;
}
- if( (rte->rrgroups = (rrgroup_t **) malloc( sizeof( rrgroup_t * ) * nrrgroups )) == NULL ) {
- fprintf( stderr, "rmr_add_rte: malloc failed for rrgroup array\n" );
- free( rte );
- return NULL;
+ if( nrrgroups ) {
+ if( (rte->rrgroups = (rrgroup_t **) malloc( sizeof( rrgroup_t * ) * nrrgroups )) == NULL ) {
+ free( rte );
+ return NULL;
+ }
+ memset( rte->rrgroups, 0, sizeof( rrgroup_t *) * nrrgroups );
+ } else {
+ rte->rrgroups = NULL;
}
- memset( rte->rrgroups, 0, sizeof( rrgroup_t *) * nrrgroups );
+
rte->nrrgroups = nrrgroups;
if( (old_rte = rmr_sym_pull( rt->hash, key )) != NULL ) {
rmr_sym_map( rt->hash, key, rte ); // add to hash using numeric mtype as key
- if( DEBUG ) fprintf( stderr, "[DBUG] route table entry created: k=%llx groups=%d\n", (long long) key, nrrgroups );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "route table entry created: k=%llx groups=%d\n", (long long) key, nrrgroups );
return rte;
}
/*
- This accepts partially parsed information from a record sent by route manager or read from
+ This accepts partially parsed information from an rte or mse record sent by route manager or read from
a file such that:
ts_field is the msg-type,sender field
subid is the integer subscription id
*/
static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* rr_field, int vlevel ) {
rtable_ent_t* rte; // route table entry added
- char* tok;
+ char const* tok;
int ntoks;
uint64_t key = 0; // the symtab key will be mtype or sub_id+mtype
- char* tokens[128];
- char* gtokens[64];
- int i;
+ char* tokens[128];
+ char* gtokens[64];
int ngtoks; // number of tokens in the group list
int grp; // index into group list
+ int cgidx; // contiguous group index (prevents the addition of a contiguous group without ep)
+ int has_ep = FALSE; // indicates if an endpoint was added in a given round robin group
ts_field = clip( ts_field ); // ditch extra whitespace and trailing comments
rr_field = clip( rr_field );
- if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all)
+ if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all)
(uta_has_str( ts_field, ctx->my_name, ',', 127) >= 0) || // our name is in the list
has_myip( ts_field, ctx->ip_list, ',', 127 ) ) { // the list has one of our IP addresses
- key = build_rt_key( subid, atoi( ts_field ) );
-
- if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] create rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
+ key = build_rt_key( subid, atoi( ts_field ) );
- if( (ngtoks = uta_tokenise( rr_field, gtokens, 64, ';' )) > 0 ) { // split round robin groups
- rte = uta_add_rte( ctx->new_rtable, key, ngtoks ); // get/create entry for this key
+ if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "create rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
- for( grp = 0; grp < ngtoks; grp++ ) {
- if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) { // remove any referneces to our ip addrs
- for( i = 0; i < ntoks; i++ ) {
- if( strcmp( tokens[i], ctx->my_name ) != 0 ) { // don't add if it is us -- cannot send to ourself
- if( DEBUG > 1 || (vlevel > 1)) fprintf( stderr, "[DBUG] add endpoint ts=%s %s\n", ts_field, tokens[i] );
- uta_add_ep( ctx->new_rtable, rte, tokens[i], grp );
- }
+ if( (ngtoks = uta_tokenise( rr_field, gtokens, 64, ';' )) > 0 ) { // split round robin groups
+ if( strcmp( gtokens[0], "%meid" ) == 0 ) {
+ ngtoks = 0; // special indicator that uses meid to find endpoint, no rrobin
+ }
+ rte = uta_add_rte( ctx->new_rtable, key, ngtoks ); // get/create entry for this key
+ rte->mtype = atoi( ts_field ); // capture mtype for debugging
+
+ for( grp = 0, cgidx = 0; grp < ngtoks; grp++ ) {
+ int i; // avoid sonar grumbling by defining this here
+
+ if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) { // remove any references to our ip addrs
+ for( i = 0; i < ntoks; i++ ) {
+ if( strcmp( tokens[i], ctx->my_name ) != 0 ) { // don't add if it is us -- cannot send to ourself
+ if( DEBUG > 1 || (vlevel > 1)) rmr_vlog_force( RMR_VL_DEBUG, "add endpoint ts=%s %s\n", ts_field, tokens[i] );
+ uta_add_ep( ctx->new_rtable, rte, tokens[i], cgidx );
+ has_ep = TRUE;
}
}
+ if( has_ep ) {
+ cgidx++; // only increment to the next contiguous group if the current one has at least one endpoint
+ has_ep = FALSE;
+ }
}
}
- } else {
- if( DEBUG || (vlevel > 2) ) {
- fprintf( stderr, "entry not included, sender not matched: %s\n", tokens[1] );
- }
}
+ } else {
+ if( DEBUG || (vlevel > 2) ) {
+ rmr_vlog_force( RMR_VL_DEBUG, "build entry: ts_entry not of form msg-type,sender: %s\n", ts_field );
+ }
+ }
}
/*
*/
static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vlevel ) {
rtable_ent_t* rte; // route table entry to be 'deleted'
- char* tok;
+ char const* tok;
int ntoks;
uint64_t key = 0; // the symtab key will be mtype or sub_id+mtype
- char* tokens[128];
+ char* tokens[128];
if( ctx == NULL || ctx->new_rtable == NULL || ctx->new_rtable->hash == NULL ) {
return;
ts_field = clip( ts_field ); // ditch extra whitespace and trailing comments
- if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all)
+ if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all)
(uta_has_str( ts_field, ctx->my_name, ',', 127) >= 0) || // our name is in the list
has_myip( ts_field, ctx->ip_list, ',', 127 ) ) { // the list has one of our IP addresses
rte = rmr_sym_pull( ctx->new_rtable->hash, key ); // get it
if( rte != NULL ) {
if( DEBUG || (vlevel > 1) ) {
- fprintf( stderr, "[DBUG] delete rte for mtype=%s subid=%d key=%08lx\n", ts_field, subid, key );
+ rmr_vlog_force( RMR_VL_DEBUG, "delete rte for mtype=%s subid=%d key=%08lx\n", ts_field, subid, key );
}
rmr_sym_ndel( ctx->new_rtable->hash, key ); // clear from the new table
del_rte( NULL, NULL, NULL, rte, NULL ); // clean up the memory: reduce ref and free if ref == 0
} else {
if( DEBUG || (vlevel > 1) ) {
- fprintf( stderr, "[DBUG] delete could not find rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
+ rmr_vlog_force( RMR_VL_DEBUG, "delete could not find rte for mtype=%s subid=%d key=%lx\n", ts_field, subid, key );
+ }
+ }
+ } else {
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "delete rte skipped: %s\n", ts_field );
+ }
+}
+
+// -------------------------- parse functions --------------------------------------------------
+
+/*
+ Given the tokens from an mme_ar (meid add/replace) entry, add the entries.
+ the 'owner' which should be the dns name or IP address of an endpoint
+ the meid_list is a space separated list of me IDs
+
+ This function assumes the caller has vetted the pointers as needed.
+
+ For each meid in the list, an entry is pushed into the hash which references the owner
+ endpoint such that when the meid is used to route a message it references the endpoint
+ to send messages to.
+*/
+static void parse_meid_ar( route_table_t* rtab, char* owner, char* meid_list, int vlevel ) {
+	char*		meids[128];			// pointer to each meid found in the list
+	int			nmeids;				// number of meids found
+	int			m;
+	int			state;
+	endpoint_t*	ep;					// endpoint struct for the owner
+
+	owner = clip( owner );			// ditch extra whitespace and trailing comments
+	meid_list = clip( meid_list );
+
+	nmeids = uta_tokenise( meid_list, meids, 128, ' ' );
+	for( m = 0; m < nmeids; m++ ) {
+		ep = rt_ensure_ep( rtab, owner );
+		if( ep == NULL ) {
+			rmr_vlog( RMR_VL_WARN, "rmr parse_meid_ar: unable to create an endpoint for owner: %s", owner );
+			continue;
+		}
+
+		state = rmr_sym_put( rtab->hash, meids[m], RT_ME_SPACE, ep );		// slam this one in if new; replace if there
+		if( DEBUG || (vlevel > 1) ) {
+			rmr_vlog_force( RMR_VL_DEBUG, "parse_meid_ar: add/replace meid: %s owned by: %s state=%d\n", meids[m], owner, state );
+		}
+	}
+}
+
+/*
+ Given the tokens from an mme_del, delete the listed meid entries from the new
+ table. The list is a space separated list of meids.
+
+ The meids in the hash reference endpoints which are never deleted and so
+ the only thing that we need to do here is to remove the meid from the hash.
+
+ This function assumes the caller has vetted the pointers as needed.
+*/
+static void parse_meid_del( route_table_t* rtab, char* meid_list, int vlevel ) {
+	char*	meids[128];				// pointer to each meid found in the list
+	int		nmeids;
+	int		m = 0;
+
+	if( rtab->hash == NULL ) {		// no hash, nothing that could be removed
+		return;
+	}
+
+	meid_list = clip( meid_list );
+	nmeids = uta_tokenise( meid_list, meids, 128, ' ' );
+
+	while( m < nmeids ) {
+		rmr_sym_del( rtab->hash, meids[m], RT_ME_SPACE );		// only the meid reference is removed from the hash
+		if( DEBUG || (vlevel > 1) ) {
+			rmr_vlog_force( RMR_VL_DEBUG, "parse_meid_del: meid deleted: %s\n", meids[m] );
+		}
+		m++;
+	}
+}
+
+/*
+ Parse a partially parsed meid record. Tokens[0] should be one of:
+ meid_map, mme_ar, mme_del.
+
+ pctx is the private context needed to return an ack/nack using the provided
+ message buffer with the route managers address info.
+*/
+static void meid_parser( uta_ctx_t* ctx, uta_ctx_t* pctx, rmr_mbuf_t* mbuf, char** tokens, int ntoks, int vlevel ) {
+	char wbuf[1024];					// reason text sent back with a nack
+
+	if( ctx == NULL || tokens == NULL || ntoks < 1 ) {
+		return;							// silent but should never happen
+	}
+
+	if( ntoks < 2 ) {					// must have at least two for any valid request record
+		rmr_vlog( RMR_VL_ERR, "meid_parse: not enough tokens on %s record\n", tokens[0] );
+		return;
+	}
+
+	if( strcmp( tokens[0], "meid_map" ) == 0 ) {		// start or end of the meid map update
+		tokens[1] = clip( tokens[1] );
+
+		if( *(tokens[1]) == 's' ) {						// ---- start of a map update ----
+			if( ctx->new_rtable != NULL ) {				// one in progress? this forces it out
+				if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "meid map start: dropping incomplete table\n" );
+				uta_rt_drop( ctx->new_rtable );
+				ctx->new_rtable = NULL;
+				send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" );	// nack the one that was pending and never made it
+			}
+
+			free( ctx->table_id );						// free of a nil pointer is harmless
+			if( ntoks > 2 ) {
+				ctx->table_id = strdup( clip( tokens[2] ) );
+			} else {
+				ctx->table_id = NULL;
+			}
+
+			ctx->new_rtable = prep_new_rt( ctx, ALL );	// start with a clone of everything (mtype, endpoint refs and meid)
+			if( ctx->new_rtable != NULL ) {				// the clone might not have been created; don't touch a nil table
+				ctx->new_rtable->mupdates = 0;
+			}
+
+			if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "meid_parse: meid map start found\n" );
+			return;
+		}
+
+		if( strcmp( tokens[1], "end" ) != 0 ) {			// neither start nor end; nothing to do
+			return;
+		}
+
+		// ---- end of a map update: wrap up the table we were building ----
+		if( ctx->new_rtable == NULL ) {					// must be vetted before the update count is examined
+			if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "end of meid map noticed, but one was not started!\n" );
+			return;
+		}
+
+		if( ntoks > 2 ) {								// meid_map | end | <count> |??? given
+			if( ctx->new_rtable->mupdates != atoi( tokens[2] ) ) {		// count they added didn't match what we received
+				rmr_vlog( RMR_VL_ERR, "meid_parse: meid map update had wrong number of records: received %d expected %s\n",
+					ctx->new_rtable->mupdates, tokens[2] );
+				snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->mupdates );	// report the same counter that was compared
+				send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
+				uta_rt_drop( ctx->new_rtable );
+				ctx->new_rtable = NULL;
+				return;
+			}
+
+			if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid_parse: meid map update ended; found expected number of entries: %s\n", tokens[2] );
+		}
+
+		roll_tables( ctx );								// roll active to old, and new to active with proper locking
+		if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "end of meid map noticed\n" );
+		send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
+
+		if( vlevel > 0 ) {
+			dump_tables( ctx );							// same old/new table report, kept in one place
+		}
+
+		return;
+	}
+
+	if( ! ctx->new_rtable ) {			// for any other mmap entries, there must be a table in progress or we punt
+		if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid update/delte (%s) encountered, but table update not started\n", tokens[0] );
+		return;
+	}
+
+	if( strcmp( tokens[0], "mme_ar" ) == 0 ) {
+		if( ntoks < 3 || tokens[1] == NULL || tokens[2] == NULL ) {
+			rmr_vlog( RMR_VL_ERR, "meid_parse: mme_ar record didn't have enough tokens found %d\n", ntoks );
+			return;
+		}
+		parse_meid_ar( ctx->new_rtable, tokens[1], tokens[2], vlevel );
+		ctx->new_rtable->mupdates++;
+		return;
+	}
+
+	if( strcmp( tokens[0], "mme_del" ) == 0 ) {			// ntoks < 2 already validated
+		parse_meid_del( ctx->new_rtable, tokens[1], vlevel );
+		ctx->new_rtable->mupdates++;
+		return;
+	}
+}
+
+/*
+ This will close the current table snarf file (in *.inc) and open a new one.
+ The current one is renamed. The final file name is taken from the environment
+ variable named by ENV_STASH_RT, and if that is not set then the variable named
+ by ENV_SEED_RT is used and given an additional extension of .stash. If neither
+ the seed nor the stash environment variable is set then this does nothing.
+
+ If this is called before the tmp snarf file is opened, then this just opens the file.
+*/
+static void cycle_snarfed_rt( uta_ctx_t* ctx ) {
+ static int ok2warn = 0; // some warnings squelched on first call
+
+ char* seed_fname; // the filename from env
+ char tfname[512]; // temp fname
+ char wfname[512]; // working buffer for filename
+ char* snarf_fname = NULL; // prevent overlay of the static table if snarf_rt not given
+
+ if( ctx == NULL ) {
+ return;
+ }
+
+ if( (snarf_fname = getenv( ENV_STASH_RT )) == NULL ) { // specific place to stash the rt not given
+ if( (seed_fname = getenv( ENV_SEED_RT )) != NULL ) { // no seed, we leave in the default file
+ memset( wfname, 0, sizeof( wfname ) );
+ snprintf( wfname, sizeof( wfname ) - 1, "%s.stash", seed_fname ); // final name is built from the seed name
+ snarf_fname = wfname;
+ }
+ }
+
+ if( snarf_fname == NULL ) {
+ rmr_vlog( RMR_VL_DEBUG, "cycle_snarf: no file to save in" );
+ return;
+ }
+
+ memset( tfname, 0, sizeof( tfname ) );
+ snprintf( tfname, sizeof( tfname ) -1, "%s.inc", snarf_fname ); // must ensure tmp file is moveable
+
+ if( ctx->snarf_rt_fd >= 0 ) { // a working file is open: finish it and move it to the final name
+ char* msg= "### captured from route manager\n";
+ write( ctx->snarf_rt_fd, msg, strlen( msg ) ); // NOTE(review): the result of write() is not checked
+ if( close( ctx->snarf_rt_fd ) < 0 ) {
+ rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to close working rt snarf file: %s\n", strerror( errno ) );
+ return;
+ }
+
+ if( unlink( snarf_fname ) < 0 && ok2warn ) { // first time through this can fail and we ignore it
+ rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to unlink old static table: %s: %s\n", snarf_fname, strerror( errno ) );
+ }
+
+ if( rename( tfname, snarf_fname ) ) {
+ rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to move new route table to seed aname : %s -> %s: %s\n", tfname, snarf_fname, strerror( errno ) );
+ } else {
+ rmr_vlog( RMR_VL_INFO, "latest route table info saved in: %s\n", snarf_fname );
+ }
+ }
+ ok2warn = 1; // from now on an unlink failure is reported
+
+ ctx->snarf_rt_fd = open( tfname, O_WRONLY | O_CREAT | O_TRUNC, 0660 ); // (re)open the working file, truncating anything already there
+ if( ctx->snarf_rt_fd < 0 ) {
+ rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to open trt file: %s: %s\n", tfname, strerror( errno ) );
} else {
- if( DEBUG ) fprintf( stderr, "[DBUG] delete rte skipped: %s\n", ts_field );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: rt snarf file opened: %s\n", tfname );
}
}
end record causes the in progress table to be finalised and the
currently active table is replaced.
- We expect one of several types:
- newrt|{start|end}
+ The updated table will be activated when the *|end record is encountered.
+ However, to allow for a "double" update, where both the meid map and the
+ route table must be updated at the same time, the end indication on a
+ route table (new or update) may specify "hold" which indicates that meid
+ map entries are to follow and the updated route table should be held as
+ pending until the end of the meid map is received and validated.
+
+ CAUTION: we are assuming that there is a single route/meid map generator
+ and as such only one type of update is received at a time; in other
+ words, the sender cannot mix update records and if there is more than
+ one sender process they must synchronise to avoid issues.
+
+
+ For a RT update, we expect:
+ newrt | start | <table-id>
+ newrt | end | <count>
rte|<mtype>[,sender]|<endpoint-grp>[;<endpoint-grp>,...]
mse|<mtype>[,sender]|<sub-id>|<endpoint-grp>[;<endpoint-grp>,...]
+ mse| <mtype>[,sender] | <sub-id> | %meid
+
+
+ For a meid map update we expect:
+ meid_map | start | <table-id>
+ meid_map | end | <count> | <md5-hash>
+ mme_ar | <e2term-id> | <meid0> <meid1>...<meidn>
+ mme_del | <meid0> <meid1>...<meidn>
+
+
+ The pctx is our private context that must be used to send acks/status
+ messages back to the route manager. The regular ctx is the ctx that
+ the user has been given and thus that's where we have to hang the route
+ table we're working with.
+
+ If mbuf is given, and we need to ack, then we ack using the mbuf and a
+ return to sender call (allows route manager to use wh_call() to send
+ an update and rts is required to get that back to the right thread).
+ If mbuf is nil, then one will be allocated (in ack) and a normal wh_send
+ will be used.
*/
-static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) {
+static void parse_rt_rec( uta_ctx_t* ctx, uta_ctx_t* pctx, char* buf, int vlevel, rmr_mbuf_t* mbuf ) {
int i;
int ntoks; // number of tokens found in something
int ngtoks;
int grp; // group number
- rtable_ent_t* rte; // route table entry added
+ rtable_ent_t const* rte; // route table entry added
char* tokens[128];
- char* gtokens[64]; // groups
- char* tok; // pointer into a token or string
+ char* tok=NULL; // pointer into a token or string
+ char wbuf[1024];
if( ! buf ) {
return;
}
+ if( ctx && ctx->snarf_rt_fd >= 0 ) { // if snarfing table as it arrives, write this puppy
+ write( ctx->snarf_rt_fd, buf, strlen( buf ) );
+ write( ctx->snarf_rt_fd, "\n", 1 );
+ }
+
while( *buf && isspace( *buf ) ) { // skip leading whitespace
buf++;
}
- for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- ); // trim trailing spaces too
+ for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace_with_fence( *tok ); tok-- ); // trim trailing spaces too
*(tok+1) = 0;
+ memset( tokens, 0, sizeof( tokens ) );
if( (ntoks = uta_tokenise( buf, tokens, 128, '|' )) > 0 ) {
+ tokens[0] = clip( tokens[0] );
switch( *(tokens[0]) ) {
case 0: // ignore blanks
// fallthrough
}
if( ntoks < 3 ) {
- if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: del record had too few fields: %d instead of 3\n", ntoks );
+ if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: del record had too few fields: %d instead of 3\n", ntoks );
break;
}
case 'n': // newrt|{start|end}
tokens[1] = clip( tokens[1] );
if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building
- if( ctx->new_rtable ) {
- uta_rt_drop( ctx->old_rtable ); // time to drop one that was previously replaced
- ctx->old_rtable = ctx->rtable; // currently active becomes old and allowed to 'drain'
- ctx->rtable = ctx->new_rtable; // one we've been adding to becomes active
- ctx->new_rtable = NULL;
- if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] end of route table noticed\n" );
+ if( ctx && ctx->snarf_rt_fd >= 0 ) {
+ cycle_snarfed_rt( ctx ); // make it available and open a new one
+ }
- if( vlevel > 0 ) {
- fprintf( stderr, "[DBUG] old route table:\n" );
- rt_stats( ctx->old_rtable );
- fprintf( stderr, "[DBUG] new route table:\n" );
- rt_stats( ctx->rtable );
+ if( ctx->new_rtable != NULL && ntoks > 2 ) { // the count can only be verified when a table is in progress; an end without a start falls to the else below
+ if( ctx->new_rtable->updates != atoi( tokens[2] ) ) { // count they added didn't match what we received
+ rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
+ ctx->new_rtable->updates, tokens[2] );
+ snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates );
+ send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
+ uta_rt_drop( ctx->new_rtable );
+ ctx->new_rtable = NULL;
+ break;
}
+ }
+
+ if( ctx->new_rtable ) {
+ roll_tables( ctx ); // roll active to old, and new to active with proper locking
+ if( DEBUG > 1 || (vlevel > 1) ) {
+ rmr_vlog( RMR_VL_DEBUG, "end of route table noticed\n" );
+ dump_tables( ctx );
+ }
+
+ send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
+ ctx->rtable_ready = 1; // route based sends can now happen
+ ctx->flags |= CFL_FULLRT; // indicate we have seen a complete route table
} else {
- if( DEBUG > 1 ) fprintf( stderr, "[DBUG] end of route table noticed, but one was not started!\n" );
+ if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of route table noticed, but one was not started!\n" );
ctx->new_rtable = NULL;
}
- } else { // start a new table.
- if( ctx->new_rtable != NULL ) { // one in progress? this forces it out
- if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] new table; dropping incomplete table\n" );
+ } else { // start a new table.
+ if( ctx->new_rtable != NULL ) { // one in progress? this forces it out
+ send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" ); // nack the one that was pending as end never made it
+
+ if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
uta_rt_drop( ctx->new_rtable );
+ ctx->new_rtable = NULL;
}
- if( ctx->rtable ) {
- ctx->new_rtable = uta_rt_clone( ctx->rtable ); // create by cloning endpoint entries from active table
+ if( ctx->table_id != NULL ) {
+ free( ctx->table_id );
+ }
+ if( ntoks >2 ) {
+ ctx->table_id = strdup( clip( tokens[2] ) );
} else {
- ctx->new_rtable = uta_rt_init( ); // don't have one yet, just crate empty
+ ctx->table_id = NULL;
}
- if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] start of route table noticed\n" );
+
+ ctx->new_rtable = prep_new_rt( ctx, SOME ); // wait for old table to drain and shift it back to new
+ if( ctx->new_rtable != NULL ) { // do not dereference a nil table if prep failed
+ ctx->new_rtable->updates = 0; // init count of entries received
+ }
+
+ if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "start of route table noticed\n" );
}
break;
- case 'm': // assume mse entry
- if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore siliently
- break;
- }
+ case 'm': // mse entry or one of the meid_ records
+ if( strcmp( tokens[0], "mse" ) == 0 ) {
+ if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore silently
+ break;
+ }
- if( ntoks < 4 ) {
- if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: mse record had too few fields: %d instead of 4\n", ntoks );
- break;
- }
+ if( ntoks < 4 ) {
+ if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: mse record had too few fields: %d instead of 4\n", ntoks );
+ break;
+ }
- build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel );
- ctx->new_rtable->updates++;
+ build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel );
+ ctx->new_rtable->updates++;
+ } else {
+ meid_parser( ctx, pctx, mbuf, tokens, ntoks, vlevel );
+ }
break;
case 'r': // assume rt entry
break;
case 'u': // update current table, not a total replacement
+ if( ! (ctx->flags & CFL_FULLRT) ) { // we cannot update until we have a full table from route generator
+ rmr_vlog( RMR_VL_WARN, "route table update ignored: full table not previously recevied" );
+ break;
+ }
+
tokens[1] = clip( tokens[1] );
if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building
if( ctx->new_rtable == NULL ) { // update table not in progress
break;
}
+ if( ctx->snarf_rt_fd >= 0 ) {
+ cycle_snarfed_rt( ctx ); // make it available and open a new one
+ }
if( ntoks >2 ) {
if( ctx->new_rtable->updates != atoi( tokens[2] ) ) { // count they added didn't match what we received
- fprintf( stderr, "[ERR] rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
+ rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
ctx->new_rtable->updates, tokens[2] );
+ snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates ); // wbuf must be filled before it is sent as the nack reason
+ send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
uta_rt_drop( ctx->new_rtable );
ctx->new_rtable = NULL;
break;
}
if( ctx->new_rtable ) {
- uta_rt_drop( ctx->old_rtable ); // time to drop one that was previously replaced
- ctx->old_rtable = ctx->rtable; // currently active becomes old and allowed to 'drain'
- ctx->rtable = ctx->new_rtable; // one we've been adding to becomes active
- ctx->new_rtable = NULL;
- if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] end of rt update noticed\n" );
-
- if( vlevel > 0 ) {
- fprintf( stderr, "[DBUG] old route table:\n" );
- rt_stats( ctx->old_rtable );
- fprintf( stderr, "[DBUG] updated route table:\n" );
- rt_stats( ctx->rtable );
+ roll_tables( ctx ); // roll active to old, and new to active with proper locking
+ if( DEBUG > 1 || (vlevel > 1) ) {
+ rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed\n" );
+ dump_tables( ctx );
}
+
+ send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
+ ctx->rtable_ready = 1; // route based sends can now happen
} else {
- if( DEBUG > 1 ) fprintf( stderr, "[DBUG] end of rt update noticed, but one was not started!\n" );
+ if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed, but one was not started!\n" );
ctx->new_rtable = NULL;
}
} else { // start a new table.
if( ctx->new_rtable != NULL ) { // one in progress? this forces it out
- if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] new table; dropping incomplete table\n" );
+ if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
+ send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" ); // nack the one that was pending as end never made it
uta_rt_drop( ctx->new_rtable );
+ ctx->new_rtable = NULL;
}
- if( ctx->rtable ) {
- ctx->new_rtable = uta_rt_clone_all( ctx->rtable ); // start with a clone of everything (endpts and entries)
- } else {
- ctx->new_rtable = uta_rt_init( ); // don't have one yet, just crate empty
+ if( ntoks > 2 ) {
+ if( ctx->table_id != NULL ) {
+ free( ctx->table_id );
+ }
+ ctx->table_id = strdup( clip( tokens[2] ) );
}
- ctx->new_rtable->updates = 0; // init count of updates received
- if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] start of rt update noticed\n" );
+ ctx->new_rtable = prep_new_rt( ctx, ALL ); // start with a copy of everything in the live table
+ if( ctx->new_rtable != NULL ) { // do not dereference a nil table if prep failed
+ ctx->new_rtable->updates = 0; // init count of updates received
+ }
+
+ if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "start of rt update noticed\n" );
}
break;
default:
- if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: unrecognised request: %s\n", tokens[0] );
+ if( DEBUG ) rmr_vlog( RMR_VL_WARN, "rmr_rtc: unrecognised request: %s\n", tokens[0] );
break;
}
}
char* eor; // end of the record
int rcount = 0; // record count for debug
- if( (fname = getenv( ENV_SEED_RT )) == NULL ) {
- return;
+ if( (fname = ctx->seed_rt_fname) == NULL ) {
+ if( (fname = getenv( ENV_SEED_RT )) == NULL ) {
+ return;
+ }
+
+ ctx->seed_rt_fname = strdup( fname );
+ fname = ctx->seed_rt_fname;
}
if( (fbuf = ensure_nlterm( uta_fib( fname ) ) ) == NULL ) { // read file into a single buffer (nil terminated string)
- fprintf( stderr, "[WRN] rmr read_static: seed route table could not be opened: %s: %s\n", fname, strerror( errno ) );
+ rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table could not be opened: %s: %s\n", fname, strerror( errno ) );
return;
}
- if( DEBUG ) fprintf( stderr, "[DBUG] rmr: seed route table successfully opened: %s\n", fname );
+ if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "seed route table successfully opened: %s\n", fname );
for( eor = fbuf; *eor; eor++ ) { // fix broken systems that use \r or \r\n to terminate records
if( *eor == '\r' ) {
*eor = '\n'; // will look like a blank line which is ok
}
}
- for( rec = fbuf; rec && *rec; rec = eor+1 ) {
+ rec = fbuf;
+ while( rec && *rec ) {
rcount++;
if( (eor = strchr( rec, '\n' )) != NULL ) {
*eor = 0;
} else {
- fprintf( stderr, "[WRN] rmr read_static: seed route table had malformed records (missing newline): %s\n", fname );
- fprintf( stderr, "[WRN] rmr read_static: seed route table not used: %s\n", fname );
+ rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table had malformed records (missing newline): %s\n", fname );
+ rmr_vlog( RMR_VL_WARN, "rmr read_static: seed route table not used: %s\n", fname );
free( fbuf );
return;
}
- parse_rt_rec( ctx, rec, vlevel );
+ parse_rt_rec( ctx, NULL, rec, vlevel, NULL ); // no pvt context as we can't ack
+
+ rec = eor+1;
}
- if( DEBUG ) fprintf( stderr, "[DBUG] rmr: seed route table successfully parsed: %d records\n", rcount );
+ if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "rmr_read_static: seed route table successfully parsed: %d records\n", rcount );
free( fbuf );
}
/*
- Callback driven for each named thing in a symtab. We collect the pointers to those
+ Callback driven for each thing in a symtab. We collect the pointers to those
things for later use (cloning).
*/
static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list ) {
}
if( thing == NULL ) {
+ rmr_vlog_force( RMR_VL_DEBUG, "collect things given nil thing: %p %p %p\n", st, entry, name ); // dummy ref for sonar
return;
}
+ tl->names[tl->nused] = name; // the name/key (space 0 uses int keys, so name can be nil and that is OK)
tl->things[tl->nused++] = thing; // save a reference to the thing
+
+ if( tl->nused >= tl->nalloc ) {
+ extend_things( tl ); // not enough allocated
+ }
}
/*
int i;
if( (rte = (rtable_ent_t *) thing) == NULL ) {
+ rmr_vlog_force( RMR_VL_DEBUG, "delrte given nil table: %p %p %p\n", st, entry, name ); // dummy ref for sonar
return;
}
for( i = 0; i < rte->nrrgroups; i++ ) {
if( rte->rrgroups[i] ) {
free( rte->rrgroups[i]->epts ); // ditch list of endpoint pointers (end points are reused; don't trash them)
+ free( rte->rrgroups[i] ); // but must free the rrg itself too
}
+
}
free( rte->rrgroups );
an empty buffer, as opposed to a nil, so the caller can generate defaults
or error if an empty/missing file isn't tolerated.
*/
-static char* uta_fib( char* fname ) {
+static char* uta_fib( char const* fname ) {
struct stat stats;
off_t fsize = 8192; // size of the file
off_t nread; // number of bytes read
fsize = stats.st_size; // stat ok, save the file size
}
} else {
- fsize = 8192; // stat failed, we'll leave the file open and try to read a default max of 8k
+ fsize = 8192; // stat failed, we'll leave the file open and try to read a default max of 8k
}
}
return buf;
}
+// --------------------- initialisation/creation ---------------------------------------------
/*
Create and initialise a route table; Returns a pointer to the table struct.
*/
-static route_table_t* uta_rt_init( ) {
+static route_table_t* uta_rt_init( uta_ctx_t* ctx ) {
route_table_t* rt;
+ if( ctx == NULL ) { // ctx supplies the endpoint hash assigned below, so it is required
+ return NULL;
+ }
if( (rt = (route_table_t *) malloc( sizeof( route_table_t ) )) == NULL ) {
return NULL;
}
- if( (rt->hash = rmr_sym_alloc( 509 )) == NULL ) { // modest size, prime
+ memset( rt, 0, sizeof( *rt ) ); // zero every field before the individual ones are set
+
+ if( (rt->hash = rmr_sym_alloc( RT_SIZE )) == NULL ) { // no symtab: release the struct and report failure with nil
free( rt );
return NULL;
}
+ rt->ephash = ctx->ephash; // all route tables share a common endpoint hash
return rt;
}
/*
- Clone (sort of) an existing route table. This is done to preserve the endpoint
- names referenced in a table (and thus existing sessions) when a new set
- of message type to endpoint name mappings is received. A new route table
- with only endpoint name references is returned based on the active table in
- the context.
+ Clones one of the spaces in the given table.
+ Srt is the source route table, Nrt is the new route table; if nil, we allocate it.
+ Space is the space in the old table to copy. Space 0 uses an integer key and
+ references rte structs. All other spaces use a string key and reference endpoints.
*/
-static route_table_t* uta_rt_clone( route_table_t* srt ) {
- endpoint_t* ep; // an endpoint
- route_table_t* nrt; // new route table
+static route_table_t* rt_clone_space( uta_ctx_t* ctx, route_table_t* srt, route_table_t* nrt, int space ) {
+ endpoint_t* ep; // an endpoint (ignore sonar complaint about const*)
+ rtable_ent_t* rte; // a route table entry (ignore sonar complaint about const*)
void* sst; // source symtab
void* nst; // new symtab
- thing_list_t things;
- int i;
+ thing_list_t things; // things from the space to copy
+ int i;
+ int free_on_err = 0; // set when we allocate nrt here and so must release it on failure
- if( srt == NULL ) {
+ if( ctx == NULL ) {
return NULL;
}
-
- if( (nrt = (route_table_t *) malloc( sizeof( *nrt ) )) == NULL ) {
- return NULL;
+ if( nrt == NULL ) { // make a new table if needed
+ free_on_err = 1;
+ nrt = uta_rt_init( ctx );
+ if( nrt == NULL ) {
+ return NULL;
+ }
}
- if( (nrt->hash = rmr_sym_alloc( 509 )) == NULL ) { // modest size, prime
- free( nrt );
- return NULL;
+ if( srt == NULL ) { // source was nil, just give back the new table
+ return nrt;
}
things.nalloc = 2048;
things.nused = 0;
+ things.error = 0;
things.things = (void **) malloc( sizeof( void * ) * things.nalloc );
- if( things.things == NULL ) {
- free( nrt->hash );
- free( nrt );
- return NULL;
+ things.names = (const char **) malloc( sizeof( char * ) * things.nalloc );
+ if( things.things == NULL || things.names == NULL ){
+ if( things.things != NULL ) {
+ free( things.things );
+ }
+ if( things.names != NULL ) {
+ free( things.names );
+ }
+
+ if( free_on_err ) {
+ rmr_sym_free( nrt->hash );
+ free( nrt );
+ nrt = NULL;
+ } else {
+ nrt->error = 1; // caller owns the table; flag it rather than free it
+ }
+
+ return nrt;
}
+ memset( things.things, 0, sizeof( void * ) * things.nalloc ); // clear the whole array (size is element size times count)
+ memset( things.names, 0, sizeof( char * ) * things.nalloc );
sst = srt->hash; // convenience pointers (src symtab)
nst = nrt->hash;
- rmr_sym_foreach_class( sst, 1, collect_things, &things ); // collect the named endpoints in the active table
+ rmr_sym_foreach_class( sst, space, collect_things, &things ); // collect things from this space
+ if( things.error ) { // something happened and capture failed
+ rmr_vlog( RMR_VL_ERR, "unable to clone route table: unable to capture old contents\n" );
+ free( things.things );
+ free( things.names );
+ if( free_on_err ) {
+ rmr_sym_free( nrt->hash );
+ free( nrt );
+ nrt = NULL;
+ } else {
+ nrt->error = 1;
+ }
+ return nrt;
+ }
+ if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "clone space cloned %d things in space %d\n", things.nused, space );
for( i = 0; i < things.nused; i++ ) {
- ep = (endpoint_t *) things.things[i];
- rmr_sym_put( nst, ep->name, 1, ep ); // slam this one into the new table
+ if( space ) { // string key, epoint reference
+ ep = (endpoint_t *) things.things[i];
+ rmr_sym_put( nst, things.names[i], space, ep ); // slam this one into the new table
+ } else {
+ rte = (rtable_ent_t *) things.things[i];
+ rte->refs++; // rtes can be removed, so we track references
+ rmr_sym_map( nst, rte->key, rte ); // add to hash using numeric mtype/sub-id as key (default to space 0)
+ }
}
free( things.things );
+ free( (void *) things.names );
return nrt;
}
/*
- Clones _all_ of the given route table (references both endpoints AND the route table
- entries. Needed to support a partial update where some route table entries will not
- be deleted if not explicitly in the update.
+ Given a destination route table (drt), clone from the source (srt) into it.
+ If drt is nil, alloc a new one. If srt is nil, then nothing is done (except to
+ allocate the drt if that was nil too). If all is true (1), then we will clone both
+ the MT and the ME spaces; otherwise only the ME space is cloned.
*/
-static route_table_t* uta_rt_clone_all( route_table_t* srt ) {
- endpoint_t* ep; // an endpoint
- rtable_ent_t* rte; // a route table entry
- route_table_t* nrt; // new route table
- void* sst; // source symtab
- void* nst; // new symtab
- thing_list_t things0; // things from space 0 (table entries)
- thing_list_t things1; // things from space 1 (end points)
+static route_table_t* uta_rt_clone( uta_ctx_t* ctx, route_table_t* srt, route_table_t* drt, int all ) {
int i;
- if( srt == NULL ) {
+ if( ctx == NULL ) {
return NULL;
}
-
- if( (nrt = (route_table_t *) malloc( sizeof( *nrt ) )) == NULL ) {
- return NULL;
+ if( drt == NULL ) {
+ drt = uta_rt_init( ctx );
+ if( drt == NULL ) { // init failed; there is no table to clone into
+ return NULL;
+ }
}
-
- if( (nrt->hash = rmr_sym_alloc( 509 )) == NULL ) { // modest size, prime
- free( nrt );
- return NULL;
+ if( srt == NULL ) {
+ return drt;
}
- things0.nalloc = 2048;
- things0.nused = 0;
- things0.things = (void **) malloc( sizeof( void * ) * things0.nalloc );
- if( things0.things == NULL ) {
- free( nrt->hash );
- free( nrt );
- return NULL;
+ drt->ephash = ctx->ephash; // all rts reference the same EP symtab
+ rt_clone_space( ctx, srt, drt, RT_ME_SPACE ); // the ME space is always copied
+ if( all ) {
+ rt_clone_space( ctx, srt, drt, RT_MT_SPACE ); // the MT space is copied only when all is set
}
- things1.nalloc = 2048;
- things1.nused = 0;
- things1.things = (void **) malloc( sizeof( void * ) * things1.nalloc );
- if( things1.things == NULL ) {
- free( nrt->hash );
- free( nrt );
+ return drt;
+}
+
+/*
+ Prepares the "new" route table for populating. If the old_rtable is not nil, then
+ we wait for its use count to reach 0. Then the table is cleared, and moved on the
+ context to be referenced by the new pointer; the old pointer is set to nil.
+
+ If the old table doesn't exist, then a new table is created and the new pointer is
+ set to reference it.
+
+ The ME namespace references endpoints which do not need to be released, therefore we
+ do not need to run that portion of the table to deref like we do for the RTEs.
+*/
+static route_table_t* prep_new_rt( uta_ctx_t* ctx, int all ) {
+ //int counter = 0;
+ route_table_t* rt;
+
+ if( ctx == NULL ) {
return NULL;
}
- sst = srt->hash; // convenience pointers (src symtab)
- nst = nrt->hash;
+ pthread_mutex_lock( ctx->rtgate );
+ if( (rt = ctx->old_rtable) != NULL ) {
+ ctx->old_rtable = NULL;
+
+ while( rt->ref_count > 0 ) { // wait for all who are using to stop
+ //if( counter++ > 1000 ) {
+ // rmr_vlog( RMR_VL_WARN, "rt_prep_newrt: internal mishap, ref count on table seems wedged" );
+ // break;
+ //}
- rmr_sym_foreach_class( sst, 0, collect_things, &things0 ); // collect the rtes
- rmr_sym_foreach_class( sst, 1, collect_things, &things1 ); // collect the named endpoints in the active table
+ pthread_mutex_unlock( ctx->rtgate ); // drop the gate while sleeping so it is available to other threads
+ usleep( 1000 ); // small sleep to yield the processor if that is needed
+ pthread_mutex_lock( ctx->rtgate );
- for( i = 0; i < things0.nused; i++ ) {
- rte = (rtable_ent_t *) things0.things[i];
- rte->refs++; // rtes can be removed, so we track references
- rmr_sym_map( nst, rte->key, rte ); // add to hash using numeric mtype/sub-id as key (default to space 0)
+ }
+
+ if( rt->hash != NULL ) {
+ rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL ); // deref and drop if needed
+ rmr_sym_clear( rt->hash ); // clear all entries from the old table
+ }
+
+ rt->error = 0; // table with errors can be here, so ensure clear before attempt to load
+ } else {
+ rt = NULL;
}
+ pthread_mutex_unlock( ctx->rtgate );
- for( i = 0; i < things1.nused; i++ ) {
- ep = (endpoint_t *) things1.things[i];
- rmr_sym_put( nst, ep->name, 1, ep ); // slam this one into the new table
+ rt = uta_rt_clone( ctx, ctx->rtable, rt, all ); // also sets the ephash pointer
+ if( rt != NULL ) { // very small chance for nil, but not zero, so test
+ rt->ref_count = 0; // take no chances; ensure it's 0!
+ } else {
+ rmr_vlog( RMR_VL_ERR, "route table clone returned nil; marking dummy table as error\n" );
+ rt = uta_rt_init( ctx ); // must have something, but mark it in error state
+ if( rt != NULL ) { // init can fail as well; in that case nil is returned to the caller
+ rt->error = 1;
+ }
}
- free( things0.things );
- free( things1.things );
- return nrt;
+ return rt;
}
+
/*
Given a name, find the endpoint struct in the provided route table.
*/
static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name ) {
- if( rt == NULL || rt->hash == NULL || ep_name == NULL || *ep_name == 0 ) {
+ if( rt == NULL || rt->ephash == NULL || ep_name == NULL || *ep_name == 0 ) { // nil table, no endpoint hash, or nil/empty name: nothing to find
return NULL;
}
- return rmr_sym_get( rt->hash, ep_name, 1 );
+ return rmr_sym_get( rt->ephash, ep_name, 1 ); // endpoints are looked up in the ephash (class 1), no longer in rt->hash
}
/*
Drop the given route table. Purge all type 0 entries, then drop the symtab itself.
+ Does NOT destroy the gate as it's a common gate for ALL route tables.
*/
static void uta_rt_drop( route_table_t* rt ) {
if( rt == NULL ) {
endpoint_t* ep;
if( !rt || !ep_name || ! *ep_name ) {
- fprintf( stderr, "[WRN] rmr: rt_ensure: internal mishap, something undefined rt=%p ep_name=%p\n", rt, ep_name );
+ rmr_vlog( RMR_VL_WARN, "rt_ensure: internal mishap, something undefined rt=%p ep_name=%p\n", rt, ep_name );
errno = EINVAL;
return NULL;
}
- if( (ep = uta_get_ep( rt, ep_name )) == NULL ) { // not there yet, make
+ if( (ep = uta_get_ep( rt, ep_name )) == NULL ) { // not there yet, make
if( (ep = (endpoint_t *) malloc( sizeof( *ep ) )) == NULL ) {
- fprintf( stderr, "[WRN] rmr: rt_ensure: malloc failed for endpoint creation: %s\n", ep_name );
+ rmr_vlog( RMR_VL_WARN, "rt_ensure: malloc failed for endpoint creation: %s\n", ep_name );
errno = ENOMEM;
return NULL;
}
+ ep->notify = 1; // show notification on first connection failure
ep->open = 0; // not connected
ep->addr = uta_h2ip( ep_name );
ep->name = strdup( ep_name );
pthread_mutex_init( &ep->gate, NULL ); // init with default attrs
memset( &ep->scounts[0], 0, sizeof( ep->scounts ) );
- rmr_sym_put( rt->hash, ep_name, 1, ep );
+ rmr_sym_put( rt->ephash, ep_name, 1, ep );
}
return ep;
return key;
}
+/*
+ Given a route table and meid string, find the owner (if known). Returns a pointer to
+ the endpoint struct or nil.
+*/
+static inline endpoint_t* get_meid_owner( route_table_t *rt, char const* meid ) {
+ if( rt == NULL || rt->hash == NULL || meid == NULL || *meid == 0 ) { // no table/hash to search, or nil/empty meid
+ return NULL;
+ }
+
+ return (endpoint_t *) rmr_sym_get( rt->hash, meid, RT_ME_SPACE ); // meid entries are kept in the ME space of the table hash
+}
+
+/*
+ This returns a pointer to the currently active route table and ups
+ the reference count so that the route table is not freed while it
+ is being used. The caller MUST call release_rt() when finished
+ with the pointer.
+
+ Care must be taken: the ctx->rtable pointer _could_ change during the time
+ between the release of the lock and the return. Therefore we MUST grab
+ the current pointer when we have the lock so that if it does we don't
+ return a pointer to the wrong table.
+
+ This will return NULL if there is no active table.
+*/
+static inline route_table_t* get_rt( uta_ctx_t* ctx ) {
+ route_table_t* rrt; // return value
+
+ if( ctx == NULL || ctx->rtable == NULL ) { // cheap early out; the pointer is re-read under the lock below
+ return NULL;
+ }
+
+ pthread_mutex_lock( ctx->rtgate ); // must hold lock to bump use
+ rrt = ctx->rtable; // must stash the pointer while we hold lock
+ if( rrt != NULL ) { // the check above was made without the lock, so validate the stashed pointer
+ rrt->ref_count++;
+ }
+ pthread_mutex_unlock( ctx->rtgate );
+
+ return rrt; // pointer we upped the count with (nil if there is no active table)
+}
+
+/*
+ This will "release" the route table by reducing the use counter
+ in the table. The table may not be freed until the counter reaches
+ 0, so it's imperative that the pointer be "released" when it is
+ fetched by get_rt(). Once the caller has released the table it
+ may not safely use the pointer that it had.
+*/
+static inline void release_rt( uta_ctx_t* ctx, route_table_t* rt ) {
+ if( ctx != NULL && rt != NULL ) { // nothing to do without both a context and a table
+ pthread_mutex_lock( ctx->rtgate ); // the count is only changed while the gate is held
+ if( rt->ref_count > 0 ) { // never push the count below zero
+ rt->ref_count -= 1;
+ }
+ pthread_mutex_unlock( ctx->rtgate );
+ }
+}
#endif