From 58ccd68c7e98074aa65cb9fc271db963192de68d Mon Sep 17 00:00:00 2001 From: "E. Scott Daniels" Date: Wed, 2 Oct 2019 10:21:24 -0400 Subject: [PATCH] Add ability to track send counts for an endpoint Send counts, success, failure and transient failure, are being collected for each endpoint. The route table collector thread will write counts to standard error on a periodic basis. Enabled forced minimum retry. Signed-off-by: E. Scott Daniels Change-Id: I3c315d86e6db79cb1404788ef954a0a664084631 Fix context used for RTC epoll Signed-off-by: E. Scott Daniels Change-Id: Ie4883e98237eb87b53b24f68bd434edcf533c92f Fix bug when table received from RTG Signed-off-by: E. Scott Daniels Change-Id: I66dd44ab7cde715d6daf144253ff5a2bd5861ae3 After stress testing and tweaks Signed-off-by: E. Scott Daniels Change-Id: I0e814bf25844c19ab9f9f6c38eca50a1a66db309 Update unit tests and resulting fixes Signed-off-by: E. Scott Daniels Change-Id: Ifd780cfaaf924e202407f822551ca3e61c84c63d Set minimum retry Signed-off-by: E. Scott Daniels Change-Id: I17f1c096106c96d31b919bef6af759b3f4ec72c3 --- CMakeLists.txt | 4 +- src/rmr/common/include/rmr_agnostic.h | 6 +++ src/rmr/common/src/rt_generic_static.c | 46 +++++++++++++++++++- src/rmr/common/src/rtc_static.c | 76 ++++++++++++++++++++++++++++------ src/rmr/nng/include/rmr_nng_private.h | 7 ++-- src/rmr/nng/src/rmr_nng.c | 26 +++++++++--- src/rmr/nng/src/rtable_nng_static.c | 45 +++++++++++++++++--- src/rmr/nng/src/sr_nng_static.c | 30 ++++++++++++-- test/rt_static_test.c | 12 +++--- 9 files changed, 214 insertions(+), 38 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 6be67f7..8bccdd4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -35,8 +35,8 @@ project( rmr LANGUAGES C ) cmake_minimum_required( VERSION 3.5 ) set( major_version "1" ) # should be automatically populated from git tag later, but until CI process sets a tag we use this -set( minor_version "9" ) -set( patch_level "1" ) +set( minor_version "10" ) +set( patch_level "0" ) set( install_root "${CMAKE_INSTALL_PREFIX}" ) set( install_inc "include/rmr" ) diff --git a/src/rmr/common/include/rmr_agnostic.h b/src/rmr/common/include/rmr_agnostic.h index e83a951..8694574 100644 --- a/src/rmr/common/include/rmr_agnostic.h +++ b/src/rmr/common/include/rmr_agnostic.h @@ -86,6 +86,12 @@ typedef struct uta_ctx uta_ctx_t; #define UNSET_SUBID (-1) // initial value on msg allocation indicating not set #define UNSET_MSGTYPE (-1) + // index values into the send counters for an enpoint +#define EPSC_GOOD 0 // successful send +#define EPSC_FAIL 1 // hard failurs +#define EPSC_TRANS 2 // transient/soft faiures +#define EPSC_SIZE 3 // number of counters + // -- header length/offset macros must ensure network conversion ---- #define RMR_HDR_LEN(h) (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2)+htonl(((uta_mhdr_t *)h)->len3)) // ALL things, not just formal struct #define RMR_TR_LEN(h) (ntohl(((uta_mhdr_t *)h)->len1)) diff --git a/src/rmr/common/src/rt_generic_static.c b/src/rmr/common/src/rt_generic_static.c index 816e18a..d65acdf 100644 --- a/src/rmr/common/src/rt_generic_static.c +++ b/src/rmr/common/src/rt_generic_static.c @@ -60,7 +60,8 @@ typedef struct thing_list { // ---- debugging/testing ------------------------------------------------------------------------- /* - Dump stats for an endpoint in the RT. + Dump some stats for an endpoint in the RT. This is generally called to + verify endpoints after a table load/change. */ static void ep_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) { int* counter; @@ -74,7 +75,34 @@ static void ep_stats( void* st, void* entry, char const* name, void* thing, void (*counter)++; } - fprintf( stderr, "[DBUG] endpoint: %s open=%d\n", ep->name, ep->open ); + fprintf( stderr, "[DBUG] RMR sends: target=%s open=%d\n", ep->name, ep->open ); +} + +/* + Dump counts for an endpoint in the RT. The vid parm is assumed to point to + the 'source' information and is added to each message. +*/ +static void ep_counts( void* st, void* entry, char const* name, void* thing, void* vid ) { + endpoint_t* ep; + char* id; + + if( (ep = (endpoint_t *) thing) == NULL ) { + return; + } + + if( (id = (char *) vid) == NULL ) { + id = "missing"; + } + + fprintf( stderr, "[INFO] RMR sends: ts=%lld src=%s target=%s open=%d succ=%lld fail=%lld (hard=%lld soft=%lld)\n", + (long long) time( NULL ), + id, + ep->name, + ep->open, + ep->scounts[EPSC_GOOD], + ep->scounts[EPSC_FAIL] + ep->scounts[EPSC_TRANS], + ep->scounts[EPSC_FAIL], + ep->scounts[EPSC_TRANS] ); } /* @@ -124,6 +152,19 @@ static void rt_stats( route_table_t* rt ) { free( counter ); } +/* + Given a route table, cause endpoint counters to be written to stderr. The id + parm is written as the "source" in the output. +*/ +static void rt_epcounts( route_table_t* rt, char* id ) { + if( rt == NULL ) { + fprintf( stderr, "[INFO] RMR endpoint: no counts: empty table\n" ); + return; + } + + rmr_sym_foreach_class( rt->hash, 1, ep_counts, id ); // run endpoints in the active table +} + // ------------------------------------------------------------------------------------------------ /* @@ -864,6 +905,7 @@ static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) { ep->addr = uta_h2ip( ep_name ); ep->name = strdup( ep_name ); pthread_mutex_init( &ep->gate, NULL ); // init with default attrs + memset( &ep->scounts[0], 0, sizeof( ep->scounts ) ); rmr_sym_put( rt->hash, ep_name, 1, ep ); } diff --git a/src/rmr/common/src/rtc_static.c b/src/rmr/common/src/rtc_static.c index 013dc8d..cce8ddc 100644 --- a/src/rmr/common/src/rtc_static.c +++ b/src/rmr/common/src/rtc_static.c @@ -73,6 +73,11 @@ records, but each buffer must be less than 4K in length, and the last record in a buffere may NOT be split across buffers. + Other chores: + In addition to the primary task of getting, vetting, and installing a new route table, or + updates to the existing table, this thread will periodically cause the send counts for each + endpoint known to be written to standard error. The frequency is once every 180 seconds, and + more frequently if verbose mode (see ENV_VERBOSE_FILE) is > 0. */ static void* rtc( void* vctx ) { uta_ctx_t* ctx; // context user has -- where we pin the route table @@ -99,6 +104,13 @@ static void* rtc( void* vctx ) { int vfd = -1; // verbose file des if we have one int vlevel = 0; // how chatty we should be 0== no nattering allowed char* eptr; + int epfd = -1; // fd for epoll so we can multi-task + struct epoll_event events[1]; // list of events to give to epoll; we only have one we care about + struct epoll_event epe; // event definition for event to listen to + int rcv_fd = -1; // pollable file des from NNG to use for timeout + int count_delay = 30; // number of seconds between writing count info; initially every 30s + int bump_freq = 0; // time at which we will bump count frequency to every 5 minutes + if( (ctx = (uta_ctx_t *) vctx) == NULL ) { fprintf( stderr, "[CRI] rmr_rtc: internal mishap: context passed in was nil\n" ); @@ -113,7 +125,7 @@ static void* rtc( void* vctx ) { read( vfd, wbuf, 10 ); vlevel = atoi( wbuf ); } - } + } read_static_rt( ctx, vlevel ); // seed the route table if one provided @@ -146,24 +158,67 @@ static void* rtc( void* vctx ) { if( (pvt_cx = init( port, MAX_RTG_MSG_SZ, FL_NOTHREAD )) == NULL ) { // open a private context fprintf( stderr, "[CRI] rmr_rtc: unable to initialise listen port for RTG (pvt_cx)\n" ); - free( fport ); + + while( TRUE ) { // no listen port, just dump counts now and then + sleep( count_delay ); + rt_epcounts( ctx->rtable, ctx->my_name ); + } + + free( fport ); // parinoid free and return return NULL; } + if( (rcv_fd = rmr_get_rcvfd( pvt_cx )) >= 0 ) { // get the epoll fd for the rtg socket + if( rcv_fd < 0 ) { + fprintf( stderr, "[WARN] cannot get epoll fd for rtg session; stats will generate only after update from rt manager\n" ); + } else { + if( (epfd = epoll_create1( 0 )) < 0 ) { + fprintf( stderr, "[WARN] stats will generate only after rt manager update; unable to create epoll fd for rtg session: %s\n", strerror( errno ) ); + rcv_fd = -1; + } else { + epe.events = EPOLLIN; + epe.data.fd = rcv_fd; + + if( epoll_ctl( epfd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 ) { + fprintf( stderr, "[WARN] stats will generate only after rt manager update; unable to init epoll_ctl: %s\n", strerror( errno ) ); + rcv_fd = -1; + } + } + } + } + if( DEBUG ) fprintf( stderr, "[DBUG] rtc thread is running and listening; listening for rtg conns on %s\n", port ); free( fport ); // future: if we need to register with the rtg, then build a message and send it through a wormhole here + bump_freq = time( NULL ) + 300; // after 5 minutes we decrease the count frequency blabber = 0; while( 1 ) { // until the cows return, pigs fly, or somesuch event likely not to happen - if( raw_interface ) { - msg = (rmr_mbuf_t *) rcv_payload( pvt_cx, msg ); // receive from non-RMr sender - } else { - msg = rmr_rcv_msg( pvt_cx, msg ); // receive from an RMr sender + while( msg == NULL || msg->len <= 0 ) { // until we actually have something from the other side + if( rcv_fd < 0 || epoll_wait( epfd, events, 1, 1000 ) > 0 ) { // skip epoll if init failed, else block for max 1 sec + if( raw_interface ) { + msg = (rmr_mbuf_t *) rcv_payload( pvt_cx, msg ); // receive from non-RMr sender + } else { + msg = rmr_rcv_msg( pvt_cx, msg ); // receive from an RMr sender + } + } else { // no msg, do extra tasks + if( msg != NULL ) { // if we were working with a message; ensure no len + msg->len = 0; + msg->state = RMR_ERR_TIMEOUT; + } + } + + if( time( NULL ) > blabber ) { + blabber = time( NULL ) + count_delay; // set next time to blabber, then do so + if( blabber > bump_freq ) { + count_delay = 300; + } + rt_epcounts( ctx->rtable, ctx->my_name ); + } } - if( vfd >= 0 ) { // if changed since last go round + if( vfd >= 0 ) { // if file open, check for change to vlevel wbuf[0] = 0; lseek( vfd, 0, 0 ); read( vfd, wbuf, 10 ); @@ -212,11 +267,8 @@ static void* rtc( void* vctx ) { if( ctx->shutdown ) { // mostly for testing, but allows user app to close us down if rmr_*() function sets this break; } - } else { - if( time( NULL ) > blabber ) { - fprintf( stderr, "[WRN] rmr_rtc: nil buffer, or 0 len msg, received from rtg\n" ); - blabber = time( NULL ) + 180; // limit to 1 every 3 min or so - } + + msg->len = 0; // force back into the listen loop } } diff --git a/src/rmr/nng/include/rmr_nng_private.h b/src/rmr/nng/include/rmr_nng_private.h index 9e93c9f..efb2daa 100644 --- a/src/rmr/nng/include/rmr_nng_private.h +++ b/src/rmr/nng/include/rmr_nng_private.h @@ -1,4 +1,4 @@ -// : vi ts=4 sw=4 noet : +// vim: ts=4 sw=4 noet : /* ================================================================================== Copyright (c) 2019 Nokia @@ -44,6 +44,7 @@ struct endpoint { nng_dialer dialer; // the connection specific information (retry timout etc) int open; // set to true if we've connected as socket cannot be checked directly) pthread_mutex_t gate; // we must serialise when we open/link to the endpoint + long long scounts[EPSC_SIZE]; // send counts (indexed by EPSCOUNT_* constants }; /* @@ -107,8 +108,8 @@ static void free_ctx( uta_ctx_t* ctx ); // --- rt table things --------------------------- static int uta_link2( endpoint_t* ep ); static int rt_link2_ep( endpoint_t* ep ); -static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_sock ); -static int uta_epsock_rr( rtable_ent_t* rte, int group, int* more, nng_socket* nn_sock ); +static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_sock, endpoint_t** uepp ); +static int uta_epsock_rr( rtable_ent_t* rte, int group, int* more, nng_socket* nn_sock, endpoint_t** uepp ); static rtable_ent_t* uta_get_rte( route_table_t *rt, int sid, int mtype, int try_alt ); static inline int xlate_nng_state( int state, int def_state ); diff --git a/src/rmr/nng/src/rmr_nng.c b/src/rmr/nng/src/rmr_nng.c index a4b4009..45a0e23 100644 --- a/src/rmr/nng/src/rmr_nng.c +++ b/src/rmr/nng/src/rmr_nng.c @@ -1,4 +1,4 @@ -// : vi ts=4 sw=4 noet : +// vim: ts=4 sw=4 noet : /* ================================================================================== Copyright (c) 2019 Nokia @@ -250,6 +250,7 @@ extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) { char* hold_src; // we need the original source if send fails char* hold_ip; // also must hold original ip int sock_ok = 0; // true if we found a valid endpoint socket + endpoint_t* ep; // end point to track counts if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast errno = EINVAL; // if msg is null, this is their clue @@ -270,10 +271,10 @@ extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) { ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG; // must ensure call flag is off - sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock ); // src is always used first for rts + sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock, &ep ); // src is always used first for rts if( ! sock_ok ) { if( HDR_VERSION( msg->header ) > 2 ) { // with ver2 the ip is there, try if src name not known - sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock ); + sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep ); } if( ! sock_ok ) { msg->state = RMR_ERR_NOENDPT; @@ -287,6 +288,21 @@ extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) { strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC ); // must overlay the source to be ours msg = send_msg( ctx, msg, nn_sock, -1 ); if( msg ) { + if( ep != NULL ) { + switch( msg->state ) { + case RMR_OK: + ep->scounts[EPSC_GOOD]++; + break; + + case RMR_ERR_RETRY: + ep->scounts[EPSC_TRANS]++; + break; + + default: + ep->scounts[EPSC_FAIL]++; + break; + } + } strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC ); // always return original source so rts can be called again strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC ); // always return original source so rts can be called again msg->flags |= MFL_ADDSRC; // if msg given to send() it must add source @@ -599,8 +615,8 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { int state; if( ! announced ) { - fprintf( stderr, "[INFO] ric message routing library on NNG mv=%d (%s %s.%s.%s built: %s)\n", - RMR_MSG_VER, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ ); + fprintf( stderr, "[INFO] ric message routing library on NNG mv=%d flg=%02x (%s %s.%s.%s built: %s)\n", + RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ ); announced = 1; } diff --git a/src/rmr/nng/src/rtable_nng_static.c b/src/rmr/nng/src/rtable_nng_static.c index 4f413be..7cd3e20 100644 --- a/src/rmr/nng/src/rtable_nng_static.c +++ b/src/rmr/nng/src/rtable_nng_static.c @@ -1,4 +1,4 @@ -// : vi ts=4 sw=4 noet : +// vim: ts=4 sw=4 noet : /* ================================================================================== Copyright (c) 2019 Nokia @@ -59,7 +59,6 @@ get things into a bad state if we allow a collision here. The lock grab only happens on the intial session setup. */ -//static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer, pthread_mutex* gate ) { static int uta_link2( endpoint_t* ep ) { static int flags = -1; @@ -218,7 +217,7 @@ extern endpoint_t* uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_n the user pointer passed in and sets the return value to true (1). If the endpoint cannot be found false (0) is returned. */ -static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_sock ) { +static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_sock, endpoint_t** uepp ) { endpoint_t* ep; int state = FALSE; @@ -227,6 +226,9 @@ static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_s } ep = rmr_sym_get( rt->hash, ep_name, 1 ); + if( uepp != NULL ) { // caller needs endpoint too, give it back + *uepp = ep; + } if( ep == NULL ) { if( DEBUG ) fprintf( stderr, "[DBUG] get ep by name for %s not in hash!\n", ep_name ); if( ! ep_name || (ep = rt_ensure_ep( rt, ep_name)) == NULL ) { // create one if not in rt (support rts without entry in our table) @@ -279,8 +281,7 @@ static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_s Both of these, in the grand scheme of things, is minor compared to the overhead of grabbing a lock on each call. */ -static int uta_epsock_rr( rtable_ent_t *rte, int group, int* more, nng_socket* nn_sock ) { - //rtable_ent_t* rte; // matching rt entry +static int uta_epsock_rr( rtable_ent_t *rte, int group, int* more, nng_socket* nn_sock, endpoint_t** uepp ) { endpoint_t* ep; // seected end point int state = FALSE; // processing state int dummy; @@ -337,6 +338,9 @@ static int uta_epsock_rr( rtable_ent_t *rte, int group, int* more, nng_socket* n break; } + if( uepp != NULL ) { // caller needs refernce to endpoint too + *uepp = ep; + } if( state ) { // end point selected, open if not, get socket either way if( ! ep->open ) { // not connected if( ep->addr == NULL ) { // name didn't resolve before, try again @@ -384,4 +388,35 @@ static inline rtable_ent_t* uta_get_rte( route_table_t *rt, int sid, int mtype, return rte; } +/* + Return a string of count information. E.g.: + : + + Caller must free the string allocated if a buffer was not provided. + + Pointer returned is to a freshly allocated string, or the user buffer + for convenience. + + If the endpoint passed is a nil pointer, then we return a nil -- caller + must check! +*/ +static inline char* get_ep_counts( endpoint_t* ep, char* ubuf, int ubuf_len ) { + char* rs; // result string + + if( ep == NULL ) { + return NULL; + } + + if( ubuf != NULL ) { + rs = ubuf; + } else { + ubuf_len = 256; + rs = malloc( sizeof( char ) * ubuf_len ); + } + + snprintf( rs, ubuf_len, "%s %lld %lld %lld", ep->name, ep->scounts[EPSC_GOOD], ep->scounts[EPSC_FAIL], ep->scounts[EPSC_TRANS] ); + + return rs; +} + #endif diff --git a/src/rmr/nng/src/sr_nng_static.c b/src/rmr/nng/src/sr_nng_static.c index 5ca4403..3435cb3 100644 --- a/src/rmr/nng/src/sr_nng_static.c +++ b/src/rmr/nng/src/sr_nng_static.c @@ -1,4 +1,4 @@ -// : vi ts=4 sw=4 noet 2 +// vim: ts=4 sw=4 noet : /* ================================================================================== Copyright (c) 2019 Nokia @@ -530,7 +530,7 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock uta_mhdr_t* hdr; int nng_flags = NNG_FLAG_NONBLOCK; // if we need to set any nng flags (zc buffer) add it to this int spin_retries = 1000; // if eagain/timeout we'll spin, at max, this many times before giving up the CPU - int tr_len; // trace len in sending message so we alloc new message with same trace size + int tr_len; // trace len in sending message so we alloc new message with same trace sizes // future: ensure that application did not overrun the XID buffer; last byte must be 0 @@ -545,6 +545,11 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC ); } + if( retries == 0 ) { + spin_retries = 100; + retries++; + } + errno = 0; msg->state = RMR_OK; if( msg->flags & MFL_ZEROCOPY ) { // faster sending with zcopy buffer @@ -634,6 +639,7 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock */ static rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) { + endpoint_t* ep; // end point that we're attempting to send to rtable_ent_t* rte; // the route table entry which matches the message key nng_socket nn_sock; // endpoint socket for send uta_ctx_t* ctx; @@ -656,7 +662,7 @@ static rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) { errno = 0; // clear; nano might set, but ensure it's not left over if it doesn't if( msg->header == NULL ) { - fprintf( stderr, "rmr_send_msg: ERROR: message had no header\n" ); + fprintf( stderr, "rmr_mtosend_msg: ERROR: message had no header\n" ); msg->state = RMR_ERR_NOHDR; errno = EBADMSG; // must ensure it's not eagain msg->tp_state = errno; @@ -680,7 +686,7 @@ static rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) { send_again = 1; // force loop entry group = 0; // always start with group 0 while( send_again ) { - sock_ok = uta_epsock_rr( rte, group, &send_again, &nn_sock ); // select endpt from rr group and set again if more groups + sock_ok = uta_epsock_rr( rte, group, &send_again, &nn_sock, &ep ); // select endpt from rr group and set again if more groups if( DEBUG ) fprintf( stderr, "[DBUG] mtosend_msg: flgs=0x%04x type=%d again=%d group=%d len=%d sock_ok=%d\n", msg->flags, msg->mtype, send_again, group, msg->len, sock_ok ); @@ -719,6 +725,22 @@ static rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) { } } } + + if( ep != NULL && msg != NULL ) { + switch( msg->state ) { + case RMR_OK: + ep->scounts[EPSC_GOOD]++; + break; + + case RMR_ERR_RETRY: + ep->scounts[EPSC_TRANS]++; + break; + + default: + ep->scounts[EPSC_FAIL]++; + break; + } + } } else { if( ctx->flags & CTXFL_WARN ) { fprintf( stderr, "[WARN] invalid socket for rte, setting no endpoint err: mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id ); diff --git a/test/rt_static_test.c b/test/rt_static_test.c index 6d416ef..87f9931 100644 --- a/test/rt_static_test.c +++ b/test/rt_static_test.c @@ -250,8 +250,10 @@ static int rt_test( ) { ep = uta_get_ep( rt, "bad_name:4560" ); errors += fail_not_nil( ep, "end point (fetch by name with bad name)" ); - state = uta_epsock_byname( rt, "localhost:4561", &nn_sock ); // this should be found + ep = NULL; + state = uta_epsock_byname( rt, "localhost:4561", &nn_sock, &ep ); // this should be found errors += fail_if_equal( state, 0, "socket (by name)" ); + errors += fail_if_nil( ep, "epsock_byname did not populate endpoint pointer when expected to" ); //alt_value = uta_epsock_byname( rt, "localhost:4562" ); // we might do a memcmp on the two structs, but for now nothing //errors += fail_if_equal( value, alt_value, "app1/app2 sockets" ); @@ -290,7 +292,7 @@ static int rt_test( ) { rte = uta_get_rte( rt, 0, 1, FALSE ); // get an rte for the next loop if( rte ) { for( i = 0; i < 10; i++ ) { // round robin return value should be different each time - value = uta_epsock_rr( rte, 0, &more, &nn_sock ); // msg type 1, group 1 + value = uta_epsock_rr( rte, 0, &more, &nn_sock, &ep ); // msg type 1, group 1 errors += fail_if_equal( value, alt_value, "round robiin sockets with multiple end points" ); errors += fail_if_false( more, "more for mtype==1" ); alt_value = value; @@ -301,7 +303,7 @@ static int rt_test( ) { rte = uta_get_rte( rt, 0, 3, FALSE ); // get an rte for the next loop if( rte ) { for( i = 0; i < 10; i++ ) { // this mtype has only one endpoint, so rr should be same each time - value = uta_epsock_rr( rte, 0, NULL, &nn_sock ); // also test ability to deal properly with nil more pointer + value = uta_epsock_rr( rte, 0, NULL, &nn_sock, &ep ); // also test ability to deal properly with nil more pointer if( i ) { errors += fail_not_equal( value, alt_value, "round robin sockets with one endpoint" ); errors += fail_not_equal( more, -1, "more value changed in single group instance" ); @@ -311,12 +313,12 @@ static int rt_test( ) { } rte = uta_get_rte( rt, 11, 3, TRUE ); - state = uta_epsock_rr( rte, 22, NULL, NULL ); + state = uta_epsock_rr( rte, 22, NULL, NULL, &ep ); errors += fail_if_true( state, "uta_epsock_rr returned bad (non-zero) state when given nil socket pointer" ); uta_rt_clone( NULL ); // verify null parms don't crash things uta_rt_drop( NULL ); - uta_epsock_rr( NULL, 0, &more, &nn_sock ); // drive null case for coverage + uta_epsock_rr( NULL, 0, &more, &nn_sock, &ep ); // drive null case for coverage uta_add_rte( NULL, 99, 1 ); uta_get_rte( NULL, 0, 1000, TRUE ); -- 2.16.6