From fd4477a9698a46ce5755d614b663c18ceadf43c4 Mon Sep 17 00:00:00 2001
From: "E. Scott Daniels"
Date: Thu, 21 Jan 2021 11:17:33 -0500
Subject: [PATCH] Correct excessive TCP connection bug

This change corrects a bug in the handling of sends on a blocked TCP
session that caused a session reconnect to the endpoint and resulted in
an excessive number of open file descriptors, ultimately leading to a
segmentation fault.

The change also includes a better warning message when a receiving
process is failing to keep up; while messages are being dropped, the
warning is issued at most once every 60 seconds and includes a count of
the messages dropped since the previous warning.

Issue-ID: RIC-735
Signed-off-by: E. Scott Daniels
Change-Id: Id43b1c8c4766e5a89158fb31b05df547603c3e13
---
 CHANGES_CORE.txt                    |  5 +++
 CMakeLists.txt                      |  2 +-
 docs/rel-notes.rst                  | 18 ++++++++++
 src/rmr/common/src/mbuf_api.c       |  8 +++--
 src/rmr/si/include/rmr_si_private.h |  2 ++
 src/rmr/si/src/mt_call_si_static.c  | 66 ++++++++++++++++++++++++++++---------
 src/rmr/si/src/rmr_si.c             | 27 ++++++++++++---
 src/rmr/si/src/si95/sisendt.c       | 26 +++++++--------
 src/rmr/si/src/sr_si_static.c       |  7 ++--
 9 files changed, 122 insertions(+), 39 deletions(-)

diff --git a/CHANGES_CORE.txt b/CHANGES_CORE.txt
index 0c96513..e59da47 100644
--- a/CHANGES_CORE.txt
+++ b/CHANGES_CORE.txt
@@ -5,6 +5,11 @@
 #   API and build change and fix summaries. Doc corrections
 #   and/or changes are not mentioned here; see the commit messages.
 
+2021 January 21; Version 4.5.2
+    Fixes the excessive TCP session bug when sending to a slow receiver
+    and a related segmentation fault because of too many open file descriptors.
+    (RIC-735)
+
 2021 January 19; Version 4.5.1
     Version bump to work round a CI job bug preventing push of the 4.5.0
     packages from staging to release in package cloud.  (RIC-732)
diff --git a/CMakeLists.txt b/CMakeLists.txt
index edb8f40..5cd177e 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -41,7 +41,7 @@ cmake_minimum_required( VERSION 3.5 )
 
 set( major_version "4" )        # should be automatically populated from git tag later, but until CI process sets a tag we use this
 set( minor_version "5" )
-set( patch_level "1" )
+set( patch_level "2" )
 
 set( install_root "${CMAKE_INSTALL_PREFIX}" )
 set( install_inc "include/rmr" )
diff --git a/docs/rel-notes.rst b/docs/rel-notes.rst
index 3b2e46f..d07ada2 100644
--- a/docs/rel-notes.rst
+++ b/docs/rel-notes.rst
@@ -22,6 +22,24 @@ the need to leap frog versions ceased, and beginning with version
 4.0.0, the RMR versions should no longer skip.
 
 
+2021 January 21; Version 4.5.2
+------------------------------
+
+Fixes the excessive TCP session bug when sending to a slow
+receiver and a related segmentation fault because of too many open
+file descriptors. (RIC-735)
+
+
+
+2021 January 19; Version 4.5.1
+------------------------------
+
+Version bump to work round a CI job bug preventing push of
+the 4.5.0 packages from staging to release in package cloud.
+(RIC-732)
+
+
+
 2021 January 8; Version 4.5.0
 -----------------------------
 
diff --git a/src/rmr/common/src/mbuf_api.c b/src/rmr/common/src/mbuf_api.c
index 6dbc71a..58225c4 100644
--- a/src/rmr/common/src/mbuf_api.c
+++ b/src/rmr/common/src/mbuf_api.c
@@ -284,6 +284,10 @@ extern unsigned char* rmr_get_meid( rmr_mbuf_t* mbuf, unsigned char* dest ) {
     The return value is the number of bytes actually coppied. If 0 bytes are coppied
     errno should indicate the reason. If 0 is returned and errno is 0, then size
     passed was 0. The state in the message is left UNCHANGED.
+
+    Regardless of action taken (actual realloc or not) the caller's reference to mbuf
+    is still valid following the call and will point to the correct spot (same tp
+    buffer if no realloc needed, or the new one if there was).
 */
 extern int rmr_set_trace( rmr_mbuf_t* msg, unsigned const char* data, int size ) {
     uta_mhdr_t* hdr;
@@ -312,7 +316,7 @@ extern int rmr_set_trace( rmr_mbuf_t* msg, unsigned const char* data, int size )
 
     if( len != size ) {                     // different sized trace data, must realloc the buffer
         nm = rmr_realloc_msg( msg, size );  // realloc with changed trace size
-        old_tp_buf = msg->tp_buf;
+        old_tp_buf = msg->tp_buf;           // hold to repoint new mbuf at small buffer
         old_hdr = msg->header;
 
         msg->tp_buf = nm->tp_buf;           // reference the reallocated buffer
@@ -321,7 +325,7 @@ extern int rmr_set_trace( rmr_mbuf_t* msg, unsigned const char* data, int size )
         msg->xaction = nm->xaction;
         msg->payload = nm->payload;
 
-        nm->tp_buf = old_tp_buf;            // set to free
+        nm->tp_buf = old_tp_buf;            // set to free; point to the small buffer
         nm->header = old_hdr;               // nano frees on hdr, so must set both
         rmr_free_msg( nm );
 
diff --git a/src/rmr/si/include/rmr_si_private.h b/src/rmr/si/include/rmr_si_private.h
index f125efb..9816c16 100644
--- a/src/rmr/si/include/rmr_si_private.h
+++ b/src/rmr/si/include/rmr_si_private.h
@@ -56,6 +56,7 @@
 
 #define SI_MAX_ADDR_LEN     512
+#define MAX_RIVERS          1024        // max number of directly mapped rivers
 
 /*
     Manages a river of inbound bytes.
@@ -153,6 +154,7 @@ struct uta_ctx {
     si_ctx_t*   si_ctx;         // the socket context
     int         nrivers;        // allocated rivers
     river_t*    rivers;         // inbound flows (index is the socket fd)
+    void*       river_hash;     // flows with fd values > nrivers must be mapped through the hash
     int         max_ibm;        // max size of an inbound message (river accum alloc size)
     void*       zcb_mring;      // zero copy buffer mbuf ring
     void*       fd2ep;          // the symtab mapping file des to endpoints for cleanup on disconnect
diff --git a/src/rmr/si/src/mt_call_si_static.c b/src/rmr/si/src/mt_call_si_static.c
index 1b31bf4..ec86a82 100644
--- a/src/rmr/si/src/mt_call_si_static.c
+++ b/src/rmr/si/src/mt_call_si_static.c
@@ -1,4 +1,4 @@
-// : vi ts=4 sw=4 noet:
+ // : vi ts=4 sw=4 noet2
 /*
 ==================================================================================
     Copyright (c) 2020 Nokia
@@ -35,14 +35,18 @@
 #include 
 
 static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) {
-    static int warned = 0;
+    static time_t last_warning = 0;
+    static long dcount = 0;
+
     chute_t*    chute;
 
     if( ! uta_ring_insert( ctx->mring, mbuf ) ) {
         rmr_free_msg( mbuf );                           // drop if ring is full
-        if( !warned ) {
-            rmr_vlog( RMR_VL_ERR, "rmr_mt_receive: application is not receiving fast enough; messages dropping\n" );
-            warned++;
+        dcount++;
+        if( time( NULL ) > last_warning + 60 ) {        // issue warning no more frequently than every 60 sec
+            rmr_vlog( RMR_VL_ERR, "rmr_mt_receive: application is not receiving fast enough; %ld msgs dropped since last warning\n", dcount );
+            last_warning = time( NULL );
+            dcount = 0;
         }
 
         return;
@@ -96,6 +100,8 @@ static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd
                 }
             }
         }
+    } else {
+        free( raw_msg );
     }
 }
 
@@ -153,7 +159,7 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
     unsigned char*  old_accum;      // old accumulator reference should we need to realloc
     int             bidx = 0;       // transport buffer index
     int             remain;         // bytes in transport buf that need to be moved
-    int*            mlen;           // pointer to spot in buffer for conversion to int
+    int*            mlen;           // pointer to spot in buffer for conversion to int
     int             need;           // bytes needed for something
     int             i;
 
@@ -161,22 +167,31 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
             return SI_RET_OK;
         }
-
-        if( fd >= ctx->nrivers || fd < 0 ) {
-            if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
-            return SI_RET_OK;
-        }
     } else {
         ctx = (uta_ctx_t *) vctx;
     }
 
-    if( buflen <= 0 ) {
+    if( buflen <= 0 || fd < 0 ) {           // no buffer or invalid fd
         return SI_RET_OK;
     }
 
-    river = &ctx->rivers[fd];
+    if( fd >= ctx->nrivers ) {
+        if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
+        if( (river = (river_t *) rmr_sym_pull( ctx->river_hash, (uint64_t) fd )) == NULL ) {
+            river = (river_t *) malloc( sizeof( *river ) );
+            memset( river, 0, sizeof( *river ) );
+            rmr_sym_map( ctx->river_hash, (uint64_t) fd, river );
+            river->state = RS_NEW;
+        }
+    } else {
+        river = &ctx->rivers[fd];           // quick index for fd values < MAX_FD
+    }
+
     if( river->state != RS_GOOD ) {             // all states which aren't good require reset first
         if( river->state == RS_NEW ) {
+            if( river->accum != NULL ) {
+                free( river->accum );
+            }
             memset( river, 0, sizeof( *river ) );
             river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024);     // start with what user said would be the "normal" max inbound msg size
             river->accum = (char *) malloc( river->nbytes );
@@ -281,17 +296,36 @@ static int mt_disc_cb( void* vctx, int fd ) {
     uta_ctx_t*  ctx;
     endpoint_t* ep;
+    river_t*    river = NULL;
 
     if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
         return SI_RET_OK;
     }
 
+    if( fd < ctx->nrivers && fd >= 0 ) {
+        river = &ctx->rivers[fd];
+    } else {
+        if( fd > 0 ) {
+            river = rmr_sym_pull( ctx->river_hash, (uint64_t) fd );
+            if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "river reset on disconnect: fd=%d\n", fd );
+        }
+    }
+
+    if( river != NULL ) {
+        river->state = RS_NEW;              // if one connects here later; ensure it's new
+        if( river->accum != NULL ) {
+            free( river->accum );
+            river->accum = NULL;
+            river->state = RS_NEW;          // force realloc if the fd is used again
+        }
+    }
+
     ep = fd2ep_del( ctx, fd );      // find ep and remove the fd from the hash
     if( ep != NULL ) {
-        pthread_mutex_lock( &ep->gate );        // wise to lock this
+        pthread_mutex_lock( &ep->gate );            // wise to lock this
         ep->open = FALSE;
         ep->nn_sock = -1;
-        pthread_mutex_unlock( &ep->gate );
+        pthread_mutex_unlock( &ep->gate );
     }
 
     return SI_RET_OK;
 }
 
@@ -326,7 +360,7 @@ static void* mt_receive( void* vctx ) {
         return NULL;
     }
 
-    if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mt_receive: registering SI95 data callback and waiting\n" );
+    rmr_vlog( RMR_VL_INFO, "mt_receive: pid=%lld registering SI95 data callback and waiting\n", (long long) pthread_self() );
 
     SIcbreg( ctx->si_ctx, SI_CB_CDATA, mt_data_cb, vctx );      // our callback called only for "cooked" (tcp) data
     SIcbreg( ctx->si_ctx, SI_CB_DISC, mt_disc_cb, vctx );       // our callback for handling disconnects
diff --git a/src/rmr/si/src/rmr_si.c b/src/rmr/si/src/rmr_si.c
index 80d7408..625a58d 100644
--- a/src/rmr/si/src/rmr_si.c
+++ b/src/rmr/si/src/rmr_si.c
@@ -187,6 +187,10 @@ extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned
     This provides an external path to the realloc static function as it's called by an
     outward facing mbuf api function. Used to reallocate a message with a different
     trace data size.
+
+    User programmes must use this with CAUTION! The mbuf passed in is NOT freed and
+    is still valid following this call. The caller is responsible for maintaining
+    a pointer to both old and new messages and invoking rmr_free_msg() on both!
 */
 extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
     return realloc_msg( msg, new_tr_size );
@@ -198,9 +202,11 @@
 */
 extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
     if( mbuf == NULL ) {
+        fprintf( stderr, ">>>FREE nil buffer\n" );
         return;
     }
 
+#ifdef KEEP
     if( mbuf->flags & MFL_HUGE ||                   // don't cache oversized messages
         ! mbuf->ring ||                             // cant cache if no ring
         ! uta_ring_insert( mbuf->ring, mbuf ) ) {   // or ring is full
@@ -213,6 +219,16 @@
         mbuf->cookie = 0;           // should signal a bad mbuf (if not reallocated)
         free( mbuf );
     }
+#else
+    // always free, never manage a pool
+    if( mbuf->tp_buf ) {
+        free( mbuf->tp_buf );
+        mbuf->tp_buf = NULL;        // just in case user tries to reuse this mbuf; this will be an NPE
+    }
+
+    mbuf->cookie = 0;               // should signal a bad mbuf (if not reallocated)
+    free( mbuf );
+#endif
 }
 
 /*
@@ -563,7 +579,7 @@ extern int rmr_set_rtimeout( void* vctx, int time ) {
     that we know about. The _user_ should ensure that the supplied length also
     includes the trace data length maximum as they are in control of that.
 */
-static void* init( char* uproto_port, int def_msg_size, int flags ) {
+static void* init( char* uproto_port, int def_msg_size, int flags ) {
     static  int announced = 0;
     uta_ctx_t*  ctx = NULL;
     char    bind_info[256];             // bind info
@@ -583,8 +599,8 @@ static void* init( char* uproto_port, int def_msg_size, int flags ) {
     if( ! announced ) {
         rmr_set_vlevel( RMR_VL_INFO );      // we WILL announce our version
-        rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95/g mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
-            RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
+        rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95 p=%s mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
+            uproto_port, RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
         announced = 1;
 
         rmr_set_vlevel( old_vlevel );       // return logging to the desired state
@@ -594,6 +610,7 @@ static void* init( char* uproto_port, int def_msg_size, int flags ) {
     errno = 0;
     if( uproto_port == NULL ) {
         proto_port = strdup( DEF_COMM_PORT );
+        rmr_vlog( RMR_VL_WARN, "user passed nil as the listen port, using default: %s\n", proto_port );
     } else {
         proto_port = strdup( uproto_port );     // so we can modify it
     }
@@ -610,8 +627,9 @@ static void* init( char* uproto_port, int def_msg_size, int flags ) {
     memset( ctx, 0, sizeof( uta_ctx_t ) );
 
     if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rmr_init: allocating 266 rivers\n" );
-    ctx->nrivers = 256;                     // number of input flows we'll manage
+    ctx->nrivers = MAX_RIVERS;              // the array allows for fast index mapping for fd values < max
     ctx->rivers = (river_t *) malloc( sizeof( river_t ) * ctx->nrivers );
+    ctx->river_hash = rmr_sym_alloc( 129 ); // connections with fd values > FD_MAX have to be hashed
     memset( ctx->rivers, 0, sizeof( river_t ) * ctx->nrivers );
     for( i = 0; i < ctx->nrivers; i++ ) {
         ctx->rivers[i].state = RS_NEW;      // force allocation of accumulator on first received packet
@@ -657,6 +675,7 @@ static void* init( char* uproto_port, int def_msg_size, int flags ) {
     } else {
         port = proto_port;          // assume something like "1234" was passed
     }
+    rmr_vlog( RMR_VL_INFO, "listen port = %s\n", port );
 
     if( (tok = getenv( ENV_RTG_PORT )) != NULL && atoi( tok ) < 0 ) {   // must check here -- if < 0 then we just start static file 'listener'
         static_rtc = 1;
diff --git a/src/rmr/si/src/si95/sisendt.c b/src/rmr/si/src/si95/sisendt.c
index 7935251..26c5f93 100644
--- a/src/rmr/si/src/si95/sisendt.c
+++ b/src/rmr/si/src/si95/sisendt.c
@@ -28,13 +28,13 @@
 *
 *  Date:    27 March 1995
 *  Author:  E. Scott Daniels
-*  Mod:     22 Feb 2002 - To better process queued data
+*  Mod:     22 Feb 2002 - To better process queued data
 *           14 Feb 2020 - To fix index bug if fd < 0.
 *
 *****************************************************************************
 */
-#include "sisetup.h"        // get setup stuff
+#include "sisetup.h"        // get setup stuff
 #include "sitransport.h"
 
 /*
@@ -48,11 +48,11 @@
 //extern int SIsendt_nq( struct ginfo_blk *gptr, int fd, char *ubuf, int ulen ) {
 extern int SIsendt( struct ginfo_blk *gptr, int fd, char *ubuf, int ulen ) {
     int status = SI_ERROR;      // assume we fail
-    fd_set writefds;            // local write fdset to check blockage
-    fd_set execpfds;            // exception fdset to check errors
-    struct tp_blk *tpptr;       // pointer at the tp_blk for the session
-    struct ioq_blk *qptr;       // pointer at i/o queue block
-    struct timeval time;        // delay time parameter for select call
+    fd_set writefds;            // local write fdset to check blockage
+    fd_set execpfds;            // exception fdset to check errors
+    struct tp_blk *tpptr;       // pointer at the tp_blk for the session
+    struct ioq_blk *qptr;       // pointer at i/o queue block
+    struct timeval time;        // delay time parameter for select call
     int sidx = 0;               // send index
 
     errno = EINVAL;
@@ -76,18 +76,18 @@ extern int SIsendt( struct ginfo_blk *gptr, int fd, char *ubuf, int ulen ) {
 
     tpptr->sent++;              // investigate: this may over count
 
-    FD_ZERO( &writefds );       // clear for select call
-    FD_SET( fd, &writefds );    // set to see if this one was writable
-    FD_ZERO( &execpfds );       // clear and set execptions fdset
+    FD_ZERO( &writefds );       // clear for select call
+    FD_SET( fd, &writefds );    // set to see if this one was writable
+    FD_ZERO( &execpfds );       // clear and set execptions fdset
     FD_SET( fd, &execpfds );
 
     time.tv_sec = 0;            // set both to 0 if we just want a poll, else we block at max this amount
     time.tv_usec = 1;           // small pause on check to help drain things
 
     if( select( fd + 1, NULL, &writefds, &execpfds, &time ) > 0 ) {     // would block if <= 0
-        if( FD_ISSET( fd, &execpfds ) ) {   // error?
+        if( FD_ISSET( fd, &execpfds ) ) {       // error?
             errno = EBADFD;
-            SIterm( gptr, tpptr );          // mark block for deletion when safe
+            SIterm( gptr, tpptr );              // mark block for deletion when safe
             return SI_ERROR;                // and bail from this sinking ship
         } else {
             errno = 0;
@@ -110,7 +110,7 @@ extern int SIsendt( struct ginfo_blk *gptr, int fd, char *ubuf, int ulen ) {
             status = SI_ERR_BLOCKED;
         }
     } else {
-        errno = EBADFD;         // fd in a bad state (probably losed)
+        errno = EBADFD;         // fd in a bad state (probably lost)
     }
 
     return status;
diff --git a/src/rmr/si/src/sr_si_static.c b/src/rmr/si/src/sr_si_static.c
index b56b6dc..bc3b53e 100644
--- a/src/rmr/si/src/sr_si_static.c
+++ b/src/rmr/si/src/sr_si_static.c
@@ -673,11 +673,12 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int r
             rmr_free_msg( msg );        // not wanting a meessage back, trash this one
             return NULL;
         }
-    } else {                            // send failed -- return original message
-        if( msg->state == 98 ) {        // FIX ME: this is just broken, but needs SI changes to work correctly for us
+    } else {                            // send failed or would block -- return original message
+        if( state == SI_ERR_BLOCKED || errno == EAGAIN ) {
             errno = EAGAIN;
-            msg->state = RMR_ERR_RETRY; // errno will have nano reason
+            msg->state = RMR_ERR_RETRY;
         } else {
+            rmr_vlog( RMR_VL_WARN, "send failed: mt=%d errno=%d %s\n", msg->mtype, errno, strerror( errno ) );
             msg->state = RMR_ERR_SENDFAILED;
         }
-- 
2.16.6
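
Usage note on the rmr_realloc_msg() caution added above: the original mbuf is not
freed by the call, so a caller that resizes trace space must keep both pointers and
free both. The sketch below is illustrative only and is not part of the patch; the
helper name grow_trace_space is hypothetical, and only the public calls shown in
this diff (rmr_realloc_msg and rmr_free_msg, declared in rmr/rmr.h) are assumed.

    #include <rmr/rmr.h>

    /*
        Resize the trace area of old_msg to new_tr_size. rmr_realloc_msg() returns a
        new mbuf and leaves the original untouched, so the caller must release the
        original explicitly once it is no longer needed.
    */
    static rmr_mbuf_t* grow_trace_space( rmr_mbuf_t* old_msg, int new_tr_size ) {
        rmr_mbuf_t* new_msg;

        new_msg = rmr_realloc_msg( old_msg, new_tr_size );  // old_msg is NOT freed by this call
        if( new_msg == NULL ) {
            return old_msg;             // realloc failed; the original mbuf is still valid
        }

        rmr_free_msg( old_msg );        // caller owns both; free the original to avoid a leak
        return new_msg;
    }

Forgetting the explicit rmr_free_msg() on the original leaks its transport buffer,
which is the same class of resource exhaustion this patch is working to avoid.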