Add ability to track send counts for an endpoint 76/1076/1 1.10.0
authorE. Scott Daniels <daniels@research.att.com>
Wed, 2 Oct 2019 14:21:24 +0000 (10:21 -0400)
committerE. Scott Daniels <daniels@research.att.com>
Sun, 6 Oct 2019 03:11:52 +0000 (23:11 -0400)
Send counts, success, failure and transient failure, are being
collected for each endpoint. The route table collector thread
will write counts to standard error on a periodic basis.

Enabled forced minimum retry.

Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Change-Id: I3c315d86e6db79cb1404788ef954a0a664084631

Fix context used for RTC epoll

Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Change-Id: Ie4883e98237eb87b53b24f68bd434edcf533c92f

Fix bug when table received from RTG

Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Change-Id: I66dd44ab7cde715d6daf144253ff5a2bd5861ae3

After stress testing and tweaks

Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Change-Id: I0e814bf25844c19ab9f9f6c38eca50a1a66db309

Update unit tests and resulting fixes

Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Change-Id: Ifd780cfaaf924e202407f822551ca3e61c84c63d

Set minimum retry

Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Change-Id: I17f1c096106c96d31b919bef6af759b3f4ec72c3

CMakeLists.txt
src/rmr/common/include/rmr_agnostic.h
src/rmr/common/src/rt_generic_static.c
src/rmr/common/src/rtc_static.c
src/rmr/nng/include/rmr_nng_private.h
src/rmr/nng/src/rmr_nng.c
src/rmr/nng/src/rtable_nng_static.c
src/rmr/nng/src/sr_nng_static.c
test/rt_static_test.c

index 6be67f7..8bccdd4 100644 (file)
@@ -35,8 +35,8 @@ project( rmr LANGUAGES C )
 cmake_minimum_required( VERSION 3.5 )
 
 set( major_version "1" )               # should be automatically populated from git tag later, but until CI process sets a tag we use this
-set( minor_version "9" )
-set( patch_level "1" )
+set( minor_version "10" )
+set( patch_level "0" )
 
 set( install_root "${CMAKE_INSTALL_PREFIX}" )
 set( install_inc "include/rmr" )
index e83a951..8694574 100644 (file)
@@ -86,6 +86,12 @@ typedef struct uta_ctx  uta_ctx_t;
 #define UNSET_SUBID            (-1)                    // initial value on msg allocation indicating not set
 #define UNSET_MSGTYPE  (-1)
 
+                                                                               // index values into the send counters for an enpoint
+#define EPSC_GOOD              0                               // successful send
+#define EPSC_FAIL              1                               // hard failurs
+#define EPSC_TRANS             2                               // transient/soft faiures
+#define EPSC_SIZE              3                               // number of counters
+
 // -- header length/offset macros must ensure network conversion ----
 #define RMR_HDR_LEN(h)         (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2)+htonl(((uta_mhdr_t *)h)->len3)) // ALL things, not just formal struct
 #define RMR_TR_LEN(h)          (ntohl(((uta_mhdr_t *)h)->len1))
