From ce1c741c01e8387cb095dac5e36a4d8ad91d006d Mon Sep 17 00:00:00 2001 From: "E. Scott Daniels" Date: Fri, 8 Jan 2021 13:57:01 -0500 Subject: [PATCH] Correct potential locking issue in msg allocation The message allocation when pulling a buffer from the free pool could potentially fail to own the lock. This was causing core dumps for multi-threaded processes (e.g. those using the Go wrapper). Version bump to 4.5.0 to move away from the previous release of 4.4.*. Issue-ID: RIC-732 Signed-off-by: E. Scott Daniels Change-Id: I9abc7f64391d0292c5ba1323713040faedb569a1 --- CHANGES_CORE.txt | 6 +++++- CMakeLists.txt | 4 ++-- docs/rel-notes.rst | 8 ++++++++ src/rmr/common/include/rmr_agnostic.h | 5 +++++ src/rmr/common/src/logging.c | 4 ++-- src/rmr/common/src/ring_static.c | 31 +++++++++++++++++++++++++------ src/rmr/si/src/mt_call_si_static.c | 20 ++++++++++---------- src/rmr/si/src/rmr_si.c | 1 + 8 files changed, 58 insertions(+), 21 deletions(-) diff --git a/CHANGES_CORE.txt b/CHANGES_CORE.txt index d5fafcd..5b998d9 100644 --- a/CHANGES_CORE.txt +++ b/CHANGES_CORE.txt @@ -2,9 +2,13 @@ # core RMR code and doc. Other change files exist for other # things. -# API and build change and fix summaries. Doc correctsions +# API and build change and fix summaries. Doc corrections # and/or changes are not mentioned here; see the commit messages. +2021 January 8; Version 4.5.0 + Version bump for next release tracking. + Corrected a potential locking issue in message allocation. (RIC-732) + 2020 December 4; Version 4.4.6 Correct a range check bug when cloning a route table in prep to load a new one. 
(RIC-720) diff --git a/CMakeLists.txt b/CMakeLists.txt index 8febe4c..7f149d0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -40,8 +40,8 @@ project( rmr LANGUAGES C ) cmake_minimum_required( VERSION 3.5 ) set( major_version "4" ) # should be automatically populated from git tag later, but until CI process sets a tag we use this -set( minor_version "4" ) -set( patch_level "6" ) +set( minor_version "5" ) +set( patch_level "0" ) set( install_root "${CMAKE_INSTALL_PREFIX}" ) set( install_inc "include/rmr" ) diff --git a/docs/rel-notes.rst b/docs/rel-notes.rst index 508708f..3b2e46f 100644 --- a/docs/rel-notes.rst +++ b/docs/rel-notes.rst @@ -22,6 +22,14 @@ the need to leap frog versions ceased, and beginning with version 4.0.0, the RMR versions should no longer skip. +2021 January 8; Version 4.5.0 +----------------------------- + +Version bump for next release tracking. Corrected a potential +locking issue in message allocation. (RIC-732) + + + Cherry Release ============== diff --git a/src/rmr/common/include/rmr_agnostic.h b/src/rmr/common/include/rmr_agnostic.h index ab718d3..31e2c4d 100644 --- a/src/rmr/common/include/rmr_agnostic.h +++ b/src/rmr/common/include/rmr_agnostic.h @@ -258,6 +258,10 @@ typedef struct { #define RING_NONE 0 // no options #define RING_RLOCK 0x01 // create/destroy the read lock on the ring #define RING_WLOCK 0x02 // create/destroy the write lockk on the ring +#define RING_FRLOCK 0x04 // read locking with no wait if locked option + + // flag values +#define RING_FL_FLOCK 0x01 // fast read lock (don't wait if locked when reading) typedef struct ring { uint16_t head; // index of the head of the ring (insert point) @@ -265,6 +269,7 @@ typedef struct ring { uint16_t nelements; // number of elements in the ring void** data; // the ring data (pointers to blobs of stuff) int pfd; // event fd for the ring for epoll + int flags; // RING_FL_* constants pthread_mutex_t* rgate; // read lock if used pthread_mutex_t* wgate; // write lock if used } 
ring_t; diff --git a/src/rmr/common/src/logging.c b/src/rmr/common/src/logging.c index 0407eed..56457f7 100644 --- a/src/rmr/common/src/logging.c +++ b/src/rmr/common/src/logging.c @@ -101,8 +101,8 @@ extern int rmr_vlog_init( ) { if( (data = getenv( ENV_LOG_VLEVEL )) != NULL ) { log_vlevel = atoi( data ); - if( log_vlevel < 0 ) { - log_vlevel = 0; + if( log_vlevel < -1 ) { // allow housekeeping stats to be squelched with -1 + log_vlevel = -1; } else { if( log_vlevel > RMR_VL_DEBUG ) { log_vlevel = RMR_VL_DEBUG; diff --git a/src/rmr/common/src/ring_static.c b/src/rmr/common/src/ring_static.c index 8e565fa..7170a9d 100644 --- a/src/rmr/common/src/ring_static.c +++ b/src/rmr/common/src/ring_static.c @@ -75,6 +75,7 @@ static void* uta_mk_ring( int size ) { return NULL; } + r->flags = 0; r->rgate = NULL; r->wgate = NULL; r->head = r->tail = 0; @@ -124,7 +125,7 @@ static int uta_ring_config( void* vr, int options ) { } } - if( options & RING_RLOCK ) { + if( options & (RING_RLOCK | RING_FRLOCK) ) { // read locking if( r->rgate == NULL ) { // don't realloc r->rgate = (pthread_mutex_t *) malloc( sizeof( *r->rgate ) ); if( r->rgate == NULL ) { @@ -133,6 +134,9 @@ static int uta_ring_config( void* vr, int options ) { pthread_mutex_init( r->rgate, NULL ); } + if( options & RING_FRLOCK ) { + r->flags |= RING_FL_FLOCK; + } } return 1; @@ -188,8 +192,16 @@ static inline void* uta_ring_extract( void* vr ) { return NULL; } - if( r->rgate != NULL ) { // if lock exists we must honour it - pthread_mutex_lock( r->rgate ); + if( r->rgate != NULL ) { // if lock exists we must honour it + if( r->flags & RING_FL_FLOCK ) { // fast read locking try once and return nil if we cant lock + if( pthread_mutex_trylock( r->rgate ) != 0 ) { // quick fail if not able to get a lock + return NULL; + } + } else { + if( pthread_mutex_lock( r->rgate ) != 0 ) { + return NULL; + } + } if( r->tail == r->head ) { // ensure ring didn't go empty while waiting pthread_mutex_unlock( r->rgate ); return NULL; @@ 
-219,9 +231,13 @@ future -- investigate if it's possible only to set/clear when empty or going to return data; } + /* Insert the pointer at the next open space in the ring. - Returns 1 if the inert was ok, and 0 if the ring is full. + Returns 1 if the insert was ok, and 0 if there is an error; + errno will be set to EXFULL if the ring is full, if the attempt + fails with any other error that indicates the inability to obtain + a lock on the ring. */ static inline int uta_ring_insert( void* vr, void* new_data ) { ring_t* r; @@ -236,13 +252,16 @@ static inline int uta_ring_insert( void* vr, void* new_data ) { } if( r->wgate != NULL ) { // if lock exists we must honour it - pthread_mutex_lock( r->wgate ); + if( pthread_mutex_lock( r->wgate ) != 0 ) { + return 0; // leave mutex reason in place + } } if( r->head+1 == r->tail || (r->head+1 >= r->nelements && !r->tail) ) { // ring is full - if( r->wgate != NULL ) { // ensure released if needed + if( r->wgate != NULL ) { // ensure released if needed pthread_mutex_unlock( r->wgate ); } + errno = EXFULL; return 0; } diff --git a/src/rmr/si/src/mt_call_si_static.c b/src/rmr/si/src/mt_call_si_static.c index 76b56b2..1b31bf4 100644 --- a/src/rmr/si/src/mt_call_si_static.c +++ b/src/rmr/si/src/mt_call_si_static.c @@ -178,7 +178,7 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) { if( river->state != RS_GOOD ) { // all states which aren't good require reset first if( river->state == RS_NEW ) { memset( river, 0, sizeof( *river ) ); - river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024); // max inbound message size + river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024); // start with what user said would be the "normal" max inbound msg size river->accum = (char *) malloc( river->nbytes ); river->ipt = 0; } else { @@ -192,10 +192,10 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) { while( remain > 0 ) { // until we've done something with all bytes passed in if( DEBUG ) rmr_vlog( 
RMR_VL_DEBUG, "====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain ); - if( river->msg_size <= 0 ) { // don't have a size yet + if( river->msg_size <= 0 ) { // don't have a message length yet // FIX ME: we need a frame indicator to ensure alignment - need = TP_SZFIELD_LEN - river->ipt; // what we need to compute length - if( need > remain ) { // the whole size isn't there + need = TP_SZFIELD_LEN - river->ipt; // what we need to compute the total message length + if( need > remain ) { // the whole message len information isn't in this transport buf if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "need more for size than we have: need=%d rmain=%d ipt=%d\n", need, remain, river->ipt ); memcpy( &river->accum[river->ipt], buf+bidx, remain ); // grab what we can and depart river->ipt += remain; @@ -224,7 +224,7 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) { if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback setting msg size: %d\n", river->msg_size ); if( river->msg_size > river->nbytes ) { // message bigger than app max size; grab huge buffer - //river->flags |= RF_DROP; + //river->flags |= RF_DROP; // uncomment to drop large messages if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "received message is huge (%d) reallocating buffer\n", river->msg_size ); old_accum = river->accum; // need to copy any bytes we snarfed getting the size, so hold river->nbytes = river->msg_size + 128; // buffer large enough with a bit of fudge room @@ -237,23 +237,23 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) { } } - if( river->msg_size > (river->ipt + remain) ) { // need more than is left in buffer + if( river->msg_size > (river->ipt + remain) ) { // need more than is left in receive buffer if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "data callback not enough in the buffer size=%d remain=%d\n", river->msg_size, remain ); - if( (river->flags & RF_DROP) == 0 ) { - memcpy( &river->accum[river->ipt], 
buf+bidx, remain ); // buffer and go wait for more + if( (river->flags & RF_DROP) == 0 ) { // ok to keep this message; copy bytes + memcpy( &river->accum[river->ipt], buf+bidx, remain ); // grab what is in the rcv buffer and go wait for more } river->ipt += remain; remain = 0; } else { need = river->msg_size - river->ipt; // bytes from transport we need to have complete message if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback enough in the buffer size=%d need=%d remain=%d flgs=%02x\n", river->msg_size, need, remain, river->flags ); - if( (river->flags & RF_DROP) == 0 ) { + if( (river->flags & RF_DROP) == 0 ) { // keeping this message, copy and pass it on memcpy( &river->accum[river->ipt], buf+bidx, need ); // grab just what is needed (might be more) buf2mbuf( ctx, river->accum, river->nbytes, fd ); // build an RMR mbuf and queue river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024); // prevent huge size from persisting river->accum = (char *) malloc( sizeof( char ) * river->nbytes ); // fresh accumulator } else { - if( !(river->flags & RF_NOTIFIED) ) { + if( !(river->flags & RF_NOTIFIED) ) { // not keeping huge messages; notify once per stream rmr_vlog( RMR_VL_WARN, "message larger than allocated buffer (%d) arrived on fd %d\n", river->nbytes, fd ); river->flags |= RF_NOTIFIED; } diff --git a/src/rmr/si/src/rmr_si.c b/src/rmr/si/src/rmr_si.c index 5aaa778..80d7408 100644 --- a/src/rmr/si/src/rmr_si.c +++ b/src/rmr/si/src/rmr_si.c @@ -628,6 +628,7 @@ static void* init( char* uproto_port, int def_msg_size, int flags ) { if( ! 
(flags & RMRFL_NOLOCK) ) { // user did not specifically ask that it be off; turn it on uta_ring_config( ctx->mring, RING_RLOCK ); // concurrent rcv calls require read lock uta_ring_config( ctx->zcb_mring, RING_WLOCK ); // concurrent free calls from userland require write lock + uta_ring_config( ctx->zcb_mring, RING_FRLOCK ); // concurrent message allocation calls from userland require read lock, but can be fast } else { rmr_vlog( RMR_VL_INFO, "receive ring locking disabled by user application\n" ); } -- 2.16.6