Message allocation, when pulling a buffer from the free pool, could
fail to own the lock. This was causing core dumps in multi-threaded
processes (e.g. those using the Go wrapper).

Version bump to 4.5.0 to move away from the previous 4.4.* release
series.
Issue-ID: RIC-732
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Change-Id: I9abc7f64391d0292c5ba1323713040faedb569a1
(cherry picked from commit
ce1c741c01e8387cb095dac5e36a4d8ad91d006d)
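
For context, a minimal sketch of the failure mode, using toy types rather
than RMR's actual internals: without a read lock, the empty test and the
tail advance are not atomic as a unit, so two threads pulling from the
shared free pool can both read the same slot and hand out the same buffer.

    #include <stddef.h>
    #include <stdint.h>

    typedef struct {                // pared-down stand-in for the ring
        uint16_t  head;
        uint16_t  tail;
        uint16_t  nelements;
        void**    data;
    } toy_ring_t;

    // Two threads may both pass the empty test, read the same slot, and
    // return the same pointer; the duplicated buffer later surfaces as a
    // double free or corruption (the observed core dumps).
    static void* unsafe_pull( toy_ring_t* r ) {
        void* p;

        if( r->tail == r->head ) {                  // ring empty
            return NULL;
        }

        p = r->data[r->tail];                       // both threads can land here
        r->tail = (r->tail + 1) % r->nelements;     // before either advances tail
        return p;
    }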
# core RMR code and doc. Other change files exist for other
# things.
-# API and build change and fix summaries. Doc correctsions
+# API and build change and fix summaries. Doc corrections
# and/or changes are not mentioned here; see the commit messages.
+2021 January 8; Version 4.5.0
+ Version bump for next release tracking.
+ Corrected a potential locking issue in message allocation. (RIC-732)
+
2020 December 4; Version 4.4.6
Correct a range check bug when cloning a route table in prep
to load a new one. (RIC-720)
cmake_minimum_required( VERSION 3.5 )
set( major_version "4" ) # should be automatically populated from git tag later, but until CI process sets a tag we use this
-set( minor_version "4" )
-set( patch_level "6" )
+set( minor_version "5" )
+set( patch_level "0" )
set( install_root "${CMAKE_INSTALL_PREFIX}" )
set( install_inc "include/rmr" )
version 4.0.0, the RMR versions should no longer skip.
+2021 January 8; Version 4.5.0
+-----------------------------
+
+Version bump for next release tracking. Corrected a potential
+locking issue in message allocation. (RIC-732)
+
+
+
Cherry Release
==============
#define RING_NONE 0 // no options
#define RING_RLOCK 0x01 // create/destroy the read lock on the ring
#define RING_WLOCK 0x02 // create/destroy the write lock on the ring
+#define RING_FRLOCK 0x04 // read lock option: don't wait if the ring is already locked
+
+ // flag values
+#define RING_FL_FLOCK 0x01 // fast read lock (don't wait if locked when reading)
typedef struct ring {
uint16_t head; // index of the head of the ring (insert point)
uint16_t nelements; // number of elements in the ring
void** data; // the ring data (pointers to blobs of stuff)
int pfd; // event fd for the ring for epoll
+ int flags; // RING_FL_* constants
pthread_mutex_t* rgate; // read lock if used
pthread_mutex_t* wgate; // write lock if used
} ring_t;
if( (data = getenv( ENV_LOG_VLEVEL )) != NULL ) {
log_vlevel = atoi( data );
- if( log_vlevel < 0 ) {
- log_vlevel = 0;
+ if( log_vlevel < -1 ) { // allow housekeeping stats to be squelched with -1
+ log_vlevel = -1;
} else {
if( log_vlevel > RMR_VL_DEBUG ) {
log_vlevel = RMR_VL_DEBUG;
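
The net effect of the check above, as a small sketch (clamp_vlevel is
illustrative, not an RMR function): the supplied level is clamped into
[-1, RMR_VL_DEBUG], where the new -1 floor squelches the periodic
housekeeping stats.

    #include <stdlib.h>

    static int clamp_vlevel( const char* s, int max_level ) {
        int v;

        v = (s != NULL) ? atoi( s ) : 0;
        if( v < -1 ) {                  // -1 is now legal: squelch housekeeping stats
            v = -1;
        } else if( v > max_level ) {    // cap at the library maximum (RMR_VL_DEBUG)
            v = max_level;
        }

        return v;
    }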
r->rgate = NULL;
r->wgate = NULL;
r->head = r->tail = 0;
- if( options & RING_RLOCK ) {
+ if( options & (RING_RLOCK | RING_FRLOCK) ) { // read locking
if( r->rgate == NULL ) { // don't realloc
r->rgate = (pthread_mutex_t *) malloc( sizeof( *r->rgate ) );
if( r->rgate == NULL ) {
pthread_mutex_init( r->rgate, NULL );
}
+ if( options & RING_FRLOCK ) {
+ r->flags |= RING_FL_FLOCK;
+ }
- if( r->rgate != NULL ) { // if lock exists we must honour it
- pthread_mutex_lock( r->rgate );
+ if( r->rgate != NULL ) { // if lock exists we must honour it
+ if( r->flags & RING_FL_FLOCK ) { // fast read locking: try once and return NULL if we can't lock
+ if( pthread_mutex_trylock( r->rgate ) != 0 ) { // quick fail if not able to get a lock
+ return NULL;
+ }
+ } else {
+ if( pthread_mutex_lock( r->rgate ) != 0 ) {
+ return NULL;
+ }
+ }
if( r->tail == r->head ) { // ensure ring didn't go empty while waiting
pthread_mutex_unlock( r->rgate );
return NULL;
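
What the fast path means for a caller, as a hedged sketch (get_buffer is
illustrative; the extract function is the one shown above): with RING_FRLOCK
configured, a NULL return can mean "momentarily locked" as well as "empty",
and both are handled the same way, by falling back to a fresh allocation
rather than blocking.

    #include <stdlib.h>

    extern void* uta_ring_extract( void* vr );      // the function shown above

    static void* get_buffer( void* free_ring, size_t len ) {
        void* b;

        if( (b = uta_ring_extract( free_ring )) != NULL ) {
            return b;               // reused a pooled buffer
        }

        return malloc( len );       // pool empty or contended; just allocate
    }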
/*
Insert the pointer at the next open space in the ring.
- Returns 1 if the inert was ok, and 0 if the ring is full.
+ Returns 1 if the insert was ok, and 0 if there is an error;
+ errno will be set to EXFULL if the ring is full; any other
+ errno value indicates the inability to obtain a lock on the ring.
*/
static inline int uta_ring_insert( void* vr, void* new_data ) {
ring_t* r;
}
if( r->wgate != NULL ) { // if lock exists we must honour it
- pthread_mutex_lock( r->wgate );
+ if( pthread_mutex_lock( r->wgate ) != 0 ) {
+ return 0; // leave errno set by the mutex call in place
+ }
}
if( r->head+1 == r->tail || (r->head+1 >= r->nelements && !r->tail) ) { // ring is full
- if( r->wgate != NULL ) { // ensure released if needed
+ if( r->wgate != NULL ) { // ensure released if needed
pthread_mutex_unlock( r->wgate );
}
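
From the caller's side the new error contract looks like this sketch
(return_to_pool is illustrative): a 0 return with errno set to EXFULL means
the ring was full; any other errno means the write lock could not be
obtained.

    #include <errno.h>
    #include <stdio.h>

    extern int uta_ring_insert( void* vr, void* new_data );    // as shown above

    static int return_to_pool( void* ring, void* buf ) {
        errno = 0;
        if( uta_ring_insert( ring, buf ) ) {
            return 1;                       // buffer is back on the ring
        }

        if( errno == EXFULL ) {
            fprintf( stderr, "pool full; caller must free the buffer\n" );
        } else {
            fprintf( stderr, "could not obtain the ring lock: errno=%d\n", errno );
        }

        return 0;
    }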
if( river->state != RS_GOOD ) { // all states which aren't good require reset first
if( river->state == RS_NEW ) {
memset( river, 0, sizeof( *river ) );
- river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024); // max inbound message size
+ river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024); // start with what user said would be the "normal" max inbound msg size
river->accum = (char *) malloc( river->nbytes );
river->ipt = 0;
} else {
while( remain > 0 ) { // until we've done something with all bytes passed in
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain );
- if( river->msg_size <= 0 ) { // don't have a size yet
+ if( river->msg_size <= 0 ) { // don't have a message length yet
// FIX ME: we need a frame indicator to ensure alignment
- need = TP_SZFIELD_LEN - river->ipt; // what we need to compute length
- if( need > remain ) { // the whole size isn't there
+ need = TP_SZFIELD_LEN - river->ipt; // what we need to compute the total message length
+ if( need > remain ) { // the whole message len information isn't in this transport buf
if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "need more for size than we have: need=%d rmain=%d ipt=%d\n", need, remain, river->ipt );
memcpy( &river->accum[river->ipt], buf+bidx, remain ); // grab what we can and depart
river->ipt += remain;
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback setting msg size: %d\n", river->msg_size );
if( river->msg_size > river->nbytes ) { // message bigger than app max size; grab huge buffer
- //river->flags |= RF_DROP;
+ //river->flags |= RF_DROP; // uncomment to drop large messages
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "received message is huge (%d) reallocating buffer\n", river->msg_size );
old_accum = river->accum; // need to copy any bytes we snarfed getting the size, so hold
river->nbytes = river->msg_size + 128; // buffer large enough with a bit of fudge room
- if( river->msg_size > (river->ipt + remain) ) { // need more than is left in buffer
+ if( river->msg_size > (river->ipt + remain) ) { // need more than is left in receive buffer
if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "data callback not enough in the buffer size=%d remain=%d\n", river->msg_size, remain );
- if( (river->flags & RF_DROP) == 0 ) {
- memcpy( &river->accum[river->ipt], buf+bidx, remain ); // buffer and go wait for more
+ if( (river->flags & RF_DROP) == 0 ) { // ok to keep this message; copy bytes
+ memcpy( &river->accum[river->ipt], buf+bidx, remain ); // grab what is in the rcv buffer and go wait for more
}
river->ipt += remain;
remain = 0;
} else {
need = river->msg_size - river->ipt; // bytes from transport we need to have complete message
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback enough in the buffer size=%d need=%d remain=%d flgs=%02x\n", river->msg_size, need, remain, river->flags );
- if( (river->flags & RF_DROP) == 0 ) {
+ if( (river->flags & RF_DROP) == 0 ) { // keeping this message, copy and pass it on
memcpy( &river->accum[river->ipt], buf+bidx, need ); // grab just what is needed (might be more)
buf2mbuf( ctx, river->accum, river->nbytes, fd ); // build an RMR mbuf and queue
river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024); // prevent huge size from persisting
river->accum = (char *) malloc( sizeof( char ) * river->nbytes ); // fresh accumulator
} else {
- if( !(river->flags & RF_NOTIFIED) ) {
+ if( !(river->flags & RF_NOTIFIED) ) { // not keeping huge messages; notify once per stream
rmr_vlog( RMR_VL_WARN, "message larger than allocated buffer (%d) arrived on fd %d\n", river->nbytes, fd );
river->flags |= RF_NOTIFIED;
}
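
For orientation, the framing pattern this callback implements, much
simplified (toy types; the real code above also grows the buffer, drops
oversized messages, and copes with a length field split across reads): each
message on the stream is preceded by a fixed-size length field, and bytes
are accumulated until a whole frame is present.

    #include <string.h>

    #define SZFIELD_LEN 4               // stand-in for TP_SZFIELD_LEN

    typedef struct {                    // pared-down "river" state
        char  accum[65536];             // accumulation buffer (fixed for the sketch)
        int   ipt;                      // insert point in accum
        int   msg_size;                 // total frame length; 0 == not yet known
    } toy_river_t;

    // Consume one chunk from the stream, dispatching each complete message.
    // Assumes the length field counts the whole frame, itself included.
    static void consume( toy_river_t* rv, const char* buf, int len,
            void (*dispatch)( const char*, int ) ) {

        while( len > 0 ) {
            int need = (rv->msg_size > 0 ? rv->msg_size : SZFIELD_LEN) - rv->ipt;
            int take = need < len ? need : len;

            memcpy( rv->accum + rv->ipt, buf, take );   // grab what we can
            rv->ipt += take;
            buf += take;
            len -= take;

            if( rv->msg_size <= 0 && rv->ipt >= SZFIELD_LEN ) {     // length known now
                memcpy( &rv->msg_size, rv->accum, SZFIELD_LEN );    // endian-naive
            } else if( rv->msg_size > 0 && rv->ipt >= rv->msg_size ) {
                dispatch( rv->accum, rv->msg_size );                // whole message
                rv->ipt = 0;                                        // reset for next frame
                rv->msg_size = 0;
            }
        }
    }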
if( ! (flags & RMRFL_NOLOCK) ) { // user did not specifically ask that it be off; turn it on
uta_ring_config( ctx->mring, RING_RLOCK ); // concurrent rcv calls require read lock
uta_ring_config( ctx->zcb_mring, RING_WLOCK ); // concurrent free calls from userland require write lock
+ uta_ring_config( ctx->zcb_mring, RING_FRLOCK ); // concurrent message allocation calls from userland require a read lock, but it can be fast
} else {
rmr_vlog( RMR_VL_INFO, "receive ring locking disabled by user application\n" );
}
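
From the application side the locking above is the default; a sketch of
both init modes (rmr_init, the flag names, and RMR_MAX_RCV_BYTES are the
public API; the port value is arbitrary):

    #include <rmr/rmr.h>

    static void* init_ctx( int single_threaded ) {
        // default flags turn the ring locks on: read lock on the receive ring,
        // write lock plus the new fast read lock on the zero-copy buffer ring;
        // RMRFL_NOLOCK is only safe when a single thread uses the context
        return rmr_init( "4560", RMR_MAX_RCV_BYTES,
                single_threaded ? RMRFL_NOLOCK : RMRFL_NONE );
    }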