index 816e18a..d65acdf 100644 (file)
@@ -60,7 +60,8 @@ typedef struct thing_list {
 // ---- debugging/testing -------------------------------------------------------------------------
 
 /*
-       Dump stats for an endpoint in the RT.
+       Dump some stats for an endpoint in the RT. This is generally called to
+       verify endpoints after a table load/change.
 */
 static void ep_stats( void* st, void* entry, char const* name, void* thing, void* vcounter ) {
        int*    counter;
@@ -74,7 +75,34 @@ static void ep_stats( void* st, void* entry, char const* name, void* thing, void
                (*counter)++;
        }
 
-       fprintf( stderr, "[DBUG] endpoint: %s open=%d\n", ep->name, ep->open );
+       fprintf( stderr, "[DBUG] RMR sends: target=%s open=%d\n", ep->name, ep->open );
+}
+
+/*
+       Dump counts for an endpoint in the RT. The vid parm is assumed to point to
+       the 'source' information and is added to each message.
+*/
+static void ep_counts( void* st, void* entry, char const* name, void* thing, void* vid ) {
+       endpoint_t* ep;
+       char*   id;
+
+       if( (ep = (endpoint_t *) thing) == NULL ) {
+               return;
+       }
+
+       if( (id = (char *) vid) == NULL ) {
+               id = "missing";
+       }
+
+       fprintf( stderr, "[INFO] RMR sends: ts=%lld src=%s target=%s open=%d succ=%lld fail=%lld (hard=%lld soft=%lld)\n", 
+               (long long) time( NULL ), 
+               id, 
+               ep->name, 
+               ep->open, 
+               ep->scounts[EPSC_GOOD], 
+               ep->scounts[EPSC_FAIL] + ep->scounts[EPSC_TRANS], 
+               ep->scounts[EPSC_FAIL], 
+               ep->scounts[EPSC_TRANS]   );
 }
 
 /*
@@ -124,6 +152,19 @@ static void  rt_stats( route_table_t* rt ) {
        free( counter );
 }
 
+/*
+       Given a route table, cause endpoint counters to be written to stderr. The id
+       parm is written as the "source" in the output.
+*/
+static void  rt_epcounts( route_table_t* rt, char* id ) {
+       if( rt == NULL ) {
+               fprintf( stderr, "[INFO] RMR endpoint: no counts: empty table\n" );
+               return;
+       }
+
+       rmr_sym_foreach_class( rt->hash, 1, ep_counts, id );            // run endpoints in the active table
+}
+
 
 // ------------------------------------------------------------------------------------------------
 /*
@@ -864,6 +905,7 @@ static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name ) {
                ep->addr = uta_h2ip( ep_name );
                ep->name = strdup( ep_name );
                pthread_mutex_init( &ep->gate, NULL );          // init with default attrs
+               memset( &ep->scounts[0], 0, sizeof( ep->scounts ) );
 
                rmr_sym_put( rt->hash, ep_name, 1, ep );
        }
index 013dc8d..cce8ddc 100644 (file)
        records, but each buffer must be less than 4K in length, and the last record in a
        buffere may NOT be split across buffers.
 
+       Other chores:
+       In addition to the primary task of getting, vetting, and installing a new route table, or
+       updates to the existing table, this thread will periodically cause the send counts for each
+       endpoint known to be written to standard error. The frequency is once every 180 seconds, and
+       more frequently if verbose mode (see ENV_VERBOSE_FILE) is > 0.
 */
 static void* rtc( void* vctx ) {
        uta_ctx_t*      ctx;                                    // context user has -- where we pin the route table
@@ -99,6 +104,13 @@ static void* rtc( void* vctx ) {
        int             vfd = -1;                                       // verbose file des if we have one
        int             vlevel = 0;                                     // how chatty we should be 0== no nattering allowed
        char*   eptr;
+       int             epfd = -1;                                      // fd for epoll so we can multi-task
+       struct  epoll_event events[1];          // list of events to give to epoll; we only have one we care about
+       struct  epoll_event epe;                        // event definition for event to listen to
+       int             rcv_fd = -1;                            // pollable file des from NNG to use for timeout
+       int             count_delay = 30;                       // number of seconds between writing count info; initially every 30s
+       int             bump_freq = 0;                          // time at which we will bump count frequency to every 5 minutes
+
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
                fprintf( stderr, "[CRI] rmr_rtc: internal mishap: context passed in was nil\n" );
@@ -113,7 +125,7 @@ static void* rtc( void* vctx ) {
                        read( vfd, wbuf, 10 );
                        vlevel = atoi( wbuf );
                }
-       }
+       }                
 
        read_static_rt( ctx, vlevel );                                          // seed the route table if one provided
 
@@ -146,24 +158,67 @@ static void* rtc( void* vctx ) {
 
        if( (pvt_cx = init( port, MAX_RTG_MSG_SZ, FL_NOTHREAD )) == NULL ) {                            // open a private context
                fprintf( stderr, "[CRI] rmr_rtc: unable to initialise listen port for RTG (pvt_cx)\n" );
-               free( fport );
+
+               while( TRUE ) {                                                                                         // no listen port, just dump counts now and then
+                       sleep( count_delay );
+                       rt_epcounts( ctx->rtable, ctx->my_name );
+               }
+               
+               free( fport );                                  // parinoid free and return
                return NULL;
        }
 
+       if( (rcv_fd = rmr_get_rcvfd( pvt_cx )) >= 0 ) {            // get the epoll fd for the rtg socket
+               if( rcv_fd < 0 ) {
+                       fprintf( stderr, "[WARN] cannot get epoll fd for rtg session; stats will generate only after update from rt manager\n" );
+               } else {
+                       if( (epfd = epoll_create1( 0 )) < 0 ) {
+                               fprintf( stderr, "[WARN] stats will generate only after rt manager update; unable to create epoll fd for rtg session: %s\n", strerror( errno ) );
+                               rcv_fd = -1;
+                       } else {
+                               epe.events = EPOLLIN;
+                               epe.data.fd = rcv_fd;
+
+                               if( epoll_ctl( epfd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 )  {
+                                       fprintf( stderr, "[WARN] stats will generate only after rt manager update; unable to init epoll_ctl: %s\n", strerror( errno ) );
+                                       rcv_fd = -1;
+                               }
+                       }
+               }
+       }
+
        if( DEBUG ) fprintf( stderr, "[DBUG] rtc thread is running and listening; listening for rtg conns on %s\n", port );
        free( fport );
 
        // future:  if we need to register with the rtg, then build a message and send it through a wormhole here
 
+       bump_freq = time( NULL ) + 300;                         // after 5 minutes we decrease the count frequency
        blabber = 0;
        while( 1 ) {                    // until the cows return, pigs fly, or somesuch event likely not to happen
-               if( raw_interface ) {
-                       msg = (rmr_mbuf_t *) rcv_payload( pvt_cx, msg );                // receive from non-RMr sender
-               } else {
-                       msg = rmr_rcv_msg( pvt_cx, msg );               // receive from an RMr sender
+               while( msg == NULL || msg->len <= 0 ) {                                                 // until we actually have something from the other side
+                       if( rcv_fd < 0 || epoll_wait( epfd, events, 1, 1000 ) > 0 )  {  // skip epoll if init failed, else block for max 1 sec
+                               if( raw_interface ) {
+                                       msg = (rmr_mbuf_t *) rcv_payload( pvt_cx, msg );                // receive from non-RMr sender
+                               } else {
+                                       msg = rmr_rcv_msg( pvt_cx, msg );               // receive from an RMr sender
+                               }
+                       } else {                                                                                                        // no msg, do extra tasks
+                               if( msg != NULL ) {                                                                             // if we were working with a message; ensure no len
+                                       msg->len = 0;
+                                       msg->state = RMR_ERR_TIMEOUT;
+                               }
+                       }
+
+                       if( time( NULL ) > blabber  ) {
+                               blabber = time( NULL ) + count_delay;                                   // set next time to blabber, then do so
+                               if( blabber > bump_freq ) {
+                                       count_delay = 300;
+                               }
+                               rt_epcounts( ctx->rtable, ctx->my_name );
+                       }
                }
 
-               if( vfd >= 0 ) {                                                        // if changed since last go round
+               if( vfd >= 0 ) {                                                        // if file open, check for change to vlevel
                        wbuf[0] = 0;
                        lseek( vfd, 0, 0 );
                        read( vfd, wbuf, 10 );
@@ -212,11 +267,8 @@ static void* rtc( void* vctx ) {
                        if( ctx->shutdown ) {           // mostly for testing, but allows user app to close us down if rmr_*() function sets this
                                break;
                        }
-               } else {
-                       if( time( NULL ) > blabber  ) {
-                               fprintf( stderr, "[WRN] rmr_rtc: nil buffer, or 0 len msg, received from rtg\n" );
-                               blabber = time( NULL ) + 180;                   // limit to 1 every 3 min or so
-                       }
+
+                       msg->len = 0;                           // force back into the listen loop
                }
        }
 
index 9e93c9f..efb2daa 100644 (file)
@@ -1,4 +1,4 @@
-// : vi ts=4 sw=4 noet :
+//  vim: ts=4 sw=4 noet :
 /*
 ==================================================================================
        Copyright (c) 2019 Nokia
@@ -44,6 +44,7 @@ struct endpoint {
        nng_dialer      dialer;         // the connection specific information (retry timout etc)
        int             open;                   // set to true if we've connected as socket cannot be checked directly)
        pthread_mutex_t gate;   // we must serialise when we open/link to the endpoint
+       long long scounts[EPSC_SIZE];           // send counts (indexed by EPSCOUNT_* constants
 };
 
 /*
@@ -107,8 +108,8 @@ static void free_ctx( uta_ctx_t* ctx );
 // --- rt table things ---------------------------
 static int uta_link2( endpoint_t* ep );
 static int rt_link2_ep( endpoint_t* ep );
-static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_sock );
-static int uta_epsock_rr( rtable_ent_t* rte, int group, int* more, nng_socket* nn_sock );
+static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_sock, endpoint_t** uepp );
+static int uta_epsock_rr( rtable_ent_t* rte, int group, int* more, nng_socket* nn_sock, endpoint_t** uepp );
 static rtable_ent_t* uta_get_rte( route_table_t *rt, int sid, int mtype, int try_alt );
 static inline int xlate_nng_state( int state, int def_state );
 
index a4b4009..45a0e23 100644 (file)
@@ -1,4 +1,4 @@
-// : vi ts=4 sw=4 noet :
+// vim: ts=4 sw=4 noet :
 /*
 ==================================================================================
        Copyright (c) 2019 Nokia
@@ -250,6 +250,7 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
        char*           hold_src;                       // we need the original source if send fails
        char*           hold_ip;                        // also must hold original ip
        int                     sock_ok = 0;            // true if we found a valid endpoint socket
+       endpoint_t*     ep;                                     // end point to track counts
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
                errno = EINVAL;                                                                                         // if msg is null, this is their clue
@@ -270,10 +271,10 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
 
        ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
 
-       sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock );                        // src is always used first for rts
+       sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock, &ep );                   // src is always used first for rts
        if( ! sock_ok ) {
                if( HDR_VERSION( msg->header ) > 2 ) {                                                  // with ver2 the ip is there, try if src name not known
-                       sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock );
+                       sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep );
                }
                if( ! sock_ok ) {
                        msg->state = RMR_ERR_NOENDPT;
@@ -287,6 +288,21 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
        strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );        // must overlay the source to be ours
        msg = send_msg( ctx, msg, nn_sock, -1 );
        if( msg ) {
+               if( ep != NULL ) {
+                       switch( msg->state ) {
+                               case RMR_OK:
+                                       ep->scounts[EPSC_GOOD]++;
+                                       break;
+                       
+                               case RMR_ERR_RETRY:
+                                       ep->scounts[EPSC_TRANS]++;
+                                       break;
+
+                               default:
+                                       ep->scounts[EPSC_FAIL]++;
+                                       break;
+                       }
+               }
                strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC );    // always return original source so rts can be called again
                strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC );   // always return original source so rts can be called again
                msg->flags |= MFL_ADDSRC;                                                                                                               // if msg given to send() it must add source
@@ -599,8 +615,8 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        int             state;
 
        if( ! announced ) {
-               fprintf( stderr, "[INFO] ric message routing library on NNG mv=%d (%s %s.%s.%s built: %s)\n",
-                       RMR_MSG_VER, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
+               fprintf( stderr, "[INFO] ric message routing library on NNG mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
+                       RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
                announced = 1;
        }
 
index 4f413be..7cd3e20 100644 (file)
@@ -1,4 +1,4 @@
-// : vi ts=4 sw=4 noet :
+// vim: ts=4 sw=4 noet :
 /*
 ==================================================================================
        Copyright (c) 2019 Nokia
@@ -59,7 +59,6 @@
        get things into a bad state if we allow a collision here.  The lock grab
        only happens on the intial session setup.
 */
-//static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer, pthread_mutex* gate ) {
 static int uta_link2( endpoint_t* ep ) {
        static int      flags = -1;
 
@@ -218,7 +217,7 @@ extern endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_n
        the user pointer passed in and sets the return value to true (1). If the
        endpoint cannot be found false (0) is returned.
 */
-static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_sock ) {
+static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_sock, endpoint_t** uepp ) {
        endpoint_t* ep;
        int state = FALSE;
 
@@ -227,6 +226,9 @@ static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_s
        }
 
        ep =  rmr_sym_get( rt->hash, ep_name, 1 );
+       if( uepp != NULL ) {                                                    // caller needs endpoint too, give it back
+               *uepp = ep;
+       }
        if( ep == NULL ) {
                if( DEBUG ) fprintf( stderr, "[DBUG] get ep by name for %s not in hash!\n", ep_name );
                if( ! ep_name || (ep = rt_ensure_ep( rt, ep_name)) == NULL ) {                          // create one if not in rt (support rts without entry in our table)
@@ -279,8 +281,7 @@ static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_s
                Both of these, in the grand scheme of things, is minor compared to the
                overhead of grabbing a lock on each call.
 */
-static int uta_epsock_rr( rtable_ent_t *rte, int group, int* more, nng_socket* nn_sock ) {
-       //rtable_ent_t* rte;                    // matching rt entry
+static int uta_epsock_rr( rtable_ent_t *rte, int group, int* more, nng_socket* nn_sock, endpoint_t** uepp ) {
        endpoint_t*     ep;                             // seected end point
        int  state = FALSE;                     // processing state
        int dummy;
@@ -337,6 +338,9 @@ static int uta_epsock_rr( rtable_ent_t *rte, int group, int* more, nng_socket* n
                        break;
        }
 
+       if( uepp != NULL ) {                                                            // caller needs refernce to endpoint too
+               *uepp = ep;
+       }
        if( state ) {                                                                           // end point selected, open if not, get socket either way
                if( ! ep->open ) {                                                              // not connected
                        if( ep->addr == NULL ) {                                        // name didn't resolve before, try again
@@ -384,4 +388,35 @@ static inline rtable_ent_t*  uta_get_rte( route_table_t *rt, int sid, int mtype,
        return rte;
 }
 
+/*
+       Return a string of count information. E.g.:
+               <ep-name>:<port> <good> <hard-fail> <soft-fail>
+
+       Caller must free the string allocated if a buffer was not provided.
+
+       Pointer returned is to a freshly allocated string, or the user buffer
+       for convenience.
+
+       If the endpoint passed is a nil pointer, then we return a nil -- caller
+       must check!
+*/
+static inline char* get_ep_counts( endpoint_t* ep, char* ubuf, int ubuf_len ) {
+       char*   rs;                     // result string
+
+       if( ep == NULL ) {
+               return NULL;
+       }
+
+       if( ubuf != NULL ) {
+               rs = ubuf;
+       } else {
+               ubuf_len = 256;
+               rs = malloc( sizeof( char ) * ubuf_len );
+       }
+
+       snprintf( rs, ubuf_len, "%s %lld %lld %lld", ep->name, ep->scounts[EPSC_GOOD], ep->scounts[EPSC_FAIL], ep->scounts[EPSC_TRANS] );
+
+       return rs;
+}
+
 #endif
index 5ca4403..3435cb3 100644 (file)
@@ -1,4 +1,4 @@
-// : vi ts=4 sw=4 noet 2
+// vim: ts=4 sw=4 noet :
 /*
 ==================================================================================
        Copyright (c) 2019 Nokia
@@ -530,7 +530,7 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock
        uta_mhdr_t*     hdr;
        int nng_flags = NNG_FLAG_NONBLOCK;              // if we need to set any nng flags (zc buffer) add it to this
        int spin_retries = 1000;                                // if eagain/timeout we'll spin, at max, this many times before giving up the CPU
-       int     tr_len;                                                         // trace len in sending message so we alloc new message with same trace size
+       int     tr_len;                                                         // trace len in sending message so we alloc new message with same trace sizes
 
        // future: ensure that application did not overrun the XID buffer; last byte must be 0
 
@@ -545,6 +545,11 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock
                strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
        }
 
+       if( retries == 0 ) {
+               spin_retries = 100;
+               retries++;
+       }
+
        errno = 0;
        msg->state = RMR_OK;
        if( msg->flags & MFL_ZEROCOPY ) {                                                                       // faster sending with zcopy buffer
@@ -634,6 +639,7 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock
 
 */
 static  rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
+       endpoint_t*     ep;                                     // end point that we're attempting to send to
        rtable_ent_t*   rte;                    // the route table entry which matches the message key
        nng_socket      nn_sock;                        // endpoint socket for send
        uta_ctx_t*      ctx;
@@ -656,7 +662,7 @@ static  rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
 
        errno = 0;                                                                                                      // clear; nano might set, but ensure it's not left over if it doesn't
        if( msg->header == NULL ) {
-               fprintf( stderr, "rmr_send_msg: ERROR: message had no header\n" );
+               fprintf( stderr, "rmr_mtosend_msg: ERROR: message had no header\n" );
                msg->state = RMR_ERR_NOHDR;
                errno = EBADMSG;                                                                                        // must ensure it's not eagain
                msg->tp_state = errno;
@@ -680,7 +686,7 @@ static  rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
        send_again = 1;                                                                                 // force loop entry
        group = 0;                                                                                              // always start with group 0
        while( send_again ) {
-               sock_ok = uta_epsock_rr( rte, group, &send_again, &nn_sock );                                                           // select endpt from rr group and set again if more groups
+               sock_ok = uta_epsock_rr( rte, group, &send_again, &nn_sock, &ep );              // select endpt from rr group and set again if more groups
 
                if( DEBUG ) fprintf( stderr, "[DBUG] mtosend_msg: flgs=0x%04x type=%d again=%d group=%d len=%d sock_ok=%d\n",
                                msg->flags, msg->mtype, send_again, group, msg->len, sock_ok );
@@ -719,6 +725,22 @@ static  rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
                                        }
                                }
                        }
+
+                       if( ep != NULL && msg != NULL ) {
+                               switch( msg->state ) {
+                                       case RMR_OK:
+                                               ep->scounts[EPSC_GOOD]++;
+                                               break;
+                               
+                                       case RMR_ERR_RETRY:
+                                               ep->scounts[EPSC_TRANS]++;
+                                               break;
+
+                                       default:
+                                               ep->scounts[EPSC_FAIL]++;
+                                               break;
+                               }
+                       }
                } else {
                        if( ctx->flags & CTXFL_WARN ) {
                                fprintf( stderr, "[WARN] invalid socket for rte, setting no endpoint err: mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
index 6d416ef..87f9931 100644 (file)
@@ -250,8 +250,10 @@ static int rt_test( ) {
        ep = uta_get_ep( rt, "bad_name:4560" );
        errors += fail_not_nil( ep, "end point (fetch by name with bad name)" );
 
-       state = uta_epsock_byname( rt, "localhost:4561", &nn_sock );            // this should be found
+       ep = NULL;
+       state = uta_epsock_byname( rt, "localhost:4561", &nn_sock, &ep );               // this should be found
        errors += fail_if_equal( state, 0, "socket (by name)" );
+       errors += fail_if_nil( ep, "epsock_byname did not populate endpoint pointer when expected to" );
        //alt_value = uta_epsock_byname( rt, "localhost:4562" );                        // we might do a memcmp on the two structs, but for now nothing
        //errors += fail_if_equal( value, alt_value, "app1/app2 sockets" );
 
@@ -290,7 +292,7 @@ static int rt_test( ) {
        rte = uta_get_rte( rt, 0, 1, FALSE );                   // get an rte for the next loop
        if( rte ) {
                for( i = 0; i < 10; i++ ) {                                                                     // round robin return value should be different each time
-                       value = uta_epsock_rr( rte, 0, &more, &nn_sock );                       // msg type 1, group 1
+                       value = uta_epsock_rr( rte, 0, &more, &nn_sock, &ep );          // msg type 1, group 1
                        errors += fail_if_equal( value, alt_value, "round robiin sockets with multiple end points" );
                        errors += fail_if_false( more, "more for mtype==1" );
                        alt_value = value;
@@ -301,7 +303,7 @@ static int rt_test( ) {
        rte = uta_get_rte( rt, 0, 3, FALSE );                           // get an rte for the next loop
        if( rte ) {
                for( i = 0; i < 10; i++ ) {                                                             // this mtype has only one endpoint, so rr should be same each time
-                       value = uta_epsock_rr( rte, 0, NULL, &nn_sock );                // also test ability to deal properly with nil more pointer
+                       value = uta_epsock_rr( rte, 0, NULL, &nn_sock, &ep );           // also test ability to deal properly with nil more pointer
                        if( i ) {
                                errors += fail_not_equal( value, alt_value, "round robin sockets with one endpoint" );
                                errors += fail_not_equal( more, -1, "more value changed in single group instance" );
@@ -311,12 +313,12 @@ static int rt_test( ) {
        }
 
        rte = uta_get_rte( rt, 11, 3, TRUE );
-       state = uta_epsock_rr( rte, 22, NULL, NULL );
+       state = uta_epsock_rr( rte, 22, NULL, NULL, &ep );
        errors += fail_if_true( state, "uta_epsock_rr returned bad (non-zero) state when given nil socket pointer" );
 
        uta_rt_clone( NULL );                                                           // verify null parms don't crash things
        uta_rt_drop( NULL );
-       uta_epsock_rr( NULL, 0,  &more, &nn_sock );                     // drive null case for coverage
+       uta_epsock_rr( NULL, 0,  &more, &nn_sock, &ep );                        // drive null case for coverage
        uta_add_rte( NULL, 99, 1 );
        uta_get_rte( NULL, 0, 1000, TRUE );