cmake_minimum_required( VERSION 3.5 )
set( major_version "1" ) # should be automatically populated from git tag later, but until CI process sets a tag we use this
-set( minor_version "9" )
-set( patch_level "1" )
+set( minor_version "10" )
+set( patch_level "0" )
set( install_root "${CMAKE_INSTALL_PREFIX}" )
set( install_inc "include/rmr" )
#define UNSET_SUBID (-1) // initial value on msg allocation indicating not set
#define UNSET_MSGTYPE (-1)
+ // index values into the send counters for an endpoint
+#define EPSC_GOOD 0 // successful send
+#define EPSC_FAIL 1 // hard failures
+#define EPSC_TRANS 2 // transient/soft failures
+#define EPSC_SIZE 3 // number of counters
+
// -- header length/offset macros must ensure network conversion ----
#define RMR_HDR_LEN(h) (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2)+htonl(((uta_mhdr_t *)h)->len3)) // ALL things, not just formal struct
#define RMR_TR_LEN(h) (ntohl(((uta_mhdr_t *)h)->len1))
// ---- debugging/testing -------------------------------------------------------------------------
/*
- Dump stats for an endpoint in the RT.
+ Dump some stats for an endpoint in the RT. This is generally called to
+ verify endpoints after a table load/change.
*/
static void ep_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
int* counter;
(*counter)++;
}
- fprintf( stderr, "[DBUG] endpoint: %s open=%d\n", ep->name, ep->open );
+ fprintf( stderr, "[DBUG] RMR sends: target=%s open=%d\n", ep->name, ep->open );
+}
+
+/*
+ Dump counts for an endpoint in the RT. The vid parm is assumed to point to
+ the 'source' information and is added to each message.
+
+ This is handed to rmr_sym_foreach_class() as the per-entry callback (see
+ rt_epcounts); the st, entry and name parms are not used here. Thing is
+ taken to be the endpoint; when it is nil nothing is written. When vid is
+ nil the source is reported as "missing".
+
+ One [INFO] line is written to stderr per endpoint giving the current time,
+ the source, the endpoint name and open flag, the successful send count, the
+ total failure count (hard + soft), and then the hard and soft counts.
+*/
+static void ep_counts( void* st, void* entry, char const* name, void* thing, void* vid ) {
+ endpoint_t* ep;
+ char* id;
+
+ if( (ep = (endpoint_t *) thing) == NULL ) { // nothing to report without an endpoint
+ return;
+ }
+
+ if( (id = (char *) vid) == NULL ) { // no source given; use a placeholder so %s is safe
+ id = "missing";
+ }
+
+ fprintf( stderr, "[INFO] RMR sends: ts=%lld src=%s target=%s open=%d succ=%lld fail=%lld (hard=%lld soft=%lld)\n",
+ (long long) time( NULL ), // cast so the value matches %lld regardless of time_t's size
+ id,
+ ep->name,
+ ep->open,
+ ep->scounts[EPSC_GOOD],
+ ep->scounts[EPSC_FAIL] + ep->scounts[EPSC_TRANS], // fail= is the sum of hard and soft failures
+ ep->scounts[EPSC_FAIL],
+ ep->scounts[EPSC_TRANS] );
}
/*
free( counter );
}
+/*
+ Given a route table, cause endpoint counters to be written to stderr. The id
+ parm is written as the "source" in the output.
+
+ If the table pointer is nil a single informational message is written and
+ nothing else is done. Otherwise ep_counts() is driven once for each entry
+ in class 1 of the table's symbol hash, with id passed through as the
+ callback's user data.
+*/
+static void rt_epcounts( route_table_t* rt, char* id ) {
+ if( rt == NULL ) {
+ fprintf( stderr, "[INFO] RMR endpoint: no counts: empty table\n" );
+ return;
+ }
+
+ rmr_sym_foreach_class( rt->hash, 1, ep_counts, id ); // run endpoints in the active table (endpoints are put into the hash with class 1)
+}
+
// ------------------------------------------------------------------------------------------------
/*
ep->addr = uta_h2ip( ep_name );
ep->name = strdup( ep_name );
pthread_mutex_init( &ep->gate, NULL ); // init with default attrs
+ memset( &ep->scounts[0], 0, sizeof( ep->scounts ) );
rmr_sym_put( rt->hash, ep_name, 1, ep );
}
records, but each buffer must be less than 4K in length, and the last record in a
buffere may NOT be split across buffers.
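+ Other chores:
+ In addition to the primary task of getting, vetting, and installing a new route table, or
+ updates to the existing table, this thread will periodically cause the send counts for each
+ known endpoint to be written to standard error. Counts are written about every 30 seconds
+ for roughly the first five minutes after the thread starts, and about every 300 seconds
+ after that (see count_delay and bump_freq). This note previously gave the frequency as once
+ every 180 seconds, and more frequently if verbose mode (see ENV_VERBOSE_FILE) is > 0; confirm
+ whether the verbose level is still intended to affect the frequency.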
*/
static void* rtc( void* vctx ) {
uta_ctx_t* ctx; // context user has -- where we pin the route table
int vfd = -1; // verbose file des if we have one
int vlevel = 0; // how chatty we should be 0== no nattering allowed
char* eptr;
+ int epfd = -1; // fd for epoll so we can multi-task
+ struct epoll_event events[1]; // list of events to give to epoll; we only have one we care about
+ struct epoll_event epe; // event definition for event to listen to
+ int rcv_fd = -1; // pollable file des from NNG to use for timeout
+ int count_delay = 30; // number of seconds between writing count info; initially every 30s
+ int bump_freq = 0; // time at which we will bump count frequency to every 5 minutes
+
if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
fprintf( stderr, "[CRI] rmr_rtc: internal mishap: context passed in was nil\n" );
read( vfd, wbuf, 10 );
vlevel = atoi( wbuf );
}
- }
+ }
read_static_rt( ctx, vlevel ); // seed the route table if one provided
if( (pvt_cx = init( port, MAX_RTG_MSG_SZ, FL_NOTHREAD )) == NULL ) { // open a private context
fprintf( stderr, "[CRI] rmr_rtc: unable to initialise listen port for RTG (pvt_cx)\n" );
- free( fport );
+
+ while( TRUE ) { // no listen port, just dump counts now and then
+ sleep( count_delay );
+ rt_epcounts( ctx->rtable, ctx->my_name );
+ }
+
+ free( fport ); // not reached while the loop above runs forever; paranoid free and return
return NULL;
}
+ if( (rcv_fd = rmr_get_rcvfd( pvt_cx )) >= 0 ) { // get the epoll fd for the rtg socket
+ if( rcv_fd < 0 ) {
+ fprintf( stderr, "[WARN] cannot get epoll fd for rtg session; stats will generate only after update from rt manager\n" );
+ } else {
+ if( (epfd = epoll_create1( 0 )) < 0 ) {
+ fprintf( stderr, "[WARN] stats will generate only after rt manager update; unable to create epoll fd for rtg session: %s\n", strerror( errno ) );
+ rcv_fd = -1;
+ } else {
+ epe.events = EPOLLIN;
+ epe.data.fd = rcv_fd;
+
+ if( epoll_ctl( epfd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 ) {
+ fprintf( stderr, "[WARN] stats will generate only after rt manager update; unable to init epoll_ctl: %s\n", strerror( errno ) );
+ rcv_fd = -1;
+ }
+ }
+ }
+ }
+
if( DEBUG ) fprintf( stderr, "[DBUG] rtc thread is running and listening; listening for rtg conns on %s\n", port );
free( fport );
// future: if we need to register with the rtg, then build a message and send it through a wormhole here
+ bump_freq = time( NULL ) + 300; // after 5 minutes we decrease the count frequency
blabber = 0;
while( 1 ) { // until the cows return, pigs fly, or somesuch event likely not to happen
- if( raw_interface ) {
- msg = (rmr_mbuf_t *) rcv_payload( pvt_cx, msg ); // receive from non-RMr sender
- } else {
- msg = rmr_rcv_msg( pvt_cx, msg ); // receive from an RMr sender
+ while( msg == NULL || msg->len <= 0 ) { // until we actually have something from the other side
+ if( rcv_fd < 0 || epoll_wait( epfd, events, 1, 1000 ) > 0 ) { // skip epoll if init failed, else block for max 1 sec
+ if( raw_interface ) {
+ msg = (rmr_mbuf_t *) rcv_payload( pvt_cx, msg ); // receive from non-RMr sender
+ } else {
+ msg = rmr_rcv_msg( pvt_cx, msg ); // receive from an RMr sender
+ }
+ } else { // no msg, do extra tasks
+ if( msg != NULL ) { // if we were working with a message; ensure no len
+ msg->len = 0;
+ msg->state = RMR_ERR_TIMEOUT;
+ }
+ }
+
+ if( time( NULL ) > blabber ) {
+ blabber = time( NULL ) + count_delay; // set next time to blabber, then do so
+ if( blabber > bump_freq ) {
+ count_delay = 300;
+ }
+ rt_epcounts( ctx->rtable, ctx->my_name );
+ }
}
- if( vfd >= 0 ) { // if changed since last go round
+ if( vfd >= 0 ) { // if file open, check for change to vlevel
wbuf[0] = 0;
lseek( vfd, 0, 0 );
read( vfd, wbuf, 10 );
if( ctx->shutdown ) { // mostly for testing, but allows user app to close us down if rmr_*() function sets this
break;
}
- } else {
- if( time( NULL ) > blabber ) {
- fprintf( stderr, "[WRN] rmr_rtc: nil buffer, or 0 len msg, received from rtg\n" );
- blabber = time( NULL ) + 180; // limit to 1 every 3 min or so
- }
+
+ msg->len = 0; // force back into the listen loop
}
}
-// : vi ts=4 sw=4 noet :
+// vim: ts=4 sw=4 noet :
/*
==================================================================================
Copyright (c) 2019 Nokia
nng_dialer dialer; // the connection specific information (retry timout etc)
int open; // set to true if we've connected as socket cannot be checked directly)
pthread_mutex_t gate; // we must serialise when we open/link to the endpoint
+ long long scounts[EPSC_SIZE]; // send counts (indexed by EPSC_* constants)
};
/*
// --- rt table things ---------------------------
static int uta_link2( endpoint_t* ep );
static int rt_link2_ep( endpoint_t* ep );
-static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_sock );
-static int uta_epsock_rr( rtable_ent_t* rte, int group, int* more, nng_socket* nn_sock );
+static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_sock, endpoint_t** uepp );
+static int uta_epsock_rr( rtable_ent_t* rte, int group, int* more, nng_socket* nn_sock, endpoint_t** uepp );
static rtable_ent_t* uta_get_rte( route_table_t *rt, int sid, int mtype, int try_alt );
static inline int xlate_nng_state( int state, int def_state );
-// : vi ts=4 sw=4 noet :
+// vim: ts=4 sw=4 noet :
/*
==================================================================================
Copyright (c) 2019 Nokia
char* hold_src; // we need the original source if send fails
char* hold_ip; // also must hold original ip
int sock_ok = 0; // true if we found a valid endpoint socket
+ endpoint_t* ep; // end point to track counts
if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
errno = EINVAL; // if msg is null, this is their clue
((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG; // must ensure call flag is off
- sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock ); // src is always used first for rts
+ sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock, &ep ); // src is always used first for rts
if( ! sock_ok ) {
if( HDR_VERSION( msg->header ) > 2 ) { // with ver2 the ip is there, try if src name not known
- sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock );
+ sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep );
}
if( ! sock_ok ) {
msg->state = RMR_ERR_NOENDPT;
strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC ); // must overlay the source to be ours
msg = send_msg( ctx, msg, nn_sock, -1 );
if( msg ) {
+ if( ep != NULL ) {
+ switch( msg->state ) {
+ case RMR_OK:
+ ep->scounts[EPSC_GOOD]++;
+ break;
+
+ case RMR_ERR_RETRY:
+ ep->scounts[EPSC_TRANS]++;
+ break;
+
+ default:
+ ep->scounts[EPSC_FAIL]++;
+ break;
+ }
+ }
strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC ); // always return original source so rts can be called again
strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC ); // always return original source so rts can be called again
msg->flags |= MFL_ADDSRC; // if msg given to send() it must add source
int state;
if( ! announced ) {
- fprintf( stderr, "[INFO] ric message routing library on NNG mv=%d (%s %s.%s.%s built: %s)\n",
- RMR_MSG_VER, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
+ fprintf( stderr, "[INFO] ric message routing library on NNG mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
+ RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
announced = 1;
}
-// : vi ts=4 sw=4 noet :
+// vim: ts=4 sw=4 noet :
/*
==================================================================================
Copyright (c) 2019 Nokia
get things into a bad state if we allow a collision here. The lock grab
only happens on the intial session setup.
*/
-//static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer, pthread_mutex* gate ) {
static int uta_link2( endpoint_t* ep ) {
static int flags = -1;
the user pointer passed in and sets the return value to true (1). If the
endpoint cannot be found false (0) is returned.
*/
-static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_sock ) {
+static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_sock, endpoint_t** uepp ) {
endpoint_t* ep;
int state = FALSE;
}
ep = rmr_sym_get( rt->hash, ep_name, 1 );
+ if( uepp != NULL ) { // caller needs endpoint too, give it back
+ *uepp = ep;
+ }
if( ep == NULL ) {
if( DEBUG ) fprintf( stderr, "[DBUG] get ep by name for %s not in hash!\n", ep_name );
if( ! ep_name || (ep = rt_ensure_ep( rt, ep_name)) == NULL ) { // create one if not in rt (support rts without entry in our table)
Both of these, in the grand scheme of things, is minor compared to the
overhead of grabbing a lock on each call.
*/
-static int uta_epsock_rr( rtable_ent_t *rte, int group, int* more, nng_socket* nn_sock ) {
- //rtable_ent_t* rte; // matching rt entry
+static int uta_epsock_rr( rtable_ent_t *rte, int group, int* more, nng_socket* nn_sock, endpoint_t** uepp ) {
endpoint_t* ep; // seected end point
int state = FALSE; // processing state
int dummy;
break;
}
+ if( uepp != NULL ) { // caller needs reference to endpoint too
+ *uepp = ep;
+ }
if( state ) { // end point selected, open if not, get socket either way
if( ! ep->open ) { // not connected
if( ep->addr == NULL ) { // name didn't resolve before, try again
return rte;
}
+/*
+	Return a string of count information. E.g.:
+		<ep-name>:<port> <good> <hard-fail> <soft-fail>
+
+	If ubuf is not nil the string is written into it (at most ubuf_len bytes
+	including the terminating zero; the text is truncated if it does not fit)
+	and ubuf is returned for convenience. A user buffer given with a length
+	that is not positive is returned untouched.
+
+	If ubuf is nil a 256 byte buffer is allocated, filled, and returned; the
+	caller must free it.
+
+	A nil pointer is returned if the endpoint passed is nil, or if a buffer
+	was needed and could not be allocated -- caller must check!
+*/
+static inline char* get_ep_counts( endpoint_t* ep, char* ubuf, int ubuf_len ) {
+	char*	rs;			// result string
+
+	if( ep == NULL ) {
+		return NULL;
+	}
+
+	if( ubuf != NULL ) {
+		if( ubuf_len <= 0 ) {						// no room to write; a negative len would be converted to a huge size
+			return ubuf;
+		}
+		rs = ubuf;
+	} else {
+		ubuf_len = 256;
+		if( (rs = malloc( ubuf_len )) == NULL ) {	// never hand snprintf a nil buffer
+			return NULL;
+		}
+	}
+
+	snprintf( rs, ubuf_len, "%s %lld %lld %lld", ep->name, ep->scounts[EPSC_GOOD], ep->scounts[EPSC_FAIL], ep->scounts[EPSC_TRANS] );
+
+	return rs;
+}
+
#endif
-// : vi ts=4 sw=4 noet 2
+// vim: ts=4 sw=4 noet :
/*
==================================================================================
Copyright (c) 2019 Nokia
uta_mhdr_t* hdr;
int nng_flags = NNG_FLAG_NONBLOCK; // if we need to set any nng flags (zc buffer) add it to this
int spin_retries = 1000; // if eagain/timeout we'll spin, at max, this many times before giving up the CPU
- int tr_len; // trace len in sending message so we alloc new message with same trace size
+ int tr_len; // trace len in sending message so we alloc new message with same trace sizes
// future: ensure that application did not overrun the XID buffer; last byte must be 0
strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
}
+ if( retries == 0 ) {
+ spin_retries = 100;
+ retries++;
+ }
+
errno = 0;
msg->state = RMR_OK;
if( msg->flags & MFL_ZEROCOPY ) { // faster sending with zcopy buffer
*/
static rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
+ endpoint_t* ep; // end point that we're attempting to send to
rtable_ent_t* rte; // the route table entry which matches the message key
nng_socket nn_sock; // endpoint socket for send
uta_ctx_t* ctx;
errno = 0; // clear; nano might set, but ensure it's not left over if it doesn't
if( msg->header == NULL ) {
- fprintf( stderr, "rmr_send_msg: ERROR: message had no header\n" );
+ fprintf( stderr, "rmr_mtosend_msg: ERROR: message had no header\n" );
msg->state = RMR_ERR_NOHDR;
errno = EBADMSG; // must ensure it's not eagain
msg->tp_state = errno;
send_again = 1; // force loop entry
group = 0; // always start with group 0
while( send_again ) {
- sock_ok = uta_epsock_rr( rte, group, &send_again, &nn_sock ); // select endpt from rr group and set again if more groups
+ sock_ok = uta_epsock_rr( rte, group, &send_again, &nn_sock, &ep ); // select endpt from rr group and set again if more groups
if( DEBUG ) fprintf( stderr, "[DBUG] mtosend_msg: flgs=0x%04x type=%d again=%d group=%d len=%d sock_ok=%d\n",
msg->flags, msg->mtype, send_again, group, msg->len, sock_ok );
}
}
}
+
+ if( ep != NULL && msg != NULL ) {
+ switch( msg->state ) {
+ case RMR_OK:
+ ep->scounts[EPSC_GOOD]++;
+ break;
+
+ case RMR_ERR_RETRY:
+ ep->scounts[EPSC_TRANS]++;
+ break;
+
+ default:
+ ep->scounts[EPSC_FAIL]++;
+ break;
+ }
+ }
} else {
if( ctx->flags & CTXFL_WARN ) {
fprintf( stderr, "[WARN] invalid socket for rte, setting no endpoint err: mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
ep = uta_get_ep( rt, "bad_name:4560" );
errors += fail_not_nil( ep, "end point (fetch by name with bad name)" );
- state = uta_epsock_byname( rt, "localhost:4561", &nn_sock ); // this should be found
+ ep = NULL;
+ state = uta_epsock_byname( rt, "localhost:4561", &nn_sock, &ep ); // this should be found
errors += fail_if_equal( state, 0, "socket (by name)" );
+ errors += fail_if_nil( ep, "epsock_byname did not populate endpoint pointer when expected to" );
//alt_value = uta_epsock_byname( rt, "localhost:4562" ); // we might do a memcmp on the two structs, but for now nothing
//errors += fail_if_equal( value, alt_value, "app1/app2 sockets" );
rte = uta_get_rte( rt, 0, 1, FALSE ); // get an rte for the next loop
if( rte ) {
for( i = 0; i < 10; i++ ) { // round robin return value should be different each time
- value = uta_epsock_rr( rte, 0, &more, &nn_sock ); // msg type 1, group 1
+ value = uta_epsock_rr( rte, 0, &more, &nn_sock, &ep ); // msg type 1, group 1
errors += fail_if_equal( value, alt_value, "round robiin sockets with multiple end points" );
errors += fail_if_false( more, "more for mtype==1" );
alt_value = value;
rte = uta_get_rte( rt, 0, 3, FALSE ); // get an rte for the next loop
if( rte ) {
for( i = 0; i < 10; i++ ) { // this mtype has only one endpoint, so rr should be same each time
- value = uta_epsock_rr( rte, 0, NULL, &nn_sock ); // also test ability to deal properly with nil more pointer
+ value = uta_epsock_rr( rte, 0, NULL, &nn_sock, &ep ); // also test ability to deal properly with nil more pointer
if( i ) {
errors += fail_not_equal( value, alt_value, "round robin sockets with one endpoint" );
errors += fail_not_equal( more, -1, "more value changed in single group instance" );
}
rte = uta_get_rte( rt, 11, 3, TRUE );
- state = uta_epsock_rr( rte, 22, NULL, NULL );
+ state = uta_epsock_rr( rte, 22, NULL, NULL, &ep );
errors += fail_if_true( state, "uta_epsock_rr returned bad (non-zero) state when given nil socket pointer" );
uta_rt_clone( NULL ); // verify null parms don't crash things
uta_rt_drop( NULL );
- uta_epsock_rr( NULL, 0, &more, &nn_sock ); // drive null case for coverage
+ uta_epsock_rr( NULL, 0, &more, &nn_sock, &ep ); // drive null case for coverage
uta_add_rte( NULL, 99, 1 );
uta_get_rte( NULL, 0, 1000, TRUE );