-// :vi sw=4 ts=4 noet:
+// :vi sw=4 ts=4 noet:
/*
==================================================================================
- Copyright (c) 2019 Nokia
- Copyright (c) 2018-2019 AT&T Intellectual Property.
+ Copyright (c) 2019-2020 Nokia
+ Copyright (c) 2018-2020 AT&T Intellectual Property.
- Licensed under the Apache License, Version 2.0 (the "License");
+   Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
#include <sys/stat.h>
#include <unistd.h>
#include <netdb.h>
+#include <pthread.h>
+#include <immintrin.h>
+#include <stdbool.h>
#include <RIC_message_types.h> // needed for route manager messages
+#define ALL 1
+#define SOME 0
/*
Passed to a symtab foreach callback to construct a list of pointers from
a current symtab.
*/
typedef struct thing_list {
+ int error; // if a realloc failed, this will be set
int nalloc;
int nused;
void** things;
/*
Dump some stats for an endpoint in the RT. This is generally called to
verify endpoints after a table load/change.
+
+ This is called by the for-each mechanism of the symtab and the prototype is
+ fixe; we don't really use some of the parms, but have dummy references to
+ keep sonar from complaining.
*/
static void ep_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
int* counter;
if( (counter = (int *) vcounter) != NULL ) {
(*counter)++;
+ } else {
+ rmr_vlog( RMR_VL_DEBUG, "ep_stas: nil counter %p %p %p", st, entry, name ); // dummy refs
+ return;
}
rmr_vlog_force( RMR_VL_DEBUG, "rt endpoint: target=%s open=%d\n", ep->name, ep->open );
/*
Called to count meid entries in the table. The meid points to an 'owning' endpoint
so we can list what we find
+
+ See note in ep_stats about dummy refs.
*/
static void meid_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
int* counter;
if( (counter = (int *) vcounter) != NULL ) {
(*counter)++;
+ } else {
+ rmr_vlog( RMR_VL_DEBUG, "meid_stas: nil counter %p %p %p", st, entry, name ); // dummy refs
}
rmr_vlog_force( RMR_VL_DEBUG, "meid=%s owner=%s open=%d\n", name, ep->name, ep->open );
/*
Dump counts for an endpoint in the RT. The vid parm is assumed to point to
the 'source' information and is added to each message.
+
+ See note above about dummy references.
*/
static void ep_counts( void* st, void* entry, char const* name, void* thing, void* vid ) {
endpoint_t* ep;
char* id;
if( (ep = (endpoint_t *) thing) == NULL ) {
+ rmr_vlog( RMR_VL_DEBUG, "ep_counts: nil thing %p %p %p", st, entry, name ); // dummy refs
return;
}
*/
static void rte_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
int* counter;
- rtable_ent_t* rte; // thing is really an rte
+ rtable_ent_t const* rte; // thing is really an rte
int mtype;
int sid;
if( (rte = (rtable_ent_t *) thing) == NULL ) {
+ rmr_vlog( RMR_VL_DEBUG, "rte_stats: nil thing %p %p %p", st, entry, name ); // dummy refs
return;
}
*counter = 0;
rmr_vlog_force( RMR_VL_DEBUG, "route table stats:\n" );
rmr_vlog_force( RMR_VL_DEBUG, "route table endpoints:\n" );
- rmr_sym_foreach_class( rt->hash, RT_NAME_SPACE, ep_stats, counter ); // run endpoints (names) in the active table
+ rmr_sym_foreach_class( rt->ephash, RT_NAME_SPACE, ep_stats, counter ); // run endpoints (names) in the active table
rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d known endpoints\n", *counter );
rmr_vlog_force( RMR_VL_DEBUG, "route table entries:\n" );
return;
}
- rmr_sym_foreach_class( rt->hash, 1, ep_counts, id ); // run endpoints in the active table
+ rmr_sym_foreach_class( rt->ephash, 1, ep_counts, id ); // run endpoints in the active table
+}
+
+
+static void dump_tables( uta_ctx_t *ctx ) {
+ if( ctx->old_rtable != NULL ) {
+ rmr_vlog_force( RMR_VL_DEBUG, "old route table: (ref_count=%d)\n", ctx->old_rtable->ref_count );
+ rt_stats( ctx->old_rtable );
+ } else {
+ rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" );
+ }
+ rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
+ rt_stats( ctx->rtable );
}
// ------------ route manager communication -------------------------------------------------
if( smsg != NULL ) {
smsg->mtype = RMRRM_REQ_TABLE;
smsg->sub_id = 0;
- snprintf( smsg->payload, 1024, "%s ts=%ld\n", ctx->my_name, (long) time( NULL ) );
+ snprintf( smsg->payload, 1024, "%s ts=%ld\n", ctx->my_name, time( NULL ) );
rmr_vlog( RMR_VL_INFO, "rmr_rtc: requesting table: (%s) whid=%d\n", smsg->payload, ctx->rtg_whid );
smsg->len = strlen( smsg->payload ) + 1;
-
+
smsg = rmr_wh_send_msg( pctx, ctx->rtg_whid, smsg );
if( (state = smsg->state) != RMR_OK ) {
rmr_vlog( RMR_VL_INFO, "rmr_rtc: send failed: %d whid=%d\n", smsg->state, ctx->rtg_whid );
/*
Send an ack to the route table manager for a table ID that we are
- processing. State is 1 for OK, and 0 for failed. Reason might
+ processing. State is 1 for OK, and 0 for failed. Reason might
be populated if we know why there was a failure.
Context should be the PRIVATE context that we use for messages
to route manger and NOT the user's context.
+
+ If a message buffere is passed we use that and use return to sender
+ assuming that this might be a response to a call and that is needed
+ to send back to the proper calling thread. If msg is nil, we allocate
+ and use it.
*/
-static void send_rt_ack( uta_ctx_t* ctx, int state, char* reason ) {
- rmr_mbuf_t* smsg;
-
+static void send_rt_ack( uta_ctx_t* ctx, rmr_mbuf_t* smsg, char* table_id, int state, char* reason ) {
+ int use_rts = 1;
+ int payload_size = 1024;
+
if( ctx == NULL || ctx->rtg_whid < 0 ) {
return;
}
return;
}
- smsg = rmr_alloc_msg( ctx, 1024 );
+ if( smsg != NULL ) {
+ smsg = rmr_realloc_payload( smsg, payload_size, FALSE, FALSE ); // ensure it's large enough to send a response
+ } else {
+ use_rts = 0;
+ smsg = rmr_alloc_msg( ctx, payload_size );
+ }
+
if( smsg != NULL ) {
smsg->mtype = RMRRM_TABLE_STATE;
- smsg->sub_id = 0;
- snprintf( smsg->payload, 1024, "%s %s %s\n", state == RMR_OK ? "OK" : "ERR",
- ctx->table_id == NULL ? "<id-missing>" : ctx->table_id, reason == NULL ? "" : reason );
+ smsg->sub_id = -1;
+ snprintf( smsg->payload, payload_size-1, "%s %s %s\n", state == RMR_OK ? "OK" : "ERR",
+ table_id == NULL ? "<id-missing>" : table_id, reason == NULL ? "" : reason );
smsg->len = strlen( smsg->payload ) + 1;
-
- rmr_vlog( RMR_VL_INFO, "rmr_rtc: sending table state: (%s) state=%d whid=%d\n", smsg->payload, smsg->state, ctx->rtg_whid );
- smsg = rmr_wh_send_msg( ctx, ctx->rtg_whid, smsg );
+
+ rmr_vlog( RMR_VL_INFO, "rmr_rtc: sending table state: (%s) state=%d whid=%d table=%s\n", smsg->payload, state, ctx->rtg_whid, table_id );
+ if( use_rts ) {
+ smsg = rmr_rts_msg( ctx, smsg );
+ } else {
+ smsg = rmr_wh_send_msg( ctx, ctx->rtg_whid, smsg );
+ }
if( (state = smsg->state) != RMR_OK ) {
rmr_vlog( RMR_VL_WARN, "unable to send table state: %d\n", smsg->state );
rmr_wh_close( ctx, ctx->rtg_whid ); // send failed, assume connection lost
ctx->rtg_whid = -1;
}
- rmr_free_msg( smsg );
+ if( ! use_rts ) {
+ rmr_free_msg( smsg ); // if not our message we must free the leftovers
+ }
+ }
+}
+
+// ---- alarm generation --------------------------------------------------------------------------
+
+/*
+ Given the user's context (not the thread private context) look to see if the application isn't
+ working fast enough and we're dropping messages. If the drop counter has changed since the last
+ peeked, and we have not raised an alarm, then we will alarm. If the counter hasn't changed, then we
+ set a timer and if the counter still hasn't changed when it expires we will clear the alarm.
+
+ The private context is what we use to send so as not to interfere with the user flow.
+*/
static void alarm_if_drops( uta_ctx_t* uctx, uta_ctx_t* pctx ) {
	static int alarm_raised = 0;		// true while a drop alarm is outstanding
	static int ok2clear = 0;			// time that we can clear
	static int lastd = 0;				// the last counter value so we can compute delta
	static int prob_id = 0;				// problem ID we assume alarm manager handles dups between processes

	// NOTE(review): static state makes this single-caller only; presumably invoked just from the RTC thread -- confirm
	rmr_vlog( RMR_VL_DEBUG, "checking for drops... raised=%d 0k2clear=%d lastd=%d probid=%d\n", alarm_raised, ok2clear, lastd, prob_id );
	if( ! alarm_raised ) {
		if( uctx->dcount - lastd == 0 ) {	// not actively dropping, ok to do nothing
			return;
		}

		// drops observed since the last check; raise exactly one alarm
		// (lastd is refreshed on the next pass, in the else branch below)
		alarm_raised = 1;
		uta_alarm( pctx, ALARM_DROPS | ALARM_RAISE, prob_id, "application running slow; RMR is dropping messages" );
		rmr_vlog( RMR_VL_INFO, "drop alarm raised" );
	} else {
		if( uctx->dcount - lastd != 0 ) {	// still dropping or dropping again; we've alarmed so nothing to do
			lastd = uctx->dcount;
			ok2clear = 0;					// reset the timer
			return;
		}

		if( ok2clear == 0 ) {				// first round where not dropping
			ok2clear = time( NULL ) + 60;	// we'll clear the alarm in 60s (NOTE: time_t truncated into int; fine until 2038)
		} else {
			if( time( NULL ) > ok2clear ) {	// things still stable after expiry
				rmr_vlog( RMR_VL_INFO, "drop alarm cleared\n" );
				alarm_raised = 0;
				uta_alarm( pctx, ALARM_DROPS | ALARM_CLEAR, prob_id, "RMR message dropping has stopped" );
				prob_id++;					// next raise is reported as a new problem
			}
		}
	}
}
-// ------------------------------------------------------------------------------------------------
+// ---- utility -----------------------------------------------------------------------------------
+
/*
	Wrapper around isspace() that issues a load fence first.  The fence has
	no functional effect; NOTE(review): presumably added to satisfy a
	speculative-execution (Spectre) static-analysis finding -- confirm.

	The argument is cast to unsigned char before the ctype call: passing a
	negative char value (any high-bit byte on signed-char platforms) to
	isspace() is undefined behavior (CERT STR37-C).
*/
int isspace_with_fence(int c) {
	_mm_lfence();
	return isspace( (unsigned char) c );
}
+
/*
Little diddy to trim whitespace and trailing comments. Like shell, trailing comments
must be at the start of a word (i.e. must be immediatly preceeded by whitespace).
*/
static char* clip( char* buf ) {
- char* tok;
+ char* tok=NULL;
while( *buf && isspace( *buf ) ) { // skip leading whitespace
buf++;
}
}
- for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- ); // trim trailing spaces too
+ for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace_with_fence( *tok ); tok-- ); // trim trailing spaces too
*(tok+1) = 0;
return buf;
*/
static char* ensure_nlterm( char* buf ) {
char* nb = NULL;
- int len = 1;
+ int len = 0;
- nb = buf;
- if( buf == NULL || (len = strlen( buf )) < 2 ) {
- if( (nb = (char *) malloc( sizeof( char ) * 2 )) != NULL ) {
- *nb = '\n';
- *(nb+1) = 0;
- }
- } else {
- if( buf[len-1] != '\n' ) {
- rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
- if( (nb = (char *) malloc( sizeof( char ) * (len + 2) )) != NULL ) {
- memcpy( nb, buf, len );
- *(nb+len) = '\n'; // insert \n and nil into the two extra bytes we allocated
- *(nb+len+1) = 0;
- }
+ if( buf != NULL ) {
+ len = strlen( buf );
+ }
- free( buf );
- }
+ nb = buf; // default to returning original as is
+ switch( len ) {
+ case 0:
+ nb = strdup( "\n" );
+ break;
+
+ case 1:
+ if( *buf != '\n' ) { // not a newline; realloc
+ rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
+ nb = strdup( " \n" );
+ *nb = *buf;
+ free( buf );
+ }
+ break;
+
+ default:
+ if( buf[len-1] != '\n' ) { // not newline terminated, realloc
+ rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
+ if( (nb = (char *) malloc( sizeof( char ) * (len + 2) )) != NULL ) {
+ memcpy( nb, buf, len );
+ *(nb+len) = '\n'; // insert \n and nil into the two extra bytes we allocated
+ *(nb+len+1) = 0;
+ free( buf );
+ }
+ }
+ break;
}
return nb;
}
+/*
+ Roll the new table into the active and the active into the old table. We
+ must have the lock on the active table to do this. It's possible that there
+ is no active table (first load), so we have to account for that (no locking).
+*/
+static void roll_tables( uta_ctx_t* ctx ) {
+ pthread_mutex_lock( ctx->rtgate ); // must hold lock to move to active
+ if( ctx->new_rtable == NULL || ctx->new_rtable->error ) {
+ rmr_vlog( RMR_VL_WARN, "new route table NOT rolled in: nil pointer or error indicated\n" );
+ ctx->old_rtable = ctx->new_rtable;
+ }else if( ctx->rtable != NULL ) { // initially there isn't one, so must check!
+ ctx->old_rtable = ctx->rtable; // currently active becomes old and allowed to 'drain'
+ ctx->rtable = ctx->new_rtable; // one we've been adding to becomes active
+ } else {
+ ctx->old_rtable = NULL; // ensure there isn't an old reference
+ ctx->rtable = ctx->new_rtable; // make new the active one
+ }
+ ctx->new_rtable = NULL;
+ pthread_mutex_unlock( ctx->rtgate );
+}
+
+/*
+ Given a thing list, extend the array of pointers by 1/2 of the current
+ number allocated. If we cannot realloc an array, then we set the error
+ flag. Unlikely, but will prevent a crash, AND will prevent us from
+ trying to use the table since we couldn't capture everything.
+*/
+static void extend_things( thing_list_t* tl ) {
+ int old_alloc;
+ void* old_things;
+ void* old_names;
+
+ if( tl == NULL ) {
+ return;
+ }
+
+ old_alloc = tl->nalloc; // capture current things
+ old_things = tl->things;
+ old_names = tl->names;
+
+ tl->nalloc += tl->nalloc/2; // new allocation size
+
+ tl->things = (void **) malloc( sizeof( void * ) * tl->nalloc ); // allocate larger arrays
+ tl->names = (const char **) malloc( sizeof( char * ) * tl->nalloc );
+
+ if( tl->things == NULL || tl->names == NULL ){ // if either failed, then set error
+ tl->error = 1;
+ return;
+ }
+
+ memcpy( tl->things, old_things, sizeof( void * ) * old_alloc );
+ memcpy( tl->names, old_names, sizeof( void * ) * old_alloc );
+
+ free( old_things );
+ free( old_names );
+}
+
+// ------------ entry update functions ---------------------------------------------------------------
/*
Given a message type create a route table entry and add to the hash keyed on the
message type. Once in the hash, endpoints can be added with uta_add_ep. Size
*/
static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* rr_field, int vlevel ) {
rtable_ent_t* rte; // route table entry added
- char* tok;
+ char const* tok;
int ntoks;
uint64_t key = 0; // the symtab key will be mtype or sub_id+mtype
- char* tokens[128];
- char* gtokens[64];
- int i;
+ char* tokens[128];
+ char* gtokens[64];
int ngtoks; // number of tokens in the group list
int grp; // index into group list
+ int cgidx; // contiguous group index (prevents the addition of a contiguous group without ep)
+ int has_ep = FALSE; // indicates if an endpoint was added in a given round robin group
ts_field = clip( ts_field ); // ditch extra whitespace and trailing comments
rr_field = clip( rr_field );
- if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all)
+ if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all)
(uta_has_str( ts_field, ctx->my_name, ',', 127) >= 0) || // our name is in the list
has_myip( ts_field, ctx->ip_list, ',', 127 ) ) { // the list has one of our IP addresses
rte = uta_add_rte( ctx->new_rtable, key, ngtoks ); // get/create entry for this key
rte->mtype = atoi( ts_field ); // capture mtype for debugging
- for( grp = 0; grp < ngtoks; grp++ ) {
- if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) { // remove any referneces to our ip addrs
+ for( grp = 0, cgidx = 0; grp < ngtoks; grp++ ) {
+ int i; // avoid sonar grumbling by defining this here
+
+ if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) { // remove any references to our ip addrs
for( i = 0; i < ntoks; i++ ) {
if( strcmp( tokens[i], ctx->my_name ) != 0 ) { // don't add if it is us -- cannot send to ourself
if( DEBUG > 1 || (vlevel > 1)) rmr_vlog_force( RMR_VL_DEBUG, "add endpoint ts=%s %s\n", ts_field, tokens[i] );
- uta_add_ep( ctx->new_rtable, rte, tokens[i], grp );
+ uta_add_ep( ctx->new_rtable, rte, tokens[i], cgidx );
+ has_ep = TRUE;
}
}
+ if( has_ep ) {
+ cgidx++; // only increment to the next contiguous group if the current one has at least one endpoint
+ has_ep = FALSE;
+ }
}
}
}
} else {
if( DEBUG || (vlevel > 2) ) {
- rmr_vlog_force( RMR_VL_DEBUG, "entry not included, sender not matched: %s\n", tokens[1] );
+ rmr_vlog_force( RMR_VL_DEBUG, "build entry: ts_entry not of form msg-type,sender: %s\n", ts_field );
}
}
}
*/
static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vlevel ) {
rtable_ent_t* rte; // route table entry to be 'deleted'
- char* tok;
+ char const* tok;
int ntoks;
uint64_t key = 0; // the symtab key will be mtype or sub_id+mtype
- char* tokens[128];
+ char* tokens[128];
if( ctx == NULL || ctx->new_rtable == NULL || ctx->new_rtable->hash == NULL ) {
return;
ts_field = clip( ts_field ); // ditch extra whitespace and trailing comments
- if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all)
+ if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all)
(uta_has_str( ts_field, ctx->my_name, ',', 127) >= 0) || // our name is in the list
has_myip( ts_field, ctx->ip_list, ',', 127 ) ) { // the list has one of our IP addresses
}
}
+// -------------------------- parse functions --------------------------------------------------
+
/*
Given the tokens from an mme_ar (meid add/replace) entry, add the entries.
the 'owner' which should be the dns name or IP address of an enpoint
to send messages to.
*/
static void parse_meid_ar( route_table_t* rtab, char* owner, char* meid_list, int vlevel ) {
- char* tok;
+ char const* tok;
int ntoks;
- char* tokens[128];
+ char* tokens[128];
int i;
int state;
endpoint_t* ep; // endpoint struct for the owner
This function assumes the caller has vetted the pointers as needed.
*/
static void parse_meid_del( route_table_t* rtab, char* meid_list, int vlevel ) {
- char* tok;
+ char const* tok;
int ntoks;
- char* tokens[128];
+ char* tokens[128];
int i;
if( rtab->hash == NULL ) {
/*
Parse a partially parsed meid record. Tokens[0] should be one of:
meid_map, mme_ar, mme_del.
+
+ pctx is the private context needed to return an ack/nack using the provided
+ message buffer with the route managers address info.
*/
-static void meid_parser( uta_ctx_t* ctx, char** tokens, int ntoks, int vlevel ) {
+static void meid_parser( uta_ctx_t* ctx, uta_ctx_t* pctx, rmr_mbuf_t* mbuf, char** tokens, int ntoks, int vlevel ) {
+ char wbuf[1024];
+
if( tokens == NULL || ntoks < 1 ) {
return; // silent but should never happen
}
if( ctx->new_rtable != NULL ) { // one in progress? this forces it out
if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "meid map start: dropping incomplete table\n" );
uta_rt_drop( ctx->new_rtable );
+ ctx->new_rtable = NULL;
+ send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" ); // nack the one that was pending as and never made it
}
- ctx->new_rtable = uta_rt_clone_all( ctx->rtable ); // start with a clone of everything (mtype, endpoint refs and meid)
+ if( ctx->table_id != NULL ) {
+ free( ctx->table_id );
+ }
+ if( ntoks > 2 ) {
+ ctx->table_id = strdup( clip( tokens[2] ) );
+ } else {
+ ctx->table_id = NULL;
+ }
+
+ ctx->new_rtable = prep_new_rt( ctx, ALL ); // start with a clone of everything (mtype, endpoint refs and meid)
ctx->new_rtable->mupdates = 0;
+
if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "meid_parse: meid map start found\n" );
} else {
if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building
if( ntoks > 2 ) { // meid_map | end | <count> |??? given
if( ctx->new_rtable->mupdates != atoi( tokens[2] ) ) { // count they added didn't match what we received
- rmr_vlog( RMR_VL_ERR, "meid_parse: meid map update had wrong number of records: received %d expected %s\n", ctx->new_rtable->mupdates, tokens[2] );
+ rmr_vlog( RMR_VL_ERR, "meid_parse: meid map update had wrong number of records: received %d expected %s\n",
+ ctx->new_rtable->mupdates, tokens[2] );
+ snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates );
+ send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
uta_rt_drop( ctx->new_rtable );
ctx->new_rtable = NULL;
return;
}
if( ctx->new_rtable ) {
- uta_rt_drop( ctx->old_rtable ); // time to drop one that was previously replaced
- ctx->old_rtable = ctx->rtable; // currently active becomes old and allowed to 'drain'
- ctx->rtable = ctx->new_rtable; // one we've been adding to becomes active
- ctx->new_rtable = NULL;
+ roll_tables( ctx ); // roll active to old, and new to active with proper locking
if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "end of meid map noticed\n" );
+ send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
if( vlevel > 0 ) {
- rmr_vlog_force( RMR_VL_DEBUG, "old route table:\n" );
- rt_stats( ctx->old_rtable );
+ if( ctx->old_rtable != NULL ) {
+ rmr_vlog_force( RMR_VL_DEBUG, "old route table: (ref_count=%d)\n", ctx->old_rtable->ref_count );
+ rt_stats( ctx->old_rtable );
+ } else {
+ rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" );
+ }
rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
rt_stats( ctx->rtable );
}
}
return;
- }
+ }
- if( ! ctx->new_rtable ) { // for any other mmap entries, there must be a table in progress or we punt
+ if( ! ctx->new_rtable ) { // for any other mmap entries, there must be a table in progress or we punt
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid update/delte (%s) encountered, but table update not started\n", tokens[0] );
return;
}
}
parse_meid_ar( ctx->new_rtable, tokens[1], tokens[2], vlevel );
ctx->new_rtable->mupdates++;
+ return;
}
- if( strcmp( tokens[0], "mme_del" ) == 0 ) {
- if( ntoks < 2 ) {
- rmr_vlog( RMR_VL_ERR, "meid_parse: mme_del record didn't have enough tokens\n" );
- return;
- }
+ if( strcmp( tokens[0], "mme_del" ) == 0 ) { // ntoks < 2 already validated
parse_meid_del( ctx->new_rtable, tokens[1], vlevel );
ctx->new_rtable->mupdates++;
+ return;
+ }
+}
+
+/*
+ This will close the current table snarf file (in *.inc) and open a new one.
+ The curent one is renamed. The final file name is determined by the setting of
+ RMR_SNARF_RT, and if not set then the variable RMR_SEED_RT is used and given
+ an additional extension of .snarf. If neither seed or snarf environment vars are
+ set then this does nothing.
+
+ If this is called before the tmp snarf file is opened, then this just opens the file.
+*/
static void cycle_snarfed_rt( uta_ctx_t* ctx ) {
	static int ok2warn = 0;				// some warnings squelched on first call

	char* seed_fname;					// the filename from env
	char tfname[512];					// temp fname
	char wfname[512];					// working buffer for filename
	char* snarf_fname = NULL;			// prevent overlay of the static table if snarf_rt not given

	if( ctx == NULL ) {
		return;
	}

	// choose the publish name: explicit stash var wins, else <seed>.stash, else nothing to do
	if( (snarf_fname = getenv( ENV_STASH_RT )) == NULL ) {			// specific place to stash the rt not given
		if( (seed_fname = getenv( ENV_SEED_RT )) != NULL ) {		// no seed, we leave in the default file
			memset( wfname, 0, sizeof( wfname ) );
			snprintf( wfname, sizeof( wfname ) - 1, "%s.stash", seed_fname );
			snarf_fname = wfname;
		}
	}

	if( snarf_fname == NULL ) {
		rmr_vlog( RMR_VL_DEBUG, "cycle_snarf: no file to save in" );
		return;
	}

	memset( tfname, 0, sizeof( tfname ) );
	snprintf( tfname, sizeof( tfname ) -1, "%s.inc", snarf_fname );	// must ensure tmp file is moveable

	if( ctx->snarf_rt_fd >= 0 ) {			// an in-progress capture exists; finish it and publish
		char* msg= "### captured from route manager\n";
		write( ctx->snarf_rt_fd, msg, strlen( msg ) );		// NOTE(review): write result ignored -- best effort capture
		if( close( ctx->snarf_rt_fd ) < 0 ) {
			rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to close working rt snarf file: %s\n", strerror( errno ) );
			return;							// NOTE(review): early return leaves snarf_rt_fd holding a closed fd -- confirm intended
		}

		if( unlink( snarf_fname ) < 0 && ok2warn ) {		// first time through this can fail and we ignore it
			rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to unlink old static table: %s: %s\n", snarf_fname, strerror( errno ) );
		}

		if( rename( tfname, snarf_fname ) ) {				// nonzero return means the move failed
			rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to move new route table to seed aname : %s -> %s: %s\n", tfname, snarf_fname, strerror( errno ) );
		} else {
			rmr_vlog( RMR_VL_INFO, "latest route table info saved in: %s\n", snarf_fname );
		}
	}
	ok2warn = 1;							// unlink warnings are squelched only on the very first cycle

	ctx->snarf_rt_fd = open( tfname, O_WRONLY | O_CREAT | O_TRUNC, 0660 );	// start the next capture file
	if( ctx->snarf_rt_fd < 0 ) {
		rmr_vlog( RMR_VL_WARN, "rmr_rtc: unable to open trt file: %s: %s\n", tfname, strerror( errno ) );
	} else {
		if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: rt snarf file opened: %s\n", tfname );
	}
}
messages back to the route manager. The regular ctx is the ctx that
the user has been given and thus that's where we have to hang the route
table we're working with.
+
+ If mbuf is given, and we need to ack, then we ack using the mbuf and a
+ return to sender call (allows route manager to use wh_call() to send
+ an update and rts is required to get that back to the right thread).
+ If mbuf is nil, then one will be allocated (in ack) and a normal wh_send
+ will be used.
*/
-static void parse_rt_rec( uta_ctx_t* ctx, uta_ctx_t* pctx, char* buf, int vlevel ) {
+static void parse_rt_rec( uta_ctx_t* ctx, uta_ctx_t* pctx, char* buf, int vlevel, rmr_mbuf_t* mbuf ) {
int i;
int ntoks; // number of tokens found in something
int ngtoks;
int grp; // group number
- rtable_ent_t* rte; // route table entry added
+ rtable_ent_t const* rte; // route table entry added
char* tokens[128];
- char* tok; // pointer into a token or string
+ char* tok=NULL; // pointer into a token or string
char wbuf[1024];
if( ! buf ) {
return;
}
+ if( ctx && ctx->snarf_rt_fd >= 0 ) { // if snarfing table as it arrives, write this puppy
+ write( ctx->snarf_rt_fd, buf, strlen( buf ) );
+ write( ctx->snarf_rt_fd, "\n", 1 );
+ }
+
while( *buf && isspace( *buf ) ) { // skip leading whitespace
buf++;
}
- for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- ); // trim trailing spaces too
+ for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace_with_fence( *tok ); tok-- ); // trim trailing spaces too
*(tok+1) = 0;
+ memset( tokens, 0, sizeof( tokens ) );
if( (ntoks = uta_tokenise( buf, tokens, 128, '|' )) > 0 ) {
tokens[0] = clip( tokens[0] );
switch( *(tokens[0]) ) {
case 'n': // newrt|{start|end}
tokens[1] = clip( tokens[1] );
if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building
+ if( ctx && ctx->snarf_rt_fd >= 0 ) {
+ cycle_snarfed_rt( ctx ); // make it available and open a new one
+ }
+
if( ntoks >2 ) {
if( ctx->new_rtable->updates != atoi( tokens[2] ) ) { // count they added didn't match what we received
rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
ctx->new_rtable->updates, tokens[2] );
snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates );
- send_rt_ack( pctx, RMR_OK, wbuf );
+ send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
uta_rt_drop( ctx->new_rtable );
ctx->new_rtable = NULL;
break;
}
if( ctx->new_rtable ) {
- uta_rt_drop( ctx->old_rtable ); // time to drop one that was previously replaced
- ctx->old_rtable = ctx->rtable; // currently active becomes old and allowed to 'drain'
- ctx->rtable = ctx->new_rtable; // one we've been adding to becomes active
- ctx->new_rtable = NULL;
- if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog( RMR_VL_DEBUG, "end of route table noticed\n" );
-
- if( vlevel > 0 ) {
- rmr_vlog_force( RMR_VL_DEBUG, "old route table:\n" );
- rt_stats( ctx->old_rtable );
- rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
- rt_stats( ctx->rtable );
+ roll_tables( ctx ); // roll active to old, and new to active with proper locking
+ if( DEBUG > 1 || (vlevel > 1) ) {
+ rmr_vlog( RMR_VL_DEBUG, "end of route table noticed\n" );
+ dump_tables( ctx );
}
- send_rt_ack( pctx, RMR_OK, NULL );
+ send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
+ ctx->rtable_ready = 1; // route based sends can now happen
+ ctx->flags |= CFL_FULLRT; // indicate we have seen a complete route table
} else {
if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of route table noticed, but one was not started!\n" );
ctx->new_rtable = NULL;
}
} else { // start a new table.
if( ctx->new_rtable != NULL ) { // one in progress? this forces it out
- send_rt_ack( pctx, !RMR_OK, "table not complete" ); // nack the one that was pending as end never made it
+ send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" ); // nack the one that was pending as end never made it
if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
uta_rt_drop( ctx->new_rtable );
+ ctx->new_rtable = NULL;
}
if( ctx->table_id != NULL ) {
free( ctx->table_id );
}
if( ntoks >2 ) {
- ctx->table_id = strdup( tokens[2] );
+ ctx->table_id = strdup( clip( tokens[2] ) );
} else {
ctx->table_id = NULL;
}
- ctx->new_rtable = NULL;
- ctx->new_rtable = uta_rt_clone( ctx->rtable ); // create by cloning endpoint and meidtentries from active table
+ ctx->new_rtable = prep_new_rt( ctx, SOME ); // wait for old table to drain and shift it back to new
ctx->new_rtable->updates = 0; // init count of entries received
+
if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "start of route table noticed\n" );
}
break;
- case 'm': // mse entry or one of the meid_ records
+ case 'm': // mse entry or one of the meid_ records
if( strcmp( tokens[0], "mse" ) == 0 ) {
if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore siliently
break;
build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel );
ctx->new_rtable->updates++;
} else {
- meid_parser( ctx, tokens, ntoks, vlevel );
+ meid_parser( ctx, pctx, mbuf, tokens, ntoks, vlevel );
}
break;
break;
case 'u': // update current table, not a total replacement
+ if( ! (ctx->flags & CFL_FULLRT) ) { // we cannot update until we have a full table from route generator
+ rmr_vlog( RMR_VL_WARN, "route table update ignored: full table not previously recevied" );
+ break;
+ }
+
tokens[1] = clip( tokens[1] );
if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building
if( ctx->new_rtable == NULL ) { // update table not in progress
break;
}
+ if( ctx->snarf_rt_fd >= 0 ) {
+ cycle_snarfed_rt( ctx ); // make it available and open a new one
+ }
if( ntoks >2 ) {
if( ctx->new_rtable->updates != atoi( tokens[2] ) ) { // count they added didn't match what we received
rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
ctx->new_rtable->updates, tokens[2] );
+ send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
uta_rt_drop( ctx->new_rtable );
ctx->new_rtable = NULL;
break;
}
if( ctx->new_rtable ) {
- uta_rt_drop( ctx->old_rtable ); // time to drop one that was previously replaced
- ctx->old_rtable = ctx->rtable; // currently active becomes old and allowed to 'drain'
- ctx->rtable = ctx->new_rtable; // one we've been adding to becomes active
- ctx->new_rtable = NULL;
- if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed\n" );
-
- if( vlevel > 0 ) {
- rmr_vlog_force( RMR_VL_DEBUG, "old route table:\n" );
- rt_stats( ctx->old_rtable );
- rmr_vlog_force( RMR_VL_DEBUG, "updated route table:\n" );
- rt_stats( ctx->rtable );
+ roll_tables( ctx ); // roll active to old, and new to active with proper locking
+ if( DEBUG > 1 || (vlevel > 1) ) {
+ rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed\n" );
+ dump_tables( ctx );
}
+
+ send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
+ ctx->rtable_ready = 1; // route based sends can now happen
} else {
if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed, but one was not started!\n" );
ctx->new_rtable = NULL;
} else { // start a new table.
if( ctx->new_rtable != NULL ) { // one in progress? this forces it out
if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
+ send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" ); // nack the one that was pending as end never made it
uta_rt_drop( ctx->new_rtable );
+ ctx->new_rtable = NULL;
}
- if( ntoks >2 ) {
+ if( ntoks > 2 ) {
if( ctx->table_id != NULL ) {
free( ctx->table_id );
}
- ctx->table_id = strdup( tokens[2] );
+ ctx->table_id = strdup( clip( tokens[2] ) );
}
- ctx->new_rtable = uta_rt_clone_all( ctx->rtable ); // start with a clone of everything (endpts and entries)
- ctx->new_rtable->updates = 0; // init count of updates received
+ ctx->new_rtable = prep_new_rt( ctx, ALL ); // start with a copy of everything in the live table
+ ctx->new_rtable->updates = 0; // init count of updates received
+
if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "start of rt update noticed\n" );
}
break;
char* eor; // end of the record
int rcount = 0; // record count for debug
- if( (fname = getenv( ENV_SEED_RT )) == NULL ) {
- return;
+ if( (fname = ctx->seed_rt_fname) == NULL ) {
+ if( (fname = getenv( ENV_SEED_RT )) == NULL ) {
+ return;
+ }
+
+ ctx->seed_rt_fname = strdup( fname );
+ fname = ctx->seed_rt_fname;
}
if( (fbuf = ensure_nlterm( uta_fib( fname ) ) ) == NULL ) { // read file into a single buffer (nil terminated string)
}
}
- for( rec = fbuf; rec && *rec; rec = eor+1 ) {
+ rec = fbuf;
+ while( rec && *rec ) {
rcount++;
if( (eor = strchr( rec, '\n' )) != NULL ) {
*eor = 0;
return;
}
- parse_rt_rec( ctx, NULL, rec, vlevel ); // no pvt context as we can't ack
+ parse_rt_rec( ctx, NULL, rec, vlevel, NULL ); // no pvt context as we can't ack
+
+ rec = eor+1;
}
if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "rmr_read_static: seed route table successfully parsed: %d records\n", rcount );
}
/*
- Callback driven for each named thing in a symtab. We collect the pointers to those
+ Callback driven for each thing in a symtab. We collect the pointers to those
things for later use (cloning).
*/
static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list ) {
}
if( thing == NULL ) {
+ rmr_vlog_force( RMR_VL_DEBUG, "collect things given nil thing: %p %p %p\n", st, entry, name ); // dummy ref for sonar
return;
}
- tl->names[tl->nused] = name; // the name/key
+ tl->names[tl->nused] = name; // the name/key (space 0 uses int keys, so name can be nil and that is OK)
tl->things[tl->nused++] = thing; // save a reference to the thing
+
+ if( tl->nused >= tl->nalloc ) {
+ extend_things( tl ); // not enough allocated
+ }
}
/*
int i;
if( (rte = (rtable_ent_t *) thing) == NULL ) {
+ rmr_vlog_force( RMR_VL_DEBUG, "delrte given nil table: %p %p %p\n", st, entry, name ); // dummy ref for sonar
return;
}
for( i = 0; i < rte->nrrgroups; i++ ) {
if( rte->rrgroups[i] ) {
free( rte->rrgroups[i]->epts ); // ditch list of endpoint pointers (end points are reused; don't trash them)
+ free( rte->rrgroups[i] ); // but must free the rrg itself too
}
+
}
free( rte->rrgroups );
an empty buffer, as opposed to a nil, so the caller can generate defaults
or error if an empty/missing file isn't tolerated.
*/
-static char* uta_fib( char* fname ) {
+static char* uta_fib( char const* fname ) {
struct stat stats;
off_t fsize = 8192; // size of the file
off_t nread; // number of bytes read
fsize = stats.st_size; // stat ok, save the file size
}
} else {
- fsize = 8192; // stat failed, we'll leave the file open and try to read a default max of 8k
+ fsize = 8192; // stat failed, we'll leave the file open and try to read a default max of 8k
}
}
return buf;
}
+// --------------------- initialisation/creation ---------------------------------------------
/*
Create and initialise a route table; Returns a pointer to the table struct.
*/
-static route_table_t* uta_rt_init( ) {
+static route_table_t* uta_rt_init( uta_ctx_t* ctx ) {
route_table_t* rt;
+ // nil ctx would leave the shared ephash reference unset; refuse it
+ if( ctx == NULL ) {
+ return NULL;
+ }
if( (rt = (route_table_t *) malloc( sizeof( route_table_t ) )) == NULL ) {
return NULL;
}
+ memset( rt, 0, sizeof( *rt ) ); // error, ref_count and all pointers start zeroed
+
if( (rt->hash = rmr_sym_alloc( RT_SIZE )) == NULL ) {
free( rt );
return NULL;
}
+ rt->ephash = ctx->ephash; // all route tables share a common endpoint hash
return rt;
}
Space is the space in the old table to copy. Space 0 uses an integer key and
references rte structs. All other spaces use a string key and reference endpoints.
*/
-static route_table_t* rt_clone_space( route_table_t* srt, route_table_t* nrt, int space ) {
- endpoint_t* ep; // an endpoint
- rtable_ent_t* rte; // a route table entry
+static route_table_t* rt_clone_space( uta_ctx_t* ctx, route_table_t* srt, route_table_t* nrt, int space ) {
+ endpoint_t* ep; // an endpoint (ignore sonar complaint about const*)
+ rtable_ent_t* rte; // a route table entry (ignore sonar complaint about const*)
void* sst; // source symtab
void* nst; // new symtab
thing_list_t things; // things from the space to copy
int i;
int free_on_err = 0;
+ if( ctx == NULL ) {
+ return NULL;
+ }
if( nrt == NULL ) { // make a new table if needed
free_on_err = 1;
- nrt = uta_rt_init();
+ nrt = uta_rt_init( ctx );
+ if( nrt == NULL ) {
+ return NULL;
+ }
}
if( srt == NULL ) { // source was nil, just give back the new table
things.nalloc = 2048;
things.nused = 0;
+ things.error = 0;
things.things = (void **) malloc( sizeof( void * ) * things.nalloc );
things.names = (const char **) malloc( sizeof( char * ) * things.nalloc );
- if( things.things == NULL ) {
+ if( things.things == NULL || things.names == NULL ){
+ if( things.things != NULL ) {
+ free( things.things );
+ }
+ if( things.names != NULL ) {
+ free( things.names );
+ }
+
if( free_on_err ) {
- free( nrt->hash );
+ rmr_sym_free( nrt->hash );
free( nrt );
nrt = NULL;
+ } else {
+ nrt->error = 1;
}
return nrt;
}
+ memset( things.things, 0, sizeof( void * ) * things.nalloc ); // fix: nested sizeof() zeroed only sizeof(size_t) bytes, not the whole array
+ memset( things.names, 0, sizeof( char * ) * things.nalloc );
sst = srt->hash; // convenience pointers (src symtab)
nst = nrt->hash;
rmr_sym_foreach_class( sst, space, collect_things, &things ); // collect things from this space
+ if( things.error ) { // something happened and capture failed
+ rmr_vlog( RMR_VL_ERR, "unable to clone route table: unable to capture old contents\n" );
+ free( things.things );
+ free( things.names );
+ if( free_on_err ) {
+ rmr_sym_free( nrt->hash );
+ free( nrt );
+ nrt = NULL;
+ } else {
+ nrt->error = 1;
+ }
+ return nrt;
+ }
if( DEBUG ) rmr_vlog_force( RMR_VL_DEBUG, "clone space cloned %d things in space %d\n", things.nused, space );
for( i = 0; i < things.nused; i++ ) {
}
/*
- Creates a new route table and then clones the parts of the table which we must keep with each newrt|start.
- The endpoint and meid entries in the hash must be preserved.
+ Given a destination route table (drt), clone from the source (srt) into it.
+ If drt is nil, alloc a new one. If srt is nil, then nothing is done (except to
+ allocate the drt if that was nil too). If all is true (1), then we will clone both
+ the MT and the ME spaces; otherwise only the ME space is cloned.
+
+ Returns nil if ctx is nil or if the destination table could not be allocated.
*/
-static route_table_t* uta_rt_clone( route_table_t* srt ) {
+static route_table_t* uta_rt_clone( uta_ctx_t* ctx, route_table_t* srt, route_table_t* drt, int all ) {
endpoint_t* ep; // an endpoint
rtable_ent_t* rte; // a route table entry
- route_table_t* nrt = NULL; // new route table
int i;
+ if( ctx == NULL ) {
+ return NULL;
+ }
+ if( drt == NULL ) {
+ drt = uta_rt_init( ctx );
+ if( drt == NULL ) { // fix: init can fail; without this check we deref nil below
+ return NULL;
+ }
+ }
if( srt == NULL ) {
- return uta_rt_init(); // no source to clone, just return an empty table
+ return drt;
}
- nrt = rt_clone_space( srt, nrt, RT_NAME_SPACE ); // allocate a new one, add endpoint refs
- rt_clone_space( srt, nrt, RT_ME_SPACE ); // add meid refs to new
+ drt->ephash = ctx->ephash; // all rts reference the same EP symtab
+ rt_clone_space( ctx, srt, drt, RT_ME_SPACE );
+ if( all ) {
+ rt_clone_space( ctx, srt, drt, RT_MT_SPACE );
+ }
- return nrt;
+ return drt;
}
/*
- Creates a new route table and then clones _all_ of the given route table (references
- both endpoints AND the route table entries. Needed to support a partial update where
- some route table entries will not be deleted if not explicitly in the update and when
- we are adding/replacing meid references.
+ Prepares the "new" route table for populating. If the old_rtable is not nil, then
+ we wait for its use count to reach 0. Then the table is cleared, and moved on the
+ context to be referenced by the new pointer; the old pointer is set to nil.
+
+ If the old table doesn't exist, then a new table is created and the new pointer is
+ set to reference it.
+
+ The ME namespace references endpoints which do not need to be released, therefore we
+ do not need to run that portion of the table to deref like we do for the RTEs.
*/
-static route_table_t* uta_rt_clone_all( route_table_t* srt ) {
- endpoint_t* ep; // an endpoint
- rtable_ent_t* rte; // a route table entry
- route_table_t* nrt = NULL; // new route table
- int i;
+static route_table_t* prep_new_rt( uta_ctx_t* ctx, int all ) {
+ //int counter = 0;
+ route_table_t* rt;
- if( srt == NULL ) {
- return uta_rt_init(); // no source to clone, just return an empty table
+ if( ctx == NULL ) {
+ return NULL;
}
- nrt = rt_clone_space( srt, nrt, RT_MT_SPACE ); // create new, clone all spaces to it
- rt_clone_space( srt, nrt, RT_NAME_SPACE );
- rt_clone_space( srt, nrt, RT_ME_SPACE );
+ pthread_mutex_lock( ctx->rtgate );
+ if( (rt = ctx->old_rtable) != NULL ) {
+ ctx->old_rtable = NULL;
- return nrt;
+ while( rt->ref_count > 0 ) { // wait for all who are using to stop
+ //if( counter++ > 1000 ) {
+ // rmr_vlog( RMR_VL_WARN, "rt_prep_newrt: internal mishap, ref count on table seems wedged" );
+ // break;
+ //}
+
+ pthread_mutex_unlock( ctx->rtgate ); // drop lock so readers can release their refs
+ usleep( 1000 ); // small sleep to yield the processor if that is needed
+ pthread_mutex_lock( ctx->rtgate );
+
+ }
+
+ if( rt->hash != NULL ) {
+ rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL ); // deref and drop if needed
+ rmr_sym_clear( rt->hash ); // clear all entries from the old table
+ }
+
+ rt->error = 0; // table with errors can be here, so ensure it is clear before attempting to load
+ } else {
+ rt = NULL;
+ }
+ pthread_mutex_unlock( ctx->rtgate );
+
+ rt = uta_rt_clone( ctx, ctx->rtable, rt, all ); // also sets the ephash pointer
+ if( rt != NULL ) { // very small chance for nil, but not zero, so test
+ rt->ref_count = 0; // take no chances; ensure it's 0!
+ } else {
+ rmr_vlog( RMR_VL_ERR, "route table clone returned nil; marking dummy table as error\n" );
+ rt = uta_rt_init( ctx ); // must have something, but mark it in error state
+ // NOTE(review): if this init also fails (OOM), rt is nil and the next line derefs it — confirm intended OOM policy
+ rt->error = 1;
+ }
+
+ return rt;
}
+
/*
Given a name, find the endpoint struct in the provided route table.
*/
static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name ) {
- if( rt == NULL || rt->hash == NULL || ep_name == NULL || *ep_name == 0 ) {
+ if( rt == NULL || rt->ephash == NULL || ep_name == NULL || *ep_name == 0 ) {
return NULL;
}
- return rmr_sym_get( rt->hash, ep_name, 1 );
+ return rmr_sym_get( rt->ephash, ep_name, 1 ); // space 1 is the endpoint name space in the shared ephash
}
/*
Drop the given route table. Purge all type 0 entries, then drop the symtab itself.
+ Does NOT destroy the gate as it's a common gate for ALL route tables.
*/
static void uta_rt_drop( route_table_t* rt ) {
if( rt == NULL ) {
- return NULL;
+ return; // fix: void function; returning a value is invalid C (C11 6.8.6.4)
}
- if( (ep = uta_get_ep( rt, ep_name )) == NULL ) { // not there yet, make
+ if( (ep = uta_get_ep( rt, ep_name )) == NULL ) { // not there yet, make
if( (ep = (endpoint_t *) malloc( sizeof( *ep ) )) == NULL ) {
rmr_vlog( RMR_VL_WARN, "rt_ensure: malloc failed for endpoint creation: %s\n", ep_name );
errno = ENOMEM;
pthread_mutex_init( &ep->gate, NULL ); // init with default attrs
memset( &ep->scounts[0], 0, sizeof( ep->scounts ) );
- rmr_sym_put( rt->hash, ep_name, 1, ep );
+ rmr_sym_put( rt->ephash, ep_name, 1, ep );
}
return ep;
Given a route table and meid string, find the owner (if known). Returns a pointer to
the endpoint struct or nil.
*/
-static inline endpoint_t* get_meid_owner( route_table_t *rt, char* meid ) {
- endpoint_t* ep; // the ep we found in the hash
+static inline endpoint_t* get_meid_owner( route_table_t *rt, char const* meid ) {
if( rt == NULL || rt->hash == NULL || meid == NULL || *meid == 0 ) {
return NULL;
}
- return (endpoint_t *) rmr_sym_get( rt->hash, meid, RT_ME_SPACE );
+ return (endpoint_t *) rmr_sym_get( rt->hash, meid, RT_ME_SPACE ); // nil if the meid has no known owner
+}
+
+/*
+ This returns a pointer to the currently active route table and ups
+ the reference count so that the route table is not freed while it
+ is being used. The caller MUST call release_rt() when finished
+ with the pointer.
+
+ Care must be taken: the ctx->rtable pointer _could_ change during the time
+ between the release of the lock and the return. Therefore we MUST grab
+ the current pointer when we have the lock so that if it does we don't
+ return a pointer to the wrong table.
+
+ This will return NULL if there is no active table.
+*/
+static inline route_table_t* get_rt( uta_ctx_t* ctx ) {
+ route_table_t* rrt; // return value
+
+ if( ctx == NULL || ctx->rtable == NULL ) { // unlocked peek; the pointer is re-read under the lock below
+ return NULL;
+ }
+
+ pthread_mutex_lock( ctx->rtgate ); // must hold lock to bump use
+ rrt = ctx->rtable; // must stash the pointer while we hold lock
+ rrt->ref_count++;
+ pthread_mutex_unlock( ctx->rtgate );
+
+ return rrt; // pointer we upped the count with
+}
+
+/*
+ This will "release" the route table by reducing the use counter
+ in the table. The table may not be freed until the counter reaches
+ 0, so it's imperative that the pointer be "released" when it is
+ fetched by get_rt(). Once the caller has released the table it
+ may not safely use the pointer that it had.
+*/
+static inline void release_rt( uta_ctx_t* ctx, route_table_t* rt ) {
+ if( ctx == NULL || rt == NULL ) {
+ return;
+ }
+
+ pthread_mutex_lock( ctx->rtgate ); // must hold lock
+ if( rt->ref_count > 0 ) { // something smells if it's already 0, don't do anything if it is
+ rt->ref_count--;
+ }
+ pthread_mutex_unlock( ctx->rtgate );
+}
#endif