Correct excessive TCP connection bug 89/5489/1 4.5.2
authorE. Scott Daniels <daniels@research.att.com>
Thu, 21 Jan 2021 16:17:33 +0000 (11:17 -0500)
committerE. Scott Daniels <daniels@research.att.com>
Thu, 21 Jan 2021 16:17:33 +0000 (11:17 -0500)
This change corrects a bug in the handling of sends on a blocked
TCP session that was causing a session reconnect to the endpoint,
and resulted in an excessive number of open file descriptors leading
to a segmentation fault.

The change also includes a better warning message when a receiving
process is failing to keep up; the warning is issued every 60
seconds, with a count, when messages are being dropped.

Issue-ID: RIC-735

Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Change-Id: Id43b1c8c4766e5a89158fb31b05df547603c3e13

CHANGES_CORE.txt
CMakeLists.txt
docs/rel-notes.rst
src/rmr/common/src/mbuf_api.c
src/rmr/si/include/rmr_si_private.h
src/rmr/si/src/mt_call_si_static.c
src/rmr/si/src/rmr_si.c
src/rmr/si/src/si95/sisendt.c
src/rmr/si/src/sr_si_static.c

index 0c96513..e59da47 100644 (file)
@@ -5,6 +5,11 @@
 # API and build change  and fix summaries. Doc corrections
 # and/or changes are not mentioned here; see the commit messages.
 
+2021 January21; Version 4.5.2
+       Fixes the excessive TCP session bug when sending to a slow receiver
+       and a related segment fault because of too many open file descriptors.
+       (RIC-735)
+
 2021 January 19; Version 4.5.1
        Version bump to work round a CI job bug preventing push of the 4.5.0
        packages from staging to release in package cloud.  (RIC-732)
index edb8f40..5cd177e 100644 (file)
@@ -41,7 +41,7 @@ cmake_minimum_required( VERSION 3.5 )
 
 set( major_version "4" )               # should be automatically populated from git tag later, but until CI process sets a tag we use this
 set( minor_version "5" )
-set( patch_level "1" )
+set( patch_level "2" )
 
 set( install_root "${CMAKE_INSTALL_PREFIX}" )
 set( install_inc "include/rmr" )
index 3b2e46f..d07ada2 100644 (file)
@@ -22,6 +22,24 @@ the need to leap frog versions ceased, and beginning with
 version 4.0.0, the RMR versions should no longer skip.
 
 
+2021 January21; Version 4.5.2
+-----------------------------
+
+Fixes the excessive TCP session bug when sending to a slow
+receiver and a related segment fault because of too many open
+file descriptors. (RIC-735)
+
+
+
+2021 January 19; Version 4.5.1
+------------------------------
+
+Version bump to work round a CI job bug preventing push of
+the 4.5.0 packages from staging to release in package cloud.
+(RIC-732)
+
+
+
 2021 January 8; Version 4.5.0
 -----------------------------
 
index 6dbc71a..58225c4 100644 (file)
@@ -284,6 +284,10 @@ extern unsigned char*  rmr_get_meid( rmr_mbuf_t* mbuf, unsigned char* dest ) {
        The return value is the number of bytes actually coppied. If 0 bytes are coppied
        errno should indicate the reason. If 0 is returned and errno is 0, then size
        passed was 0.  The state in the message is left UNCHANGED.
+
+       Regardless of action taken (actual realloc or not) the caller's reference to mbuf
+       is still valid follwing the call and will point to the correct spot (same tp
+       buffer if no realloc needed, or the new one if there was).
 */
 extern int rmr_set_trace( rmr_mbuf_t* msg, unsigned const char* data, int size ) {
        uta_mhdr_t*     hdr;
@@ -312,7 +316,7 @@ extern int rmr_set_trace( rmr_mbuf_t* msg, unsigned const char* data, int size )
 
        if( len != size ) {                                                     // different sized trace data, must realloc the buffer
                nm = rmr_realloc_msg( msg, size );              // realloc with changed trace size
-               old_tp_buf = msg->tp_buf;
+               old_tp_buf = msg->tp_buf;                               // hold to repoint new mbuf at small buffer
                old_hdr = msg->header;
 
                msg->tp_buf = nm->tp_buf;                               // reference the reallocated buffer
@@ -321,7 +325,7 @@ extern int rmr_set_trace( rmr_mbuf_t* msg, unsigned const char* data, int size )
                msg->xaction = nm->xaction;
                msg->payload = nm->payload;
 
-               nm->tp_buf = old_tp_buf;                                // set to free
+               nm->tp_buf = old_tp_buf;                                // set to free; point to the small buffer
                nm->header = old_hdr;                                   // nano frees on hdr, so must set both
                rmr_free_msg( nm );
 
index f125efb..9816c16 100644 (file)
@@ -56,6 +56,7 @@
 
 
 #define SI_MAX_ADDR_LEN                512
+#define MAX_RIVERS                     1024    // max number of directly mapped rivers
 
 /*
        Manages a river of inbound bytes.
@@ -153,6 +154,7 @@ struct uta_ctx {
        si_ctx_t*       si_ctx;                 // the socket context
        int                     nrivers;                // allocated rivers
        river_t*        rivers;                 // inbound flows (index is the socket fd)
+       void*           river_hash;             // flows with fd values > nrivers must be mapped through the hash
        int                     max_ibm;                // max size of an inbound message (river accum alloc size)
        void*           zcb_mring;              // zero copy buffer mbuf ring
        void*           fd2ep;                          // the symtab mapping file des to endpoints for cleanup on disconnect
index 1b31bf4..ec86a82 100644 (file)
@@ -1,4 +1,4 @@
-// : vi ts=4 sw=4 noet:
+       // : vi ts=4 sw=4 noet2
 /*
 ==================================================================================
        Copyright (c) 2020 Nokia
 #include <semaphore.h>
 
 static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) {
-       static  int warned = 0;
+       static  time_t last_warning = 0;
+       static  long dcount = 0;
+
        chute_t*        chute;
 
        if( ! uta_ring_insert( ctx->mring, mbuf ) ) {
                rmr_free_msg( mbuf );                                                           // drop if ring is full
-               if( !warned ) {
-                       rmr_vlog( RMR_VL_ERR, "rmr_mt_receive: application is not receiving fast enough; messages dropping\n" );
-                       warned++;
+               dcount++;
+               if( time( NULL ) > last_warning + 60 ) {                        // issue warning no more frequently than every 60 sec
+                       rmr_vlog( RMR_VL_ERR, "rmr_mt_receive: application is not receiving fast enough; %ld msgs dropped since last warning\n", dcount );
+                       last_warning = time( NULL );
+                       dcount = 0;
                }
 
                return;
@@ -96,6 +100,8 @@ static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd
                                }
                        }
                }
+       } else {
+               free( raw_msg );
        }
 }
 
@@ -153,7 +159,7 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
        unsigned char*  old_accum;              // old accumulator reference should we need to realloc
        int                             bidx = 0;               // transport buffer index
        int                             remain;                 // bytes in transport buf that need to be moved
-       int*                    mlen;                   // pointer to spot in buffer for conversion to int
+       int*                    mlen;                   // pointer to spot in buffer for conversion to int
        int                             need;                   // bytes needed for something
        int                             i;
 
@@ -161,22 +167,31 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
                if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
                        return SI_RET_OK;
                }
-
-               if( fd >= ctx->nrivers || fd < 0 ) {
-                       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
-                       return SI_RET_OK;
-               }
        } else {
                ctx = (uta_ctx_t *) vctx;
        }
 
-       if( buflen <= 0 ) {
+       if( buflen <= 0 || fd < 0 ) {                   // no buffer or invalid fd
                return SI_RET_OK;
        }
 
-       river = &ctx->rivers[fd];
+       if( fd >= ctx->nrivers ) {
+               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
+               if( (river = (river_t *) rmr_sym_pull( ctx->river_hash, (uint64_t) fd )) == NULL ) {
+                       river = (river_t *) malloc( sizeof( *river ) );
+                       memset( river, 0, sizeof( *river ) );
+                       rmr_sym_map( ctx->river_hash, (uint64_t) fd, river );
+                       river->state = RS_NEW;
+               }
+       } else {
+               river = &ctx->rivers[fd];                               // quick index for fd values < MAX_FD
+       }
+
        if( river->state != RS_GOOD ) {                         // all states which aren't good require reset first
                if( river->state == RS_NEW ) {
+                       if( river->accum != NULL ) {
+                               free( river->accum );
+                       }
                        memset( river, 0, sizeof( *river ) );
                        river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024);         // start with what user said would be the "normal" max inbound msg size
                        river->accum = (char *) malloc( river->nbytes );
@@ -281,17 +296,36 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
 static int mt_disc_cb( void* vctx, int fd ) {
        uta_ctx_t*      ctx;
        endpoint_t*     ep;
+       river_t*        river = NULL;
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
                return SI_RET_OK;
        }
 
+       if( fd < ctx->nrivers && fd >= 0 ) {
+               river = &ctx->rivers[fd];
+       } else {
+               if( fd > 0 ) {
+                       river = rmr_sym_pull( ctx->river_hash, (uint64_t) fd );
+                       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "river reset on disconnect: fd=%d\n", fd );
+               }
+       }
+
+       if( river != NULL ) {
+               river->state = RS_NEW;                  // if one connects here later; ensure it's new
+               if( river->accum != NULL ) {
+                       free( river->accum );
+                       river->accum = NULL;
+                       river->state = RS_NEW;          // force realloc if the fd is used again
+               }
+       }
+
        ep = fd2ep_del( ctx, fd );              // find ep and remove the fd from the hash
        if( ep != NULL ) {
-       pthread_mutex_lock( &ep->gate );            // wise to lock this
+               pthread_mutex_lock( &ep->gate );            // wise to lock this
                ep->open = FALSE;
                ep->nn_sock = -1;
-       pthread_mutex_unlock( &ep->gate );
+               pthread_mutex_unlock( &ep->gate );
        }
 
        return SI_RET_OK;
@@ -326,7 +360,7 @@ static void* mt_receive( void* vctx ) {
                return NULL;
        }
 
-       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mt_receive: registering SI95 data callback and waiting\n" );
+       rmr_vlog( RMR_VL_INFO, "mt_receive: pid=%lld  registering SI95 data callback and waiting\n", (long long) pthread_self() );
 
        SIcbreg( ctx->si_ctx, SI_CB_CDATA, mt_data_cb, vctx );                  // our callback called only for "cooked" (tcp) data
        SIcbreg( ctx->si_ctx, SI_CB_DISC, mt_disc_cb, vctx );                   // our callback for handling disconnects
index 80d7408..625a58d 100644 (file)
@@ -187,6 +187,10 @@ extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned
        This provides an external path to the realloc static function as it's called by an
        outward facing mbuf api function. Used to reallocate a message with a different
        trace data size.
+
+       User programmes must use this with CAUTION!  The mbuf passed in is NOT freed and
+       is still valid following this call. The caller is reponsible for maintainting
+       a pointer to both old and new messages and invoking rmr_free_msg() on both!
 */
 extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
        return realloc_msg( msg, new_tr_size );
@@ -198,9 +202,11 @@ extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
 */
 extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
        if( mbuf == NULL ) {
+               fprintf( stderr, ">>>FREE  nil buffer\n" );
                return;
        }
 
+#ifdef KEEP
        if( mbuf->flags & MFL_HUGE ||                                                   // don't cache oversized messages
                ! mbuf->ring ||                                                                         // cant cache if no ring
                ! uta_ring_insert( mbuf->ring, mbuf ) ) {                       // or ring is full
@@ -213,6 +219,16 @@ extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
                mbuf->cookie = 0;                       // should signal a bad mbuf (if not reallocated)
                free( mbuf );
        }
+#else
+       // always free, never manage a pool
+       if( mbuf->tp_buf ) {
+               free( mbuf->tp_buf );
+               mbuf->tp_buf = NULL;            // just in case user tries to reuse this mbuf; this will be an NPE
+       }
+
+       mbuf->cookie = 0;                       // should signal a bad mbuf (if not reallocated)
+       free( mbuf );
+#endif
 }
 
 /*
@@ -563,7 +579,7 @@ extern int rmr_set_rtimeout( void* vctx, int time ) {
                                that we know about. The _user_ should ensure that the supplied length also
                                includes the trace data length maximum as they are in control of that.
 */
-static void* init(  char* uproto_port, int def_msg_size, int flags ) {
+static void* init( char* uproto_port, int def_msg_size, int flags ) {
        static  int announced = 0;
        uta_ctx_t*      ctx = NULL;
        char    bind_info[256];                         // bind info
@@ -583,8 +599,8 @@ static void* init(  char* uproto_port, int def_msg_size, int flags ) {
 
        if( ! announced ) {
                rmr_set_vlevel( RMR_VL_INFO );          // we WILL announce our version
-               rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95/g mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
-                       RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
+               rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95 p=%s mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
+                       uproto_port, RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
                announced = 1;
 
                rmr_set_vlevel( old_vlevel );           // return logging to the desired state
@@ -594,6 +610,7 @@ static void* init(  char* uproto_port, int def_msg_size, int flags ) {
        errno = 0;
        if( uproto_port == NULL ) {
                proto_port = strdup( DEF_COMM_PORT );
+               rmr_vlog( RMR_VL_WARN, "user passed nil as the listen port, using default: %s\n", proto_port );
        } else {
                proto_port = strdup( uproto_port );             // so we can modify it
        }
@@ -610,8 +627,9 @@ static void* init(  char* uproto_port, int def_msg_size, int flags ) {
        memset( ctx, 0, sizeof( uta_ctx_t ) );
 
        if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rmr_init: allocating 266 rivers\n" );
-       ctx->nrivers = 256;                                                             // number of input flows we'll manage
+       ctx->nrivers = MAX_RIVERS;                                              // the array allows for fast index mapping for fd values < max
        ctx->rivers = (river_t *) malloc( sizeof( river_t ) * ctx->nrivers );
+       ctx->river_hash = rmr_sym_alloc( 129 );                         // connections with fd values > FD_MAX have to e hashed
        memset( ctx->rivers, 0, sizeof( river_t ) * ctx->nrivers );
        for( i = 0; i < ctx->nrivers; i++ ) {
                ctx->rivers[i].state = RS_NEW;                          // force allocation of accumulator on first received packet
@@ -657,6 +675,7 @@ static void* init(  char* uproto_port, int def_msg_size, int flags ) {
        } else {
                port = proto_port;                      // assume something like "1234" was passed
        }
+       rmr_vlog( RMR_VL_INFO, "listen port = %s\n", port );
 
        if( (tok = getenv( ENV_RTG_PORT )) != NULL && atoi( tok ) < 0 ) {       // must check here -- if < 0 then we just start static file 'listener'
                static_rtc = 1;
index 7935251..26c5f93 100644 (file)
 *
 *  Date:     27 March 1995
 *  Author:   E. Scott Daniels
-*  Mod:                22 Feb 2002 - To better process queued data 
+*  Mod:                22 Feb 2002 - To better process queued data
 *                      14 Feb 2020 - To fix index bug if fd < 0.
 *
 *****************************************************************************
 */
 
-#include "sisetup.h"     //  get setup stuff 
+#include "sisetup.h"     //  get setup stuff
 #include "sitransport.h"
 
 /*
 //extern int SIsendt_nq( struct ginfo_blk *gptr, int fd, char *ubuf, int ulen ) {
 extern int SIsendt( struct ginfo_blk *gptr, int fd, char *ubuf, int ulen ) {
        int status = SI_ERROR;      //  assume we fail
-       fd_set writefds;            //  local write fdset to check blockage 
-       fd_set execpfds;            //  exception fdset to check errors 
-       struct tp_blk *tpptr;       //  pointer at the tp_blk for the session 
-       struct ioq_blk *qptr;       //  pointer at i/o queue block 
-       struct timeval time;        //  delay time parameter for select call 
+       fd_set writefds;            //  local write fdset to check blockage
+       fd_set execpfds;            //  exception fdset to check errors
+       struct tp_blk *tpptr;       //  pointer at the tp_blk for the session
+       struct ioq_blk *qptr;       //  pointer at i/o queue block
+       struct timeval time;        //  delay time parameter for select call
        int     sidx = 0;                               // send index
 
        errno = EINVAL;
@@ -76,18 +76,18 @@ extern int SIsendt( struct ginfo_blk *gptr, int fd, char *ubuf, int ulen ) {
 
                tpptr->sent++;                          // investigate: this may over count
 
-               FD_ZERO( &writefds );       //  clear for select call 
-               FD_SET( fd, &writefds );    //  set to see if this one was writable 
-               FD_ZERO( &execpfds );       //  clear and set execptions fdset 
+               FD_ZERO( &writefds );       //  clear for select call
+               FD_SET( fd, &writefds );    //  set to see if this one was writable
+               FD_ZERO( &execpfds );       //  clear and set execptions fdset
                FD_SET( fd, &execpfds );
 
                time.tv_sec = 0;                        //  set both to 0 if we just want a poll, else we block at max this amount
                time.tv_usec = 1;                       // small pause on check to help drain things
 
                if( select( fd + 1, NULL, &writefds, &execpfds, &time ) > 0 ) {         //  would block if <= 0
-                       if( FD_ISSET( fd, &execpfds ) ) {       //  error? 
+                       if( FD_ISSET( fd, &execpfds ) ) {               //  error?
                                errno = EBADFD;
-                               SIterm( gptr, tpptr );                          // mark block for deletion when safe
+                               SIterm( gptr, tpptr );                          // mark block for deletion when safe
                                return SI_ERROR;                                        // and bail from this sinking ship
                        } else {
                                errno = 0;
@@ -110,7 +110,7 @@ extern int SIsendt( struct ginfo_blk *gptr, int fd, char *ubuf, int ulen ) {
                        status = SI_ERR_BLOCKED;
                }
        } else {
-               errno = EBADFD;                 // fd in a bad state (probably losed)
+               errno = EBADFD;                 // fd in a bad state (probably lost)
        }
 
        return status;
index b56b6dc..bc3b53e 100644 (file)
@@ -673,11 +673,12 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int r
                        rmr_free_msg( msg );                                            // not wanting a meessage back, trash this one
                        return NULL;
                }
-       } else {                                                                                        // send failed -- return original message
-               if( msg->state == 98 ) {                // FIX ME: this is just broken, but needs SI changes to work correctly for us
+       } else {                                                                                        // send failed or would block -- return original message
+               if( state == SI_ERR_BLOCKED || errno == EAGAIN ) {
                        errno = EAGAIN;
-                       msg->state = RMR_ERR_RETRY;                                     // errno will have nano reason
+                       msg->state = RMR_ERR_RETRY;
                } else {
+                       rmr_vlog( RMR_VL_WARN, "send failed: mt=%d errno=%d %s\n", msg->mtype, errno, strerror( errno ) );
                        msg->state = RMR_ERR_SENDFAILED;
                }