Fix crash under SI95 with multiple receive threads 48/2548/2 3.2.2
authorE. Scott Daniels <daniels@research.att.com>
Thu, 20 Feb 2020 19:43:35 +0000 (14:43 -0500)
committerE. Scott Daniels <daniels@research.att.com>
Thu, 20 Feb 2020 20:05:56 +0000 (15:05 -0500)
When the user application was using multiple threads to receive
messages it was alsmost certain to crash with a segfault.
This change corrects a bug in the ring handler which was
allowing a premature release of the ring lock.

A verification cookie has been added to the msg buffer to
facilitate validation of a message buffer when examining
crash output.

Issue-ID: RIC-218

Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Change-Id: Iaa6519e415dcf7f8c52312e8e02d8cf4dca5866f

CHANGES
CMakeLists.txt
src/rmr/common/include/rmr.h
src/rmr/common/src/ring_static.c
src/rmr/si/src/mt_call_si_static.c
src/rmr/si/src/rmr_si.c
src/rmr/si/src/sr_si_static.c

diff --git a/CHANGES b/CHANGES
index e01ab41..2854cd5 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -2,6 +2,9 @@
 API and build change  and fix summaries. Doc correctsions
 and/or changes are not mentioned here; see the commit messages.
 
+2020 February 20; version 3.2.2
+       Fix receive thread related core dump (ring early unlock).
+
 2020 February 19; version 3.2.1
        Added missing message types (E2-Setup)
 
index b817699..41eee9f 100644 (file)
@@ -38,7 +38,7 @@ cmake_minimum_required( VERSION 3.5 )
 
 set( major_version "3" )               # should be automatically populated from git tag later, but until CI process sets a tag we use this
 set( minor_version "2" )
-set( patch_level "1" )
+set( patch_level "2" )
 
 set( install_root "${CMAKE_INSTALL_PREFIX}" )
 set( install_inc "include/rmr" )
index 88689d5..5d4583a 100644 (file)
@@ -106,6 +106,8 @@ typedef struct {
 
        void*   ring;                           // ring this buffer should be queued back to
        int             rts_fd;                         // SI fd for return to sender
+
+       int             cookie;                         // cookie to detect user misuse of free'd msg
 } rmr_mbuf_t;
 
 
index 37bb4db..b54d815 100644 (file)
@@ -166,6 +166,7 @@ static inline void* uta_ring_extract( void* vr ) {
        ring_t*         r;
        uint16_t        ti;             // real index in data
        int64_t ctr;            // pfd counter
+       void*           data;
 
        if( !RING_FAST ) {                                                              // compiler should drop the conditional when always false
                if( (r = (ring_t*) vr) == NULL ) {
@@ -199,10 +200,14 @@ future -- investigate if it's possible only to set/clear when empty or going to
        }
 */
 
+       data = r->data[ti];                                             // secure data and clear before letting go of the lock
+       r->data[ti] = NULL;
+
        if( r->rgate != NULL ) {                                                        // if locked above...
                pthread_mutex_unlock( r->rgate );
        }
-       return r->data[ti];
+
+       return data;
 }
 
 /*
@@ -232,6 +237,12 @@ static inline int uta_ring_insert( void* vr, void* new_data ) {
                return 0;
        }
 
+       r->data[r->head] = new_data;
+       r->head++;
+       if( r->head >= r->nelements ) {
+               r->head = 0;
+       }
+
        write( r->pfd, &inc, sizeof( inc ) );
 /*
 future -- investigate if it's possible only to set/clear when empty or going to empty
@@ -239,12 +250,6 @@ future -- investigate if it's possible only to set/clear when empty or going to
        }
 */
 
-       r->data[r->head] = new_data;
-       r->head++;
-       if( r->head >= r->nelements ) {
-               r->head = 0;
-       }
-
        if( r->wgate != NULL ) {                                                // if lock exists we must unlock before going
                pthread_mutex_unlock( r->wgate );
        }
index d6279fd..862b554 100644 (file)
@@ -134,7 +134,7 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
                if( river->state == RS_NEW ) {
                        memset( river, 0, sizeof( *river ) );
                        //river->nbytes = sizeof( char ) * (8 * 1024);
-                       river->nbytes = sizeof( char ) * ctx->max_ibm;                  // max inbound message size
+                       river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024);         // max inbound message size
                        river->accum = (char *) malloc( river->nbytes );
                        river->ipt = 0;
                } else {
@@ -194,7 +194,7 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
                        if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback enough in the buffer size=%d need=%d remain=%d\n", river->msg_size, need, remain );
                        if( (river->flags & RF_DROP) == 0  ) {
                                memcpy( &river->accum[river->ipt], buf+bidx, need );                            // grab just what is needed (might be more)
-                               buf2mbuf( ctx, river->accum, river->msg_size, fd );                                     // build an RMR mbuf and queue
+                               buf2mbuf( ctx, river->accum, river->nbytes, fd );                                       // build an RMR mbuf and queue
                                river->accum = (char *) malloc( sizeof( char ) *  river->nbytes );      // fresh accumulator
                        } else {
                                if( !(river->flags & RF_NOTIFIED) ) {   
index 7fa066b..0a4f806 100644 (file)
@@ -185,7 +185,10 @@ extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
        if( !mbuf->ring || ! uta_ring_insert( mbuf->ring, mbuf ) ) {                    // just queue, free if ring is full
                if( mbuf->tp_buf ) {
                        free( mbuf->tp_buf );
+                       mbuf->tp_buf = NULL;            // just in case user tries to reuse this mbuf; this will be an NPE
                }
+
+               mbuf->cookie = 0;                       // should signal a bad mbuf (if not reallocated)
                free( mbuf );
        }
 }
@@ -549,7 +552,7 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        rmr_set_vlevel( RMR_VL_INFO );          // we WILL announce our version etc
 
        if( ! announced ) {
-               rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95/e mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
+               rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95/f mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
                        RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
                announced = 1;
        }
@@ -579,7 +582,7 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        ctx->send_retries = 1;                                                  // default is not to sleep at all; RMr will retry about 10K times before returning
        ctx->d1_len = 4;                                                                // data1 space in header -- 4 bytes for now
        ctx->max_ibm = max_msg_size < 1024 ? 1024 : max_msg_size;                                       // larger than their request doesn't hurt
-       ctx->max_ibm += sizeof( uta_mhdr_t ) + ctx->d1_len + ctx->d2_len + 64;          // add in our header size and a bit of fudge
+       ctx->max_ibm += sizeof( uta_mhdr_t ) + ctx->d1_len + ctx->d2_len + TP_HDR_LEN + 64;             // add in header size, transport hdr, and a bit of fudge
 
        ctx->mring = uta_mk_ring( 4096 );                               // message ring is always on for si
        ctx->zcb_mring = uta_mk_ring( 128 );                    // zero copy buffer mbuf ring to reduce malloc/free calls
index 4dd32ea..0b726dc 100644 (file)
@@ -143,7 +143,7 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s
 
 /*
        memset( msg->tp_buf, 0, mlen );    // NOT for production (debug only)   valgrind will complain about uninitalised use if we don't set
-       memcpy( msg->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!**", 34 );                // NOT for production -- debugging eyecatcher
+       memcpy( msg->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!**", TPHDR_LEN );         // NOT for production -- debugging eyecatcher
 */
        alen = (int *) msg->tp_buf;
        *alen = mlen;                                           // FIX ME: need a stuct to go in these first bytes, not just dummy len
@@ -159,6 +159,7 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s
                //SET_HDR_D2_LEN( hdr, ctx->d2_len );                           // future
        }
        msg->len = 0;                                                                                   // length of data in the payload
+       msg->cookie = 0x4942;
        msg->alloc_len = mlen;                                                                  // length of allocated transport buffer (caller size + rmr header)
        msg->sub_id = UNSET_SUBID;
        msg->mtype = UNSET_MSGTYPE;
@@ -197,6 +198,7 @@ static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state ) {
 
        memset( msg, 0, sizeof( *msg ) );
 
+       msg->cookie = 0x4942;
        msg->sub_id = UNSET_SUBID;
        msg->mtype = UNSET_MSGTYPE;
        msg->tp_buf = NULL;
@@ -588,6 +590,9 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int r
        msg->state = RMR_OK;
        do {
                tot_len = msg->len + PAYLOAD_OFFSET( hdr ) + TP_HDR_LEN;                        // we only send what was used + header lengths
+               if( tot_len > msg->alloc_len ) {
+                       tot_len = msg->alloc_len;                                                                       // likely bad length from user :(
+               }
                *((int*) msg->tp_buf) = tot_len;
 
                if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "send_msg: ending %d (%x) bytes  usr_len=%d alloc=%d retries=%d\n", tot_len, tot_len, msg->len, msg->alloc_len, retries );