From 6d112571b27574ae857da7cb8dc8758ffee4ff60 Mon Sep 17 00:00:00 2001
From: "E. Scott Daniels"
Date: Thu, 20 Feb 2020 14:43:35 -0500
Subject: [PATCH] Fix crash under SI95 with multiple receive threads

When the user application was using multiple threads to receive
messages, it was almost certain to crash with a segfault. This change
corrects a bug in the ring handler which allowed a premature release
of the ring lock.

A verification cookie has been added to the msg buffer to facilitate
validation of a message buffer when examining crash output.

Issue-ID: RIC-218
Signed-off-by: E. Scott Daniels
Change-Id: Iaa6519e415dcf7f8c52312e8e02d8cf4dca5866f
---
 CHANGES                            |  3 +++
 CMakeLists.txt                     |  2 +-
 src/rmr/common/include/rmr.h       |  2 ++
 src/rmr/common/src/ring_static.c   | 19 ++++++++++++-------
 src/rmr/si/src/mt_call_si_static.c |  4 ++--
 src/rmr/si/src/rmr_si.c            |  7 +++++--
 src/rmr/si/src/sr_si_static.c      |  7 ++++++-
 7 files changed, 31 insertions(+), 13 deletions(-)

diff --git a/CHANGES b/CHANGES
index e01ab41..2854cd5 100644
--- a/CHANGES
+++ b/CHANGES
@@ -2,6 +2,9 @@
 API and build change and fix summaries. Doc correctsions and/or changes are not mentioned here; see the commit messages.
 
+2020 February 20; version 3.2.2
+	Fix receive thread related core dump (ring early unlock).
+
 2020 February 19; version 3.2.1
 	Added missing message types (E2-Setup)
 
diff --git a/CMakeLists.txt b/CMakeLists.txt
index b817699..41eee9f 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -38,7 +38,7 @@ cmake_minimum_required( VERSION 3.5 )
 
 set( major_version "3" )		# should be automatically populated from git tag later, but until CI process sets a tag we use this
 set( minor_version "2" )
-set( patch_level "1" )
+set( patch_level "2" )
 
 set( install_root "${CMAKE_INSTALL_PREFIX}" )
 set( install_inc "include/rmr" )
diff --git a/src/rmr/common/include/rmr.h b/src/rmr/common/include/rmr.h
index 88689d5..5d4583a 100644
--- a/src/rmr/common/include/rmr.h
+++ b/src/rmr/common/include/rmr.h
@@ -106,6 +106,8 @@ typedef struct {
 
 	void*	ring;				// ring this buffer should be queued back to
 	int		rts_fd;				// SI fd for return to sender
+
+	int		cookie;				// cookie to detect user misuse of free'd msg
 } rmr_mbuf_t;
 
diff --git a/src/rmr/common/src/ring_static.c b/src/rmr/common/src/ring_static.c
index 37bb4db..b54d815 100644
--- a/src/rmr/common/src/ring_static.c
+++ b/src/rmr/common/src/ring_static.c
@@ -166,6 +166,7 @@ static inline void* uta_ring_extract( void* vr ) {
 	ring_t*		r;
 	uint16_t	ti;			// real index in data
 	int64_t		ctr;		// pfd counter
+	void*		data;
 
 	if( !RING_FAST ) {							// compiler should drop the conditional when always false
 		if( (r = (ring_t*) vr) == NULL ) {
@@ -199,10 +200,14 @@ future -- investigate if it's possible only to set/clear when empty or going to
 	}
 */
 
+	data = r->data[ti];				// secure data and clear before letting go of the lock
+	r->data[ti] = NULL;
+
 	if( r->rgate != NULL ) {		// if locked above...
 		pthread_mutex_unlock( r->rgate );
 	}
-	return r->data[ti];
+
+	return data;
 }
 
 /*
@@ -232,6 +237,12 @@ static inline int uta_ring_insert( void* vr, void* new_data ) {
 		return 0;
 	}
 
+	r->data[r->head] = new_data;
+	r->head++;
+	if( r->head >= r->nelements ) {
+		r->head = 0;
+	}
+
 	write( r->pfd, &inc, sizeof( inc ) );
 /*
 future -- investigate if it's possible only to set/clear when empty or going to empty
@@ -239,12 +250,6 @@ future -- investigate if it's possible only to set/clear when empty or going to
 	}
 */
 
-	r->data[r->head] = new_data;
-	r->head++;
-	if( r->head >= r->nelements ) {
-		r->head = 0;
-	}
-
 	if( r->wgate != NULL ) {		// if lock exists we must unlock before going
 		pthread_mutex_unlock( r->wgate );
 	}
diff --git a/src/rmr/si/src/mt_call_si_static.c b/src/rmr/si/src/mt_call_si_static.c
index d6279fd..862b554 100644
--- a/src/rmr/si/src/mt_call_si_static.c
+++ b/src/rmr/si/src/mt_call_si_static.c
@@ -134,7 +134,7 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
 		if( river->state == RS_NEW ) {
 			memset( river, 0, sizeof( *river ) );
 			//river->nbytes = sizeof( char ) * (8 * 1024);
-			river->nbytes = sizeof( char ) * ctx->max_ibm;				// max inbound message size
+			river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024);		// max inbound message size
 			river->accum = (char *) malloc( river->nbytes );
 			river->ipt = 0;
 		} else {
@@ -194,7 +194,7 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
 				if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback enough in the buffer size=%d need=%d remain=%d\n", river->msg_size, need, remain );
 				if( (river->flags & RF_DROP) == 0 ) {
 					memcpy( &river->accum[river->ipt], buf+bidx, need );				// grab just what is needed (might be more)
-					buf2mbuf( ctx, river->accum, river->msg_size, fd );					// build an RMR mbuf and queue
+					buf2mbuf( ctx, river->accum, river->nbytes, fd );					// build an RMR mbuf and queue
 					river->accum = (char *) malloc( sizeof( char ) * river->nbytes );	// fresh accumulator
 				} else {
 					if( !(river->flags & RF_NOTIFIED) ) {
diff --git a/src/rmr/si/src/rmr_si.c b/src/rmr/si/src/rmr_si.c
index 7fa066b..0a4f806 100644
--- a/src/rmr/si/src/rmr_si.c
+++ b/src/rmr/si/src/rmr_si.c
@@ -185,7 +185,10 @@ extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
 	if( !mbuf->ring || ! uta_ring_insert( mbuf->ring, mbuf ) ) {		// just queue, free if ring is full
 		if( mbuf->tp_buf ) {
 			free( mbuf->tp_buf );
+			mbuf->tp_buf = NULL;		// just in case user tries to reuse this mbuf; this will be an NPE
 		}
+
+		mbuf->cookie = 0;				// should signal a bad mbuf (if not reallocated)
 		free( mbuf );
 	}
 }
@@ -549,7 +552,7 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) {
 	rmr_set_vlevel( RMR_VL_INFO );		// we WILL announce our version etc
 
 	if( ! announced ) {
-		rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95/e mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
+		rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95/f mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
 			RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
 		announced = 1;
 	}
@@ -579,7 +582,7 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) {
 	ctx->send_retries = 1;			// default is not to sleep at all; RMr will retry about 10K times before returning
 	ctx->d1_len = 4;				// data1 space in header -- 4 bytes for now
 	ctx->max_ibm = max_msg_size < 1024 ? 1024 : max_msg_size;		// larger than their request doesn't hurt
-	ctx->max_ibm += sizeof( uta_mhdr_t ) + ctx->d1_len + ctx->d2_len + 64;				// add in our header size and a bit of fudge
+	ctx->max_ibm += sizeof( uta_mhdr_t ) + ctx->d1_len + ctx->d2_len + TP_HDR_LEN + 64;	// add in header size, transport hdr, and a bit of fudge
 
 	ctx->mring = uta_mk_ring( 4096 );			// message ring is always on for si
 	ctx->zcb_mring = uta_mk_ring( 128 );		// zero copy buffer mbuf ring to reduce malloc/free calls
diff --git a/src/rmr/si/src/sr_si_static.c b/src/rmr/si/src/sr_si_static.c
index 4dd32ea..0b726dc 100644
--- a/src/rmr/si/src/sr_si_static.c
+++ b/src/rmr/si/src/sr_si_static.c
@@ -143,7 +143,7 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s
 /*
 	memset( msg->tp_buf, 0, mlen );    // NOT for production (debug only) valgrind will complain about uninitalised use if we don't set
-	memcpy( msg->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!**", 34 );			// NOT for production -- debugging eyecatcher
+	memcpy( msg->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!**", TPHDR_LEN );		// NOT for production -- debugging eyecatcher
 */
 	alen = (int *) msg->tp_buf;
 	*alen = mlen;						// FIX ME: need a stuct to go in these first bytes, not just dummy len
@@ -159,6 +159,7 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s
 		//SET_HDR_D2_LEN( hdr, ctx->d2_len );		// future
 	}
 	msg->len = 0;							// length of data in the payload
+	msg->cookie = 0x4942;
 	msg->alloc_len = mlen;					// length of allocated transport buffer (caller size + rmr header)
 	msg->sub_id = UNSET_SUBID;
 	msg->mtype = UNSET_MSGTYPE;
@@ -197,6 +198,7 @@ static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state ) {
 
 	memset( msg, 0, sizeof( *msg ) );
 
+	msg->cookie = 0x4942;
 	msg->sub_id = UNSET_SUBID;
 	msg->mtype = UNSET_MSGTYPE;
 	msg->tp_buf = NULL;
@@ -588,6 +590,9 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int r
 	msg->state = RMR_OK;
 	do {
 		tot_len = msg->len + PAYLOAD_OFFSET( hdr ) + TP_HDR_LEN;		// we only send what was used + header lengths
+		if( tot_len > msg->alloc_len ) {
+			tot_len = msg->alloc_len;			// likely bad length from user :(
+		}
 		*((int*) msg->tp_buf) = tot_len;
 
 		if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "send_msg: ending %d (%x) bytes usr_len=%d alloc=%d retries=%d\n", tot_len, tot_len, msg->len, msg->alloc_len, retries );
-- 
2.16.6
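
The core of the ring fix is the ordering inside uta_ring_extract(): the slot's pointer is copied out and the slot cleared while the read gate is still held, and only the saved copy is returned after the gate is released. Below is a minimal sketch of that pattern on a simplified ring guarded by one pthread mutex; demo_ring_t and demo_ring_extract are hypothetical names used only for illustration, and the eventfd counter and RING_FAST path of the real ring are omitted.

#include <pthread.h>
#include <stddef.h>

typedef struct {					// simplified stand-in for the RMR ring (illustrative only)
	void**			data;			// slot array
	int				nelements;		// number of slots
	int				tail;			// next slot to extract from
	pthread_mutex_t	rgate;			// read gate shared by all receive threads
} demo_ring_t;

static void* demo_ring_extract( demo_ring_t* r ) {
	void*	data;

	pthread_mutex_lock( &r->rgate );

	data = r->data[r->tail];			// secure the pointer before letting go of the lock
	r->data[r->tail] = NULL;			// clear the slot so it cannot be handed out twice
	if( data != NULL ) {
		r->tail = (r->tail + 1) % r->nelements;
	}

	pthread_mutex_unlock( &r->rgate );	// nothing below touches the ring after the unlock
	return data;						// NULL when the ring was empty
}

With the old ordering (return r->data[ti] after the unlock) another thread could change the slot between the unlock and the read, so the caller could be handed a stale or already recycled buffer. The insert side of the patch applies the mirror-image rule: the slot is filled and the head advanced before the eventfd counter is written, so a reader woken by the counter never sees a slot that has not yet been filled.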