// :vi sw=4 ts=4 noet:
/*
==================================================================================
- Copyright (c) 2019 Nokia
- Copyright (c) 2018-2019 AT&T Intellectual Property.
+ Copyright (c) 2019-2020 Nokia
+ Copyright (c) 2018-2020 AT&T Intellectual Property.
- Licensed under the Apache License, Version 2.0 (the "License");
+ Licensed under the Apache License, Version 2.0 (the "License") ;
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
#include <sys/stat.h>
#include <unistd.h>
#include <netdb.h>
+#include <pthread.h>
#include <RIC_message_types.h> // needed for route manager messages
+#define ALL 1
+#define SOME 0
/*
Passed to a symtab foreach callback to construct a list of pointers from
/*
Dump some stats for an endpoint in the RT. This is generally called to
verify endpoints after a table load/change.
+
+	This is called by the for-each mechanism of the symtab and the prototype is
+	fixed; we don't really use some of the parms, but have dummy references to
+	keep sonar from complaining.
*/
static void ep_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
int* counter;
if( (counter = (int *) vcounter) != NULL ) {
(*counter)++;
+ } else {
+		rmr_vlog( RMR_VL_DEBUG, "ep_stats: nil counter %p %p %p", st, entry, name );	// dummy refs
}
rmr_vlog_force( RMR_VL_DEBUG, "rt endpoint: target=%s open=%d\n", ep->name, ep->open );
/*
Called to count meid entries in the table. The meid points to an 'owning' endpoint
so we can list what we find
+
+ See note in ep_stats about dummy refs.
*/
static void meid_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
int* counter;
if( (counter = (int *) vcounter) != NULL ) {
(*counter)++;
+ } else {
+		rmr_vlog( RMR_VL_DEBUG, "meid_stats: nil counter %p %p %p", st, entry, name );	// dummy refs
}
rmr_vlog_force( RMR_VL_DEBUG, "meid=%s owner=%s open=%d\n", name, ep->name, ep->open );
/*
Dump counts for an endpoint in the RT. The vid parm is assumed to point to
the 'source' information and is added to each message.
+
+ See note above about dummy references.
*/
static void ep_counts( void* st, void* entry, char const* name, void* thing, void* vid ) {
endpoint_t* ep;
char* id;
if( (ep = (endpoint_t *) thing) == NULL ) {
+ rmr_vlog( RMR_VL_DEBUG, "ep_counts: nil thing %p %p %p", st, entry, name ); // dummy refs
return;
}
*/
static void rte_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
int* counter;
- rtable_ent_t* rte; // thing is really an rte
+ rtable_ent_t const* rte; // thing is really an rte
int mtype;
int sid;
if( (rte = (rtable_ent_t *) thing) == NULL ) {
+ rmr_vlog( RMR_VL_DEBUG, "rte_stats: nil thing %p %p %p", st, entry, name ); // dummy refs
return;
}
*counter = 0;
rmr_vlog_force( RMR_VL_DEBUG, "route table stats:\n" );
rmr_vlog_force( RMR_VL_DEBUG, "route table endpoints:\n" );
- rmr_sym_foreach_class( rt->hash, RT_NAME_SPACE, ep_stats, counter ); // run endpoints (names) in the active table
+ rmr_sym_foreach_class( rt->ephash, RT_NAME_SPACE, ep_stats, counter ); // run endpoints (names) in the active table
rmr_vlog_force( RMR_VL_DEBUG, "rtable: %d known endpoints\n", *counter );
rmr_vlog_force( RMR_VL_DEBUG, "route table entries:\n" );
rmr_sym_foreach_class( rt->hash, 1, ep_counts, id ); // run endpoints in the active table
}
+
+/*
+	Dump stats for both the old (draining) and the newly active route
+	tables. Debug aid invoked after a table roll when verbose/debug is on.
+*/
+static void dump_tables( uta_ctx_t *ctx ) {
+	if( ctx->old_rtable != NULL ) {
+		rmr_vlog_force( RMR_VL_DEBUG, "old route table: (ref_count=%d)\n", ctx->old_rtable->ref_count );
+		rt_stats( ctx->old_rtable );
+	} else {
+		rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" );
+	}
+	rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
+	rt_stats( ctx->rtable );
+}
+
// ------------ route manager communication -------------------------------------------------
/*
Send a request for a table update to the route manager. Updates come in
if( smsg != NULL ) {
smsg->mtype = RMRRM_REQ_TABLE;
smsg->sub_id = 0;
- snprintf( smsg->payload, 1024, "%s ts=%ld\n", ctx->my_name, (long) time( NULL ) );
+ snprintf( smsg->payload, 1024, "%s ts=%ld\n", ctx->my_name, time( NULL ) );
rmr_vlog( RMR_VL_INFO, "rmr_rtc: requesting table: (%s) whid=%d\n", smsg->payload, ctx->rtg_whid );
smsg->len = strlen( smsg->payload ) + 1;
-
+
smsg = rmr_wh_send_msg( pctx, ctx->rtg_whid, smsg );
if( (state = smsg->state) != RMR_OK ) {
rmr_vlog( RMR_VL_INFO, "rmr_rtc: send failed: %d whid=%d\n", smsg->state, ctx->rtg_whid );
/*
Send an ack to the route table manager for a table ID that we are
- processing. State is 1 for OK, and 0 for failed. Reason might
+ processing. State is 1 for OK, and 0 for failed. Reason might
be populated if we know why there was a failure.
Context should be the PRIVATE context that we use for messages
to route manger and NOT the user's context.
+
+	If a message buffer is passed we use that and use return to sender,
+	assuming that this might be a response to a call and rts is needed
+	to get it back to the proper calling thread. If msg is nil, we allocate
+	and use it.
*/
-static void send_rt_ack( uta_ctx_t* ctx, char* table_id, int state, char* reason ) {
- rmr_mbuf_t* smsg;
-
+static void send_rt_ack( uta_ctx_t* ctx, rmr_mbuf_t* smsg, char* table_id, int state, char* reason ) {
+ int use_rts = 1;
+ int payload_size = 1024;
+
if( ctx == NULL || ctx->rtg_whid < 0 ) {
return;
}
return;
}
- smsg = rmr_alloc_msg( ctx, 1024 );
+ if( smsg != NULL ) {
+ smsg = rmr_realloc_payload( smsg, payload_size, FALSE, FALSE ); // ensure it's large enough to send a response
+ } else {
+ use_rts = 0;
+ smsg = rmr_alloc_msg( ctx, payload_size );
+ }
+
if( smsg != NULL ) {
smsg->mtype = RMRRM_TABLE_STATE;
- smsg->sub_id = 0;
- snprintf( smsg->payload, 1024, "%s %s %s\n", state == RMR_OK ? "OK" : "ERR",
+ smsg->sub_id = -1;
+ snprintf( smsg->payload, payload_size-1, "%s %s %s\n", state == RMR_OK ? "OK" : "ERR",
table_id == NULL ? "<id-missing>" : table_id, reason == NULL ? "" : reason );
smsg->len = strlen( smsg->payload ) + 1;
-
- rmr_vlog( RMR_VL_INFO, "rmr_rtc: sending table state: (%s) state=%d whid=%d\n", smsg->payload, smsg->state, ctx->rtg_whid );
- smsg = rmr_wh_send_msg( ctx, ctx->rtg_whid, smsg );
+
+ rmr_vlog( RMR_VL_INFO, "rmr_rtc: sending table state: (%s) state=%d whid=%d table=%s\n", smsg->payload, state, ctx->rtg_whid, table_id );
+ if( use_rts ) {
+ smsg = rmr_rts_msg( ctx, smsg );
+ } else {
+ smsg = rmr_wh_send_msg( ctx, ctx->rtg_whid, smsg );
+ }
if( (state = smsg->state) != RMR_OK ) {
rmr_vlog( RMR_VL_WARN, "unable to send table state: %d\n", smsg->state );
rmr_wh_close( ctx, ctx->rtg_whid ); // send failed, assume connection lost
ctx->rtg_whid = -1;
}
- rmr_free_msg( smsg );
+ if( ! use_rts ) {
+ rmr_free_msg( smsg ); // if not our message we must free the leftovers
+ }
}
}
-// ------------------------------------------------------------------------------------------------
+// ---- utility -----------------------------------------------------------------------------------
/*
Little diddy to trim whitespace and trailing comments. Like shell, trailing comments
must be at the start of a word (i.e. must be immediatly preceeded by whitespace).
*/
static char* clip( char* buf ) {
- char* tok;
+ char* tok;
while( *buf && isspace( *buf ) ) { // skip leading whitespace
buf++;
*/
static char* ensure_nlterm( char* buf ) {
char* nb = NULL;
- int len = 1;
+ int len = 0;
- nb = buf;
- if( buf == NULL || (len = strlen( buf )) < 2 ) {
- if( (nb = (char *) malloc( sizeof( char ) * 2 )) != NULL ) {
- *nb = '\n';
- *(nb+1) = 0;
- }
- } else {
- if( buf[len-1] != '\n' ) {
- rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
- if( (nb = (char *) malloc( sizeof( char ) * (len + 2) )) != NULL ) {
- memcpy( nb, buf, len );
- *(nb+len) = '\n'; // insert \n and nil into the two extra bytes we allocated
- *(nb+len+1) = 0;
- }
+ if( buf != NULL ) {
+ len = strlen( buf );
+ }
- free( buf );
- }
+ nb = buf; // default to returning original as is
+ switch( len ) {
+ case 0:
+ nb = strdup( "\n" );
+ break;
+
+ case 1:
+ if( *buf != '\n' ) { // not a newline; realloc
+ rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
+ nb = strdup( " \n" );
+ *nb = *buf;
+ free( buf );
+ }
+ break;
+
+ default:
+ if( buf[len-1] != '\n' ) { // not newline terminated, realloc
+ rmr_vlog( RMR_VL_WARN, "rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
+ if( (nb = (char *) malloc( sizeof( char ) * (len + 2) )) != NULL ) {
+ memcpy( nb, buf, len );
+ *(nb+len) = '\n'; // insert \n and nil into the two extra bytes we allocated
+ *(nb+len+1) = 0;
+ free( buf );
+ }
+ }
+ break;
}
return nb;
}
+/*
+	Roll the new table into the active and the active into the old table. We
+	must have the lock on the active table to do this. It's possible that there
+	is no active table (first load), so we have to account for that (no locking).
+	The gate (ctx->rtgate) is a single mutex shared by all route tables.
+*/
+static void roll_tables( uta_ctx_t* ctx ) {
+
+	if( ctx->rtable != NULL ) {				// initially there isn't one, so must check!
+		pthread_mutex_lock( ctx->rtgate );	// must hold lock to move to active
+		ctx->old_rtable = ctx->rtable;		// currently active becomes old and allowed to 'drain'
+		ctx->rtable = ctx->new_rtable;		// one we've been adding to becomes active
+		pthread_mutex_unlock( ctx->rtgate );
+	} else {
+		ctx->old_rtable = NULL;				// ensure there isn't an old reference
+		ctx->rtable = ctx->new_rtable;		// make new the active one
+	}
+
+	ctx->new_rtable = NULL;					// next update starts a fresh build
+}
+
+// ------------ entry update functions ---------------------------------------------------------------
/*
Given a message type create a route table entry and add to the hash keyed on the
message type. Once in the hash, endpoints can be added with uta_add_ep. Size
*/
static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* rr_field, int vlevel ) {
rtable_ent_t* rte; // route table entry added
- char* tok;
+ char const* tok;
int ntoks;
uint64_t key = 0; // the symtab key will be mtype or sub_id+mtype
- char* tokens[128];
- char* gtokens[64];
+ char* tokens[128];
+ char* gtokens[64];
int i;
int ngtoks; // number of tokens in the group list
int grp; // index into group list
+ int cgidx; // contiguous group index (prevents the addition of a contiguous group without ep)
+ int has_ep = FALSE; // indicates if an endpoint was added in a given round robin group
ts_field = clip( ts_field ); // ditch extra whitespace and trailing comments
rr_field = clip( rr_field );
- if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all)
+ if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all)
(uta_has_str( ts_field, ctx->my_name, ',', 127) >= 0) || // our name is in the list
has_myip( ts_field, ctx->ip_list, ',', 127 ) ) { // the list has one of our IP addresses
rte = uta_add_rte( ctx->new_rtable, key, ngtoks ); // get/create entry for this key
rte->mtype = atoi( ts_field ); // capture mtype for debugging
- for( grp = 0; grp < ngtoks; grp++ ) {
- if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) { // remove any referneces to our ip addrs
+ for( grp = 0, cgidx = 0; grp < ngtoks; grp++ ) {
+ if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) { // remove any references to our ip addrs
for( i = 0; i < ntoks; i++ ) {
if( strcmp( tokens[i], ctx->my_name ) != 0 ) { // don't add if it is us -- cannot send to ourself
if( DEBUG > 1 || (vlevel > 1)) rmr_vlog_force( RMR_VL_DEBUG, "add endpoint ts=%s %s\n", ts_field, tokens[i] );
- uta_add_ep( ctx->new_rtable, rte, tokens[i], grp );
+ uta_add_ep( ctx->new_rtable, rte, tokens[i], cgidx );
+ has_ep = TRUE;
}
}
+ if( has_ep ) {
+ cgidx++; // only increment to the next contiguous group if the current one has at least one endpoint
+ has_ep = FALSE;
+ }
}
}
}
} else {
if( DEBUG || (vlevel > 2) ) {
- rmr_vlog_force( RMR_VL_DEBUG, "entry not included, sender not matched: %s\n", tokens[1] );
+ rmr_vlog_force( RMR_VL_DEBUG, "build entry: ts_entry not of form msg-type,sender: %s\n", ts_field );
}
}
}
*/
static void trash_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, int vlevel ) {
rtable_ent_t* rte; // route table entry to be 'deleted'
- char* tok;
+ char const* tok;
int ntoks;
uint64_t key = 0; // the symtab key will be mtype or sub_id+mtype
- char* tokens[128];
+ char* tokens[128];
if( ctx == NULL || ctx->new_rtable == NULL || ctx->new_rtable->hash == NULL ) {
return;
ts_field = clip( ts_field ); // ditch extra whitespace and trailing comments
- if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all)
+ if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all)
(uta_has_str( ts_field, ctx->my_name, ',', 127) >= 0) || // our name is in the list
has_myip( ts_field, ctx->ip_list, ',', 127 ) ) { // the list has one of our IP addresses
}
}
+// -------------------------- parse functions --------------------------------------------------
+
/*
Given the tokens from an mme_ar (meid add/replace) entry, add the entries.
the 'owner' which should be the dns name or IP address of an enpoint
to send messages to.
*/
static void parse_meid_ar( route_table_t* rtab, char* owner, char* meid_list, int vlevel ) {
- char* tok;
+ char const* tok;
int ntoks;
- char* tokens[128];
+ char* tokens[128];
int i;
int state;
endpoint_t* ep; // endpoint struct for the owner
This function assumes the caller has vetted the pointers as needed.
*/
static void parse_meid_del( route_table_t* rtab, char* meid_list, int vlevel ) {
- char* tok;
+ char const* tok;
int ntoks;
- char* tokens[128];
+ char* tokens[128];
int i;
if( rtab->hash == NULL ) {
/*
Parse a partially parsed meid record. Tokens[0] should be one of:
meid_map, mme_ar, mme_del.
+
+	pctx is the private context needed to return an ack/nack using the provided
+	message buffer with the route manager's address info.
*/
-static void meid_parser( uta_ctx_t* ctx, char** tokens, int ntoks, int vlevel ) {
+static void meid_parser( uta_ctx_t* ctx, uta_ctx_t* pctx, rmr_mbuf_t* mbuf, char** tokens, int ntoks, int vlevel ) {
+ char wbuf[1024];
+
if( tokens == NULL || ntoks < 1 ) {
return; // silent but should never happen
}
if( ctx->new_rtable != NULL ) { // one in progress? this forces it out
if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "meid map start: dropping incomplete table\n" );
uta_rt_drop( ctx->new_rtable );
+ ctx->new_rtable = NULL;
+		send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" );	// nack the one that was pending as end never made it
+ }
+
+ if( ctx->table_id != NULL ) {
+ free( ctx->table_id );
+ }
+ if( ntoks > 2 ) {
+ ctx->table_id = strdup( clip( tokens[2] ) );
+ } else {
+ ctx->table_id = NULL;
}
- ctx->new_rtable = uta_rt_clone_all( ctx->rtable ); // start with a clone of everything (mtype, endpoint refs and meid)
+ ctx->new_rtable = prep_new_rt( ctx, ALL ); // start with a clone of everything (mtype, endpoint refs and meid)
ctx->new_rtable->mupdates = 0;
+
if( DEBUG || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "meid_parse: meid map start found\n" );
} else {
if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building
if( ntoks > 2 ) { // meid_map | end | <count> |??? given
if( ctx->new_rtable->mupdates != atoi( tokens[2] ) ) { // count they added didn't match what we received
- rmr_vlog( RMR_VL_ERR, "meid_parse: meid map update had wrong number of records: received %d expected %s\n", ctx->new_rtable->mupdates, tokens[2] );
+ rmr_vlog( RMR_VL_ERR, "meid_parse: meid map update had wrong number of records: received %d expected %s\n",
+ ctx->new_rtable->mupdates, tokens[2] );
+				snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->mupdates );
+ send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
uta_rt_drop( ctx->new_rtable );
ctx->new_rtable = NULL;
return;
}
if( ctx->new_rtable ) {
- uta_rt_drop( ctx->old_rtable ); // time to drop one that was previously replaced
- ctx->old_rtable = ctx->rtable; // currently active becomes old and allowed to 'drain'
- ctx->rtable = ctx->new_rtable; // one we've been adding to becomes active
- ctx->new_rtable = NULL;
+ roll_tables( ctx ); // roll active to old, and new to active with proper locking
if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "end of meid map noticed\n" );
+ send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
if( vlevel > 0 ) {
- rmr_vlog_force( RMR_VL_DEBUG, "old route table:\n" );
- rt_stats( ctx->old_rtable );
+ if( ctx->old_rtable != NULL ) {
+ rmr_vlog_force( RMR_VL_DEBUG, "old route table: (ref_count=%d)\n", ctx->old_rtable->ref_count );
+ rt_stats( ctx->old_rtable );
+ } else {
+ rmr_vlog_force( RMR_VL_DEBUG, "old route table was empty\n" );
+ }
rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
rt_stats( ctx->rtable );
}
}
return;
- }
+ }
- if( ! ctx->new_rtable ) { // for any other mmap entries, there must be a table in progress or we punt
+ if( ! ctx->new_rtable ) { // for any other mmap entries, there must be a table in progress or we punt
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "meid update/delte (%s) encountered, but table update not started\n", tokens[0] );
return;
}
}
parse_meid_ar( ctx->new_rtable, tokens[1], tokens[2], vlevel );
ctx->new_rtable->mupdates++;
+ return;
}
- if( strcmp( tokens[0], "mme_del" ) == 0 ) {
- if( ntoks < 2 ) {
- rmr_vlog( RMR_VL_ERR, "meid_parse: mme_del record didn't have enough tokens\n" );
- return;
- }
+ if( strcmp( tokens[0], "mme_del" ) == 0 ) { // ntoks < 2 already validated
parse_meid_del( ctx->new_rtable, tokens[1], vlevel );
ctx->new_rtable->mupdates++;
+ return;
}
}
messages back to the route manager. The regular ctx is the ctx that
the user has been given and thus that's where we have to hang the route
table we're working with.
+
+ If mbuf is given, and we need to ack, then we ack using the mbuf and a
+ return to sender call (allows route manager to use wh_call() to send
+ an update and rts is required to get that back to the right thread).
+ If mbuf is nil, then one will be allocated (in ack) and a normal wh_send
+ will be used.
*/
-static void parse_rt_rec( uta_ctx_t* ctx, uta_ctx_t* pctx, char* buf, int vlevel ) {
+static void parse_rt_rec( uta_ctx_t* ctx, uta_ctx_t* pctx, char* buf, int vlevel, rmr_mbuf_t* mbuf ) {
int i;
int ntoks; // number of tokens found in something
int ngtoks;
int grp; // group number
- rtable_ent_t* rte; // route table entry added
+ rtable_ent_t const* rte; // route table entry added
char* tokens[128];
char* tok; // pointer into a token or string
char wbuf[1024];
for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- ); // trim trailing spaces too
*(tok+1) = 0;
+ memset( tokens, 0, sizeof( tokens ) );
if( (ntoks = uta_tokenise( buf, tokens, 128, '|' )) > 0 ) {
tokens[0] = clip( tokens[0] );
switch( *(tokens[0]) ) {
rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
ctx->new_rtable->updates, tokens[2] );
snprintf( wbuf, sizeof( wbuf ), "missing table records: expected %s got %d\n", tokens[2], ctx->new_rtable->updates );
- send_rt_ack( pctx, ctx->table_id, !RMR_OK, wbuf );
+ send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
uta_rt_drop( ctx->new_rtable );
ctx->new_rtable = NULL;
break;
}
if( ctx->new_rtable ) {
- uta_rt_drop( ctx->old_rtable ); // time to drop one that was previously replaced
- ctx->old_rtable = ctx->rtable; // currently active becomes old and allowed to 'drain'
- ctx->rtable = ctx->new_rtable; // one we've been adding to becomes active
- ctx->new_rtable = NULL;
- if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog( RMR_VL_DEBUG, "end of route table noticed\n" );
-
- if( vlevel > 0 ) {
- rmr_vlog_force( RMR_VL_DEBUG, "old route table:\n" );
- rt_stats( ctx->old_rtable );
- rmr_vlog_force( RMR_VL_DEBUG, "new route table:\n" );
- rt_stats( ctx->rtable );
+ roll_tables( ctx ); // roll active to old, and new to active with proper locking
+ if( DEBUG > 1 || (vlevel > 1) ) {
+ rmr_vlog( RMR_VL_DEBUG, "end of route table noticed\n" );
+ dump_tables( ctx );
}
- send_rt_ack( pctx, ctx->table_id, RMR_OK, NULL );
+ send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
+ ctx->rtable_ready = 1; // route based sends can now happen
} else {
if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of route table noticed, but one was not started!\n" );
ctx->new_rtable = NULL;
}
} else { // start a new table.
if( ctx->new_rtable != NULL ) { // one in progress? this forces it out
- send_rt_ack( pctx, ctx->table_id, !RMR_OK, "table not complete" ); // nack the one that was pending as end never made it
+ send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" ); // nack the one that was pending as end never made it
if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
uta_rt_drop( ctx->new_rtable );
+ ctx->new_rtable = NULL;
}
if( ctx->table_id != NULL ) {
ctx->table_id = NULL;
}
- ctx->new_rtable = NULL;
- ctx->new_rtable = uta_rt_clone( ctx->rtable ); // create by cloning endpoint and meidtentries from active table
+ ctx->new_rtable = prep_new_rt( ctx, SOME ); // wait for old table to drain and shift it back to new
ctx->new_rtable->updates = 0; // init count of entries received
+
if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "start of route table noticed\n" );
}
break;
- case 'm': // mse entry or one of the meid_ records
+ case 'm': // mse entry or one of the meid_ records
if( strcmp( tokens[0], "mse" ) == 0 ) {
if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore siliently
break;
build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel );
ctx->new_rtable->updates++;
} else {
- meid_parser( ctx, tokens, ntoks, vlevel );
+ meid_parser( ctx, pctx, mbuf, tokens, ntoks, vlevel );
}
break;
if( ctx->new_rtable->updates != atoi( tokens[2] ) ) { // count they added didn't match what we received
rmr_vlog( RMR_VL_ERR, "rmr_rtc: RT update had wrong number of records: received %d expected %s\n",
ctx->new_rtable->updates, tokens[2] );
+ send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, wbuf );
uta_rt_drop( ctx->new_rtable );
ctx->new_rtable = NULL;
break;
}
if( ctx->new_rtable ) {
- uta_rt_drop( ctx->old_rtable ); // time to drop one that was previously replaced
- ctx->old_rtable = ctx->rtable; // currently active becomes old and allowed to 'drain'
- ctx->rtable = ctx->new_rtable; // one we've been adding to becomes active
- ctx->new_rtable = NULL;
- if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed\n" );
-
- if( vlevel > 0 ) {
- rmr_vlog_force( RMR_VL_DEBUG, "old route table:\n" );
- rt_stats( ctx->old_rtable );
- rmr_vlog_force( RMR_VL_DEBUG, "updated route table:\n" );
- rt_stats( ctx->rtable );
+ roll_tables( ctx ); // roll active to old, and new to active with proper locking
+ if( DEBUG > 1 || (vlevel > 1) ) {
+ rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed\n" );
+ dump_tables( ctx );
}
+
+ send_rt_ack( pctx, mbuf, ctx->table_id, RMR_OK, NULL );
+ ctx->rtable_ready = 1; // route based sends can now happen
} else {
if( DEBUG > 1 ) rmr_vlog_force( RMR_VL_DEBUG, "end of rt update noticed, but one was not started!\n" );
ctx->new_rtable = NULL;
} else { // start a new table.
if( ctx->new_rtable != NULL ) { // one in progress? this forces it out
if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "new table; dropping incomplete table\n" );
+ send_rt_ack( pctx, mbuf, ctx->table_id, !RMR_OK, "table not complete" ); // nack the one that was pending as end never made it
uta_rt_drop( ctx->new_rtable );
+ ctx->new_rtable = NULL;
}
- if( ntoks >2 ) {
+ if( ntoks > 2 ) {
if( ctx->table_id != NULL ) {
free( ctx->table_id );
}
ctx->table_id = strdup( clip( tokens[2] ) );
}
- ctx->new_rtable = uta_rt_clone_all( ctx->rtable ); // start with a clone of everything (endpts and entries)
- ctx->new_rtable->updates = 0; // init count of updates received
+ ctx->new_rtable = prep_new_rt( ctx, ALL ); // start with a copy of everything in the live table
+ ctx->new_rtable->updates = 0; // init count of updates received
+
if( DEBUG > 1 || (vlevel > 1) ) rmr_vlog_force( RMR_VL_DEBUG, "start of rt update noticed\n" );
}
break;
return;
}
- parse_rt_rec( ctx, NULL, rec, vlevel ); // no pvt context as we can't ack
+ parse_rt_rec( ctx, NULL, rec, vlevel, NULL ); // no pvt context as we can't ack
rec = eor+1;
}
}
if( thing == NULL ) {
+ rmr_vlog_force( RMR_VL_DEBUG, "collect things given nil thing: %p %p %p\n", st, entry, name ); // dummy ref for sonar
return;
}
int i;
if( (rte = (rtable_ent_t *) thing) == NULL ) {
+ rmr_vlog_force( RMR_VL_DEBUG, "delrte given nil table: %p %p %p\n", st, entry, name ); // dummy ref for sonar
return;
}
for( i = 0; i < rte->nrrgroups; i++ ) {
if( rte->rrgroups[i] ) {
free( rte->rrgroups[i]->epts ); // ditch list of endpoint pointers (end points are reused; don't trash them)
+ free( rte->rrgroups[i] ); // but must free the rrg itself too
}
+
}
free( rte->rrgroups );
an empty buffer, as opposed to a nil, so the caller can generate defaults
or error if an empty/missing file isn't tolerated.
*/
-static char* uta_fib( char* fname ) {
+static char* uta_fib( char const* fname ) {
struct stat stats;
off_t fsize = 8192; // size of the file
off_t nread; // number of bytes read
fsize = stats.st_size; // stat ok, save the file size
}
} else {
- fsize = 8192; // stat failed, we'll leave the file open and try to read a default max of 8k
+ fsize = 8192; // stat failed, we'll leave the file open and try to read a default max of 8k
}
}
return buf;
}
+// --------------------- initialisation/creation ---------------------------------------------
/*
Create and initialise a route table; Returns a pointer to the table struct.
*/
-static route_table_t* uta_rt_init( ) {
+static route_table_t* uta_rt_init( uta_ctx_t* ctx ) {
route_table_t* rt;
+ if( ctx == NULL ) {
+ return NULL;
+ }
if( (rt = (route_table_t *) malloc( sizeof( route_table_t ) )) == NULL ) {
return NULL;
}
+ memset( rt, 0, sizeof( *rt ) );
+
if( (rt->hash = rmr_sym_alloc( RT_SIZE )) == NULL ) {
free( rt );
return NULL;
}
+ rt->gate = ctx->rtgate; // single mutex needed for all route tables
+ rt->ephash = ctx->ephash; // all route tables share a common endpoint hash
+ pthread_mutex_init( rt->gate, NULL );
+
return rt;
}
Space is the space in the old table to copy. Space 0 uses an integer key and
references rte structs. All other spaces use a string key and reference endpoints.
*/
-static route_table_t* rt_clone_space( route_table_t* srt, route_table_t* nrt, int space ) {
- endpoint_t* ep; // an endpoint
- rtable_ent_t* rte; // a route table entry
+static route_table_t* rt_clone_space( uta_ctx_t* ctx, route_table_t* srt, route_table_t* nrt, int space ) {
+ endpoint_t* ep; // an endpoint (ignore sonar complaint about const*)
+ rtable_ent_t* rte; // a route table entry (ignore sonar complaint about const*)
void* sst; // source symtab
void* nst; // new symtab
thing_list_t things; // things from the space to copy
int i;
int free_on_err = 0;
+ if( ctx == NULL ) {
+ return NULL;
+ }
if( nrt == NULL ) { // make a new table if needed
free_on_err = 1;
- nrt = uta_rt_init();
+ nrt = uta_rt_init( ctx );
+ if( nrt == NULL ) {
+ return NULL;
+ }
}
if( srt == NULL ) { // source was nil, just give back the new table
things.nused = 0;
things.things = (void **) malloc( sizeof( void * ) * things.nalloc );
things.names = (const char **) malloc( sizeof( char * ) * things.nalloc );
- if( things.things == NULL ) {
+ if( things.things == NULL || things.names == NULL ){
+ if( things.things != NULL) { free( things.things ); }
+ if( things.names != NULL) { free( things.names ); }
+
if( free_on_err ) {
- free( nrt->hash );
+ rmr_sym_free( nrt->hash );
free( nrt );
nrt = NULL;
}
return nrt;
}
+	memset( things.things, 0, sizeof( void * ) * things.nalloc );
+ memset( things.names, 0, sizeof( char * ) * things.nalloc );
sst = srt->hash; // convenience pointers (src symtab)
nst = nrt->hash;
}
/*
- Creates a new route table and then clones the parts of the table which we must keep with each newrt|start.
- The endpoint and meid entries in the hash must be preserved.
+ Given a destination route table (drt), clone from the source (srt) into it.
+ If drt is nil, alloc a new one. If srt is nil, then nothing is done (except to
+ allocate the drt if that was nil too). If all is true (1), then we will clone both
+ the MT and the ME spaces; otherwise only the ME space is cloned.
*/
-static route_table_t* uta_rt_clone( route_table_t* srt ) {
+static route_table_t* uta_rt_clone( uta_ctx_t* ctx, route_table_t* srt, route_table_t* drt, int all ) {
endpoint_t* ep; // an endpoint
rtable_ent_t* rte; // a route table entry
- route_table_t* nrt = NULL; // new route table
int i;
+ if( ctx == NULL ) {
+ return NULL;
+ }
+ if( drt == NULL ) {
+ drt = uta_rt_init( ctx );
+ }
if( srt == NULL ) {
- return uta_rt_init(); // no source to clone, just return an empty table
+ return drt;
}
- nrt = rt_clone_space( srt, nrt, RT_NAME_SPACE ); // allocate a new one, add endpoint refs
- rt_clone_space( srt, nrt, RT_ME_SPACE ); // add meid refs to new
+ drt->ephash = ctx->ephash; // all rts reference the same EP symtab
+ rt_clone_space( ctx, srt, drt, RT_ME_SPACE );
+ if( all ) {
+ rt_clone_space( ctx, srt, drt, RT_MT_SPACE );
+ }
- return nrt;
+ return drt;
}
/*
- Creates a new route table and then clones _all_ of the given route table (references
- both endpoints AND the route table entries. Needed to support a partial update where
- some route table entries will not be deleted if not explicitly in the update and when
- we are adding/replacing meid references.
+	Prepares the "new" route table for populating. If the old_rtable is not nil, then
+	we wait for its use count to reach 0. Then the table is cleared and moved to the
+	context to be referenced by the new pointer; the old pointer is set to nil.
+
+	If the old table doesn't exist, then a new table is created and the new pointer is
+	set to reference it.
*/
-static route_table_t* uta_rt_clone_all( route_table_t* srt ) {
- endpoint_t* ep; // an endpoint
- rtable_ent_t* rte; // a route table entry
- route_table_t* nrt = NULL; // new route table
- int i;
+static route_table_t* prep_new_rt( uta_ctx_t* ctx, int all ) {
+ int counter = 0;
+ route_table_t* rt;
- if( srt == NULL ) {
- return uta_rt_init(); // no source to clone, just return an empty table
+ if( ctx == NULL ) {
+ return NULL;
}
- nrt = rt_clone_space( srt, nrt, RT_MT_SPACE ); // create new, clone all spaces to it
- rt_clone_space( srt, nrt, RT_NAME_SPACE );
- rt_clone_space( srt, nrt, RT_ME_SPACE );
+ if( (rt = ctx->old_rtable) != NULL ) {
+ ctx->old_rtable = NULL;
+ while( rt->ref_count > 0 ) { // wait for all who are using to stop
+ if( counter++ > 1000 ) {
+ rmr_vlog( RMR_VL_WARN, "rt_prep_newrt: internal mishap, ref count on table seems wedged" );
+ break;
+ }
- return nrt;
+			usleep( 1000 );				// small sleep to yield the processor if that is needed
+ }
+
+ if( rt->hash != NULL ) {
+ rmr_sym_foreach_class( rt->hash, 0, del_rte, NULL ); // deref and drop if needed
+ rmr_sym_clear( rt->hash ); // clear all entries from the old table
+ }
+ } else {
+ rt = NULL;
+ }
+
+ rt = uta_rt_clone( ctx, ctx->rtable, rt, all ); // also sets the ephash pointer
+ rt->ref_count = 0; // take no chances; ensure it's 0!
+
+ return rt;
}
+
/*
Given a name, find the endpoint struct in the provided route table.
*/
static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name ) {
- if( rt == NULL || rt->hash == NULL || ep_name == NULL || *ep_name == 0 ) {
+ if( rt == NULL || rt->ephash == NULL || ep_name == NULL || *ep_name == 0 ) {
return NULL;
}
- return rmr_sym_get( rt->hash, ep_name, 1 );
+ return rmr_sym_get( rt->ephash, ep_name, 1 );
}
/*
Drop the given route table. Purge all type 0 entries, then drop the symtab itself.
+ Does NOT destroy the gate as it's a common gate for ALL route tables.
*/
static void uta_rt_drop( route_table_t* rt ) {
if( rt == NULL ) {
return NULL;
}
- if( (ep = uta_get_ep( rt, ep_name )) == NULL ) { // not there yet, make
+ if( (ep = uta_get_ep( rt, ep_name )) == NULL ) { // not there yet, make
if( (ep = (endpoint_t *) malloc( sizeof( *ep ) )) == NULL ) {
rmr_vlog( RMR_VL_WARN, "rt_ensure: malloc failed for endpoint creation: %s\n", ep_name );
errno = ENOMEM;
pthread_mutex_init( &ep->gate, NULL ); // init with default attrs
memset( &ep->scounts[0], 0, sizeof( ep->scounts ) );
- rmr_sym_put( rt->hash, ep_name, 1, ep );
+ rmr_sym_put( rt->ephash, ep_name, 1, ep );
}
return ep;
Given a route table and meid string, find the owner (if known). Returns a pointer to
the endpoint struct or nil.
*/
-static inline endpoint_t* get_meid_owner( route_table_t *rt, char* meid ) {
- endpoint_t* ep; // the ep we found in the hash
+static inline endpoint_t* get_meid_owner( route_table_t *rt, char const* meid ) {
+ endpoint_t const* ep; // the ep we found in the hash
if( rt == NULL || rt->hash == NULL || meid == NULL || *meid == 0 ) {
return NULL;
}
- return (endpoint_t *) rmr_sym_get( rt->hash, meid, RT_ME_SPACE );
+ return (endpoint_t *) rmr_sym_get( rt->hash, meid, RT_ME_SPACE );
}
+/*
+	This returns a pointer to the currently active route table and ups
+	the reference count so that the route table is not freed while it
+	is being used. The caller MUST call release_rt() when finished
+	with the pointer.
+
+	Care must be taken: the ctx->rtable pointer _could_ change during the time
+	between the release of the lock and the return. Therefore we MUST grab
+	the current pointer when we have the lock so that if it does we don't
+	return a pointer to the wrong table.
+
+	This will return NULL if there is no active table.
+*/
+static inline route_table_t* get_rt( uta_ctx_t* ctx ) {
+	route_table_t*	rrt;						// return value
+
+	if( ctx == NULL || ctx->rtable == NULL ) {	// nil ctx, or no active table yet
+		return NULL;
+	}
+
+	pthread_mutex_lock( ctx->rtgate );			// must hold lock to bump use
+	rrt = ctx->rtable;							// must stash the pointer while we hold lock
+	rrt->ref_count++;
+	pthread_mutex_unlock( ctx->rtgate );
+
+	return rrt;									// pointer we upped the count with
+}
+
+/*
+	This will "release" the route table by reducing the use counter
+	in the table. The table may not be freed until the counter reaches
+	0, so it's imperative that the pointer be "released" when it is
+	fetched by get_rt(). Once the caller has released the table it
+	may not safely use the pointer that it had.
+*/
+static inline void release_rt( uta_ctx_t* ctx, route_table_t* rt ) {
+	if( ctx == NULL || rt == NULL ) {
+		return;
+	}
+
+	pthread_mutex_lock( ctx->rtgate );			// must hold lock
+	if( rt->ref_count > 0 ) {					// something smells if it's already 0; don't do anything if it is
+		rt->ref_count--;
+	}
+	pthread_mutex_unlock( ctx->rtgate );
+}
#endif