// ---- debugging/testing -------------------------------------------------------------------------
/*
- Dump stats for an endpoint in the RT.
+ Dump some stats for an endpoint in the RT. This is generally called to
+ verify endpoints after a table load/change.
*/
static void ep_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
int* counter;
(*counter)++;
}
- fprintf( stderr, "[DBUG] endpoint: %s open=%d\n", ep->name, ep->open );
+ fprintf( stderr, "[DBUG] RMR sends: target=%s open=%d\n", ep->name, ep->open );
+}
+
+/*
+ Dump counts for an endpoint in the RT. The vid parm is assumed to point to
+ the 'source' information and is added to each message.
+*/
+static void ep_counts( void* st, void* entry, char const* name, void* thing, void* vid ) {
+ endpoint_t* ep;
+ char* id;
+
+ if( (ep = (endpoint_t *) thing) == NULL ) {
+ return;
+ }
+
+ if( (id = (char *) vid) == NULL ) {
+ id = "missing";
+ }
+
+ fprintf( stderr, "[INFO] RMR sends: ts=%lld src=%s target=%s open=%d succ=%lld fail=%lld (hard=%lld soft=%lld)\n",
+ (long long) time( NULL ),
+ id,
+ ep->name,
+ ep->open,
+ ep->scounts[EPSC_GOOD],
+ ep->scounts[EPSC_FAIL] + ep->scounts[EPSC_TRANS],
+ ep->scounts[EPSC_FAIL],
+ ep->scounts[EPSC_TRANS] );
}
/*
free( counter );
}
+/*
+ Given a route table, cause endpoint counters to be written to stderr. The id
+ parm is written as the "source" in the output.
+*/
+static void rt_epcounts( route_table_t* rt, char* id ) {
+ if( rt == NULL ) {
+ fprintf( stderr, "[INFO] RMR endpoint: no counts: empty table\n" );
+ return;
+ }
+
+ rmr_sym_foreach_class( rt->hash, 1, ep_counts, id ); // run endpoints in the active table
+}
+
// ------------------------------------------------------------------------------------------------
/*
return buf;
}
+/*
+ This accepts a pointer to a nil terminated string, and ensures that there is a
+ newline as the last character. If there is not, a new buffer is allocated and
+ the newline is added. If a new buffer is allocated, the buffer passed in is
+ freed. The function returns a pointer which the caller should use, and must
+ free. In the event of an error, a nil pointer is returned.
+*/
+static char* ensure_nlterm( char* buf ) {
+ char* nb = NULL;
+ int len = 1;
+
+
+ nb = buf;
+ if( buf == NULL || (len = strlen( buf )) < 2 ) {
+ if( (nb = (char *) malloc( sizeof( char ) * 2 )) != NULL ) {
+ *nb = '\n';
+ *(nb+1) = 0;
+ }
+ } else {
+ if( buf[len-1] != '\n' ) {
+ fprintf( stderr, "[WRN] rmr buf_check: input buffer was not newline terminated (file missing final \\n?)\n" );
+ if( (nb = (char *) malloc( sizeof( char ) * (len + 2) )) != NULL ) {
+ memcpy( nb, buf, len );
+ *(nb+len) = '\n'; // insert \n and nil into the two extra bytes we allocated
+ *(nb+len+1) = 0;
+ }
+
+ free( buf );
+ }
+ }
+
+ return nb;
+}
/*
Given a message type create a route table entry and add to the hash keyed on the
}
if( (rte = (rtable_ent_t *) malloc( sizeof( *rte ) )) == NULL ) {
- fprintf( stderr, "rmr_add_rte: malloc failed for entry\n" );
+ fprintf( stderr, "[ERR] rmr_add_rte: malloc failed for entry\n" );
return NULL;
}
memset( rte, 0, sizeof( *rte ) );
rmr_sym_map( rt->hash, key, rte ); // add to hash using numeric mtype as key
- if( DEBUG ) fprintf( stderr, "[DBUG] route table entry created: k=%lx groups=%d\n", key, nrrgroups );
+ if( DEBUG ) fprintf( stderr, "[DBUG] route table entry created: k=%llx groups=%d\n", (long long) key, nrrgroups );
return rte;
}
rte = uta_add_rte( ctx->new_rtable, key, ngtoks ); // get/create entry for this key
for( grp = 0; grp < ngtoks; grp++ ) {
- if( (ntoks = uta_tokenise( gtokens[grp], tokens, 64, ',' )) > 0 ) {
+ if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) { // remove any referneces to our ip addrs
for( i = 0; i < ntoks; i++ ) {
- if( DEBUG > 1 || (vlevel > 1)) fprintf( stderr, "[DBUG] add endpoint %s\n", ts_field );
- uta_add_ep( ctx->new_rtable, rte, tokens[i], grp );
+ if( strcmp( tokens[i], ctx->my_name ) != 0 ) { // don't add if it is us -- cannot send to ourself
+ if( DEBUG > 1 || (vlevel > 1)) fprintf( stderr, "[DBUG] add endpoint ts=%s %s\n", ts_field, tokens[i] );
+ uta_add_ep( ctx->new_rtable, rte, tokens[i], grp );
+ }
}
}
}
}
} else {
- if( DEBUG || (vlevel > 2) )
+ if( DEBUG || (vlevel > 2) ) {
fprintf( stderr, "entry not included, sender not matched: %s\n", tokens[1] );
+ }
}
}
return;
}
- if( (fbuf = uta_fib( fname ) ) == NULL ) { // read file into a single buffer (nil terminated string)
+ if( (fbuf = ensure_nlterm( uta_fib( fname ) ) ) == NULL ) { // read file into a single buffer (nil terminated string)
fprintf( stderr, "[WRN] rmr read_static: seed route table could not be opened: %s: %s\n", fname, strerror( errno ) );
return;
}
if( (eor = strchr( rec, '\n' )) != NULL ) {
*eor = 0;
} else {
- fprintf( stderr, "[WARN] rmr read_static: seed route table had malformed records (missing newline): %s\n", fname );
- fprintf( stderr, "[WARN] rmr read_static: seed route table not used%s\n", fname );
+ fprintf( stderr, "[WRN] rmr read_static: seed route table had malformed records (missing newline): %s\n", fname );
+ fprintf( stderr, "[WRN] rmr read_static: seed route table not used: %s\n", fname );
free( fbuf );
return;
}
endpoint_t* ep;
if( !rt || !ep_name || ! *ep_name ) {
- fprintf( stderr, "[WARN] rmr: rt_ensure: internal mishap, something undefined rt=%p ep_name=%p\n", rt, ep_name );
+ fprintf( stderr, "[WRN] rmr: rt_ensure: internal mishap, something undefined rt=%p ep_name=%p\n", rt, ep_name );
errno = EINVAL;
return NULL;
}
if( (ep = uta_get_ep( rt, ep_name )) == NULL ) { // not there yet, make
if( (ep = (endpoint_t *) malloc( sizeof( *ep ) )) == NULL ) {
- fprintf( stderr, "[WARN] rmr: rt_ensure: malloc failed for endpoint creation: %s\n", ep_name );
+ fprintf( stderr, "[WRN] rmr: rt_ensure: malloc failed for endpoint creation: %s\n", ep_name );
errno = ENOMEM;
return NULL;
}
ep->addr = uta_h2ip( ep_name );
ep->name = strdup( ep_name );
pthread_mutex_init( &ep->gate, NULL ); // init with default attrs
+ memset( &ep->scounts[0], 0, sizeof( ep->scounts ) );
rmr_sym_put( rt->hash, ep_name, 1, ep );
}