Correct potential locking issue in msg allocation 99/5399/1
author E. Scott Daniels <daniels@research.att.com>
Fri, 8 Jan 2021 18:57:01 +0000 (13:57 -0500)
committer E. Scott Daniels <daniels@research.att.com>
Fri, 8 Jan 2021 18:57:01 +0000 (13:57 -0500)
The message allocation when pulling a buffer from the free pool could
potentially fail to own the lock. This was causing core dumps for
multi-threaded processes (e.g. those using the Go wrapper).

Version bump to 4.5.0 to move away from the previous release of
4.4.*.

Issue-ID: RIC-732

Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Change-Id: I9abc7f64391d0292c5ba1323713040faedb569a1

CHANGES_CORE.txt
CMakeLists.txt
docs/rel-notes.rst
src/rmr/common/include/rmr_agnostic.h
src/rmr/common/src/logging.c
src/rmr/common/src/ring_static.c
src/rmr/si/src/mt_call_si_static.c
src/rmr/si/src/rmr_si.c

index d5fafcd..5b998d9 100644 (file)
@@ -2,9 +2,13 @@
 # core RMR code and doc.  Other change files exist for other
 # things.
 
-# API and build change  and fix summaries. Doc correctsions
+# API and build change  and fix summaries. Doc corrections
 # and/or changes are not mentioned here; see the commit messages.
 
+2021 January 8; Version 4.5.0
+       Version bump for next release tracking.
+       Corrected a potential locking issue in message allocation. (RIC-732)
+
 2020 December 4; Version 4.4.6
        Correct a range check bug when cloning a route table in prep
        to load a new one. (RIC-720)
index 8febe4c..7f149d0 100644 (file)
@@ -40,8 +40,8 @@ project( rmr LANGUAGES C )
 cmake_minimum_required( VERSION 3.5 )
 
 set( major_version "4" )               # should be automatically populated from git tag later, but until CI process sets a tag we use this
-set( minor_version "4" )
-set( patch_level "6" )
+set( minor_version "5" )
+set( patch_level "0" )
 
 set( install_root "${CMAKE_INSTALL_PREFIX}" )
 set( install_inc "include/rmr" )
index 508708f..3b2e46f 100644 (file)
@@ -22,6 +22,14 @@ the need to leap frog versions ceased, and beginning with
 version 4.0.0, the RMR versions should no longer skip.
 
 
+2021 January 8; Version 4.5.0
+-----------------------------
+
+Version bump for next release tracking. Corrected a potential
+locking issue in message allocation. (RIC-732)
+
+
+
 Cherry Release
 ==============
 
index ab718d3..31e2c4d 100644 (file)
@@ -258,6 +258,10 @@ typedef struct {
 #define RING_NONE      0                       // no options
 #define RING_RLOCK     0x01            // create/destroy the read lock on the ring
 #define RING_WLOCK     0x02            // create/destroy the write lockk on the ring
+#define RING_FRLOCK    0x04            // read locking with no wait if locked option
+
+                                                               // flag values
+#define RING_FL_FLOCK  0x01    // fast read lock (don't wait if locked when reading)
 
 typedef struct ring {
        uint16_t head;                          // index of the head of the ring (insert point)
@@ -265,6 +269,7 @@ typedef struct ring {
        uint16_t nelements;                     // number of elements in the ring
        void**  data;                           // the ring data (pointers to blobs of stuff)
        int             pfd;                            // event fd for the ring for epoll
+       int             flags;                          // RING_FL_* constants
        pthread_mutex_t*        rgate;  // read lock if used
        pthread_mutex_t*        wgate;  // write lock if used
 } ring_t;
index 0407eed..56457f7 100644 (file)
@@ -101,8 +101,8 @@ extern int rmr_vlog_init( ) {
 
        if( (data = getenv( ENV_LOG_VLEVEL )) != NULL ) {
                log_vlevel = atoi( data );
-               if( log_vlevel < 0 ) {
-                       log_vlevel = 0;
+               if( log_vlevel < -1 ) {                                                 // allow housekeeping stats to be squelched with -1
+                       log_vlevel = -1;
                } else {
                        if( log_vlevel > RMR_VL_DEBUG ) {
                                log_vlevel = RMR_VL_DEBUG;
index 8e565fa..7170a9d 100644 (file)
@@ -75,6 +75,7 @@ static void* uta_mk_ring( int size ) {
                return NULL;
        }
 
+       r->flags = 0;
        r->rgate = NULL;
        r->wgate = NULL;
        r->head = r->tail = 0;
@@ -124,7 +125,7 @@ static int uta_ring_config( void* vr, int options ) {
                }
        }
 
-       if( options & RING_RLOCK ) {
+       if( options & (RING_RLOCK | RING_FRLOCK) ) {                            // read locking
                if( r->rgate == NULL ) {                // don't realloc
                        r->rgate = (pthread_mutex_t *) malloc( sizeof( *r->rgate ) );
                        if( r->rgate == NULL ) {
@@ -133,6 +134,9 @@ static int uta_ring_config( void* vr, int options ) {
        
                        pthread_mutex_init( r->rgate, NULL );
                }
+               if( options & RING_FRLOCK ) {
+                       r->flags |= RING_FL_FLOCK;
+               }
        }
 
        return 1;
@@ -188,8 +192,16 @@ static inline void* uta_ring_extract( void* vr ) {
                return NULL;
        }
 
-       if( r->rgate != NULL ) {                                                // if lock exists we must honour it
-               pthread_mutex_lock( r->rgate );
+       if( r->rgate != NULL ) {                                                        // if lock exists we must honour it
+               if( r->flags & RING_FL_FLOCK ) {                                // fast read locking: try once and return nil if we can't lock
+                       if( pthread_mutex_trylock( r->rgate ) != 0 ) {  // quick fail if not able to get a lock
+                               return NULL;
+                       }
+               } else {
+                       if( pthread_mutex_lock( r->rgate ) != 0 ) {
+                               return NULL;
+                       }
+               }
                if( r->tail == r->head ) {                                      // ensure ring didn't go empty while waiting
                        pthread_mutex_unlock( r->rgate );
                        return NULL;
@@ -219,9 +231,13 @@ future -- investigate if it's possible only to set/clear when empty or going to
        return data;
 }
 
+
 /*
        Insert the pointer at the next open space in the ring.
-       Returns 1 if the inert was ok, and 0 if the ring is full.
+       Returns 1 if the insert was ok, and 0 if there is an error;
+       errno will be set to EXFULL if the ring is full; any other
+       errno value indicates the inability to obtain a lock on
+       the ring.
 */
 static inline int uta_ring_insert( void* vr, void* new_data ) {
        ring_t*         r;
@@ -236,13 +252,16 @@ static inline int uta_ring_insert( void* vr, void* new_data ) {
        }
 
        if( r->wgate != NULL ) {                                                // if lock exists we must honour it
-               pthread_mutex_lock( r->wgate );
+               if( pthread_mutex_lock( r->wgate ) != 0 ) {
+                       return 0;                                                               // leave mutex reason in place
+               }
        }
 
        if( r->head+1 == r->tail || (r->head+1 >= r->nelements && !r->tail) ) {         // ring is full
-               if( r->wgate != NULL ) {                                        // ensure released if needed
+               if( r->wgate != NULL ) {                                                                                                // ensure released if needed
                        pthread_mutex_unlock( r->wgate );
                }
+               errno = EXFULL;
                return 0;
        }
 
index 76b56b2..1b31bf4 100644 (file)
@@ -178,7 +178,7 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
        if( river->state != RS_GOOD ) {                         // all states which aren't good require reset first
                if( river->state == RS_NEW ) {
                        memset( river, 0, sizeof( *river ) );
-                       river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024);         // max inbound message size
+                       river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024);         // start with what user said would be the "normal" max inbound msg size
                        river->accum = (char *) malloc( river->nbytes );
                        river->ipt = 0;
                } else {
@@ -192,10 +192,10 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
        while( remain > 0 ) {                                                           // until we've done something with all bytes passed in
                if( DEBUG )  rmr_vlog( RMR_VL_DEBUG, "====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain );
 
-               if( river->msg_size <= 0 ) {                            // don't have a size yet
+               if( river->msg_size <= 0 ) {                            // don't have a message length  yet
                                                                                                        // FIX ME: we need a frame indicator to ensure alignment
-                       need = TP_SZFIELD_LEN - river->ipt;             // what we need to compute length
-                       if( need > remain ) {                                                                           // the whole size isn't there
+                       need = TP_SZFIELD_LEN - river->ipt;             // what we need to compute the total message length
+                       if( need > remain ) {                                                                           // the whole message len  information isn't in this transport buf
                                if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "need more for size than we have: need=%d rmain=%d ipt=%d\n", need, remain, river->ipt );
                                memcpy( &river->accum[river->ipt], buf+bidx, remain );                  // grab what we can and depart
                                river->ipt += remain;
@@ -224,7 +224,7 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
                        if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback setting msg size: %d\n", river->msg_size );
 
                        if( river->msg_size > river->nbytes ) {                                         // message bigger than app max size; grab huge buffer
-                               //river->flags |= RF_DROP;
+                               //river->flags |= RF_DROP;                                                              //  uncomment to drop large messages
                                if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "received message is huge (%d) reallocating buffer\n", river->msg_size );
                                old_accum = river->accum;                                       // need to copy any bytes we snarfed getting the size, so hold
                                river->nbytes = river->msg_size + 128;                                  // buffer large enough with a bit of fudge room
@@ -237,23 +237,23 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
                        }
                }
 
-               if( river->msg_size > (river->ipt + remain) ) {                                 // need more than is left in buffer
+               if( river->msg_size > (river->ipt + remain) ) {                                 // need more than is left in receive buffer
                        if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "data callback not enough in the buffer size=%d remain=%d\n", river->msg_size, remain );
-                       if( (river->flags & RF_DROP) == 0  ) {
-                               memcpy( &river->accum[river->ipt], buf+bidx, remain );          // buffer and go wait for more
+                       if( (river->flags & RF_DROP) == 0  ) {                                                  // ok to keep this message; copy bytes
+                               memcpy( &river->accum[river->ipt], buf+bidx, remain );          // grab what is in the rcv buffer and go wait for more
                        }
                        river->ipt += remain;
                        remain = 0;
                } else {
                        need = river->msg_size - river->ipt;                                            // bytes from transport we need to have complete message
                        if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback enough in the buffer size=%d need=%d remain=%d flgs=%02x\n", river->msg_size, need, remain, river->flags );
-                       if( (river->flags & RF_DROP) == 0  ) {
+                       if( (river->flags & RF_DROP) == 0  ) {                                                                  // keeping this message, copy and pass it on
                                memcpy( &river->accum[river->ipt], buf+bidx, need );                            // grab just what is needed (might be more)
                                buf2mbuf( ctx, river->accum, river->nbytes, fd );                                       // build an RMR mbuf and queue
                                river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024);                         // prevent huge size from persisting
                                river->accum = (char *) malloc( sizeof( char ) *  river->nbytes );      // fresh accumulator
                        } else {
-                               if( !(river->flags & RF_NOTIFIED) ) {
+                               if( !(river->flags & RF_NOTIFIED) ) {                                                           // not keeping huge messages; notify once per stream
                                        rmr_vlog( RMR_VL_WARN, "message larger than allocated buffer (%d) arrived on fd %d\n", river->nbytes, fd );
                                        river->flags |= RF_NOTIFIED;
                                }
index 5aaa778..80d7408 100644 (file)
@@ -628,6 +628,7 @@ static void* init(  char* uproto_port, int def_msg_size, int flags ) {
        if( ! (flags & RMRFL_NOLOCK) ) {                                // user did not specifically ask that it be off; turn it on
                uta_ring_config( ctx->mring, RING_RLOCK );                      // concurrent rcv calls require read lock
                uta_ring_config( ctx->zcb_mring, RING_WLOCK );          // concurrent free calls from userland require write lock
+               uta_ring_config( ctx->zcb_mring, RING_FRLOCK );         // concurrent message allocation calls from userland require read lock, but can be fast
        } else {
                rmr_vlog( RMR_VL_INFO, "receive ring locking disabled by user application\n" );
        }