From c1f84f8a4a4e2b90ad9ec18aba2b5365d3e51386 Mon Sep 17 00:00:00 2001 From: "E. Scott Daniels" Date: Wed, 22 Jan 2020 12:31:07 -0500 Subject: [PATCH] Enable multi-thread receiver support This change implements locking on the receive and free message rings such that multiple user threads can concurrently invoke rmr receive functions. Signed-off-by: E. Scott Daniels Change-Id: If012c5699e071f1d85f604c79baf8c4e8b77e94a --- CHANGES | 3 ++ CMakeLists.txt | 2 +- src/rmr/common/include/rmr.h | 1 + src/rmr/common/include/rmr_agnostic.h | 7 ++++ src/rmr/common/src/ring_static.c | 78 ++++++++++++++++++++++++++++++++++- src/rmr/si/src/rmr_si.c | 9 +++- test/ring_static_test.c | 17 +++++++- 7 files changed, 112 insertions(+), 5 deletions(-) diff --git a/CHANGES b/CHANGES index 976169f..f5d970d 100644 --- a/CHANGES +++ b/CHANGES @@ -2,6 +2,9 @@ API and build change and fix summaries. Doc correctsions and/or changes are not mentioned here; see the commit messages. +2020 January 22; verison 3.0.3 + Enable thread support for multiple receive threads. + 2020 January 21; verison 3.0.2 Fix bug in SI95 (missing reallocate payload function). diff --git a/CMakeLists.txt b/CMakeLists.txt index ef8308b..2714703 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -38,7 +38,7 @@ cmake_minimum_required( VERSION 3.5 ) set( major_version "3" ) # should be automatically populated from git tag later, but until CI process sets a tag we use this set( minor_version "0" ) -set( patch_level "2" ) +set( patch_level "3" ) set( install_root "${CMAKE_INSTALL_PREFIX}" ) set( install_inc "include/rmr" ) diff --git a/src/rmr/common/include/rmr.h b/src/rmr/common/include/rmr.h index e3d08d9..a9805a8 100644 --- a/src/rmr/common/include/rmr.h +++ b/src/rmr/common/include/rmr.h @@ -47,6 +47,7 @@ extern "C" { #define RMRFL_MTCALL 0x02 // set up multi-threaded call support (rmr_init) #define RMRFL_AUTO_ALLOC 0x03 // send auto allocates a zerocopy buffer #define RMRFL_NAME_ONLY 0x04 // only the hostname:ip is provided as source information for rts() calls +#define RMRFL_NOLOCK 0x08 // disable receive ring locking (user app ensures single thread or provides collision protection) #define RMR_DEF_SIZE 0 // pass as size to have msg allocation use the default msg size diff --git a/src/rmr/common/include/rmr_agnostic.h b/src/rmr/common/include/rmr_agnostic.h index 86f4660..6ef3aee 100644 --- a/src/rmr/common/include/rmr_agnostic.h +++ b/src/rmr/common/include/rmr_agnostic.h @@ -238,12 +238,18 @@ typedef struct { // --------------- ring things ------------------------------------------------- +#define RING_NONE 0 // no options +#define RING_RLOCK 0x01 // create/destroy the read lock on the ring +#define RING_WLOCK 0x02 // create/destroy the write lockk on the ring + typedef struct ring { uint16_t head; // index of the head of the ring (insert point) uint16_t tail; // index of the tail (extract point) uint16_t nelements; // number of elements in the ring void** data; // the ring data (pointers to blobs of stuff) int pfd; // event fd for the ring for epoll + pthread_mutex_t* rgate; // read lock if used + pthread_mutex_t* wgate; // write lock if used } ring_t; @@ -273,6 +279,7 @@ static char* get_default_ip( if_addrs_t* iplist ); // --- message ring -------------------------- static void* uta_mk_ring( int size ); +static int uta_ring_config( void* vr, int options ); static void uta_ring_free( void* vr ); static inline void* uta_ring_extract( void* vr ); static inline int uta_ring_insert( void* vr, void* new_data ); diff --git a/src/rmr/common/src/ring_static.c b/src/rmr/common/src/ring_static.c index 93dfdb7..37bb4db 100644 --- a/src/rmr/common/src/ring_static.c +++ b/src/rmr/common/src/ring_static.c @@ -62,7 +62,10 @@ static int uta_ring_getpfd( void* vr ) { } /* - Make a new ring. + Make a new ring. The default is to NOT create a lock; if the user + wants read locking then uta_config_ring() can be used to setup the + mutex. (We use several rings internally and the assumption is that + there is no locking for these.) */ static void* uta_mk_ring( int size ) { ring_t* r; @@ -72,6 +75,8 @@ static void* uta_mk_ring( int size ) { return NULL; } + r->rgate = NULL; + r->wgate = NULL; r->head = r->tail = 0; max = (r->head - 1); @@ -90,6 +95,49 @@ static void* uta_mk_ring( int size ) { return (void *) r; } +/* + Allows for configuration of a ring after it has been allocated. + Options are RING_* options that allow for things like setting/clearing + read locking. Returns 0 for failure 1 on success. + + Options can be ORd together and all made effective at the same time, but + it will be impossible to determine a specific failure if invoked this + way. Control is returned on the first error, and no provision is made + to "undo" previously set options if an error occurs. +*/ +static int uta_ring_config( void* vr, int options ) { + ring_t* r; + + if( (r = (ring_t*) vr) == NULL ) { + errno = EINVAL; + return 0; + } + + if( options & RING_WLOCK ) { + if( r->wgate == NULL ) { // don't realloc + r->wgate = (pthread_mutex_t *) malloc( sizeof( *r->wgate ) ); + if( r->wgate == NULL ) { + return 0; + } + + pthread_mutex_init( r->wgate, NULL ); + } + } + + if( options & RING_RLOCK ) { + if( r->rgate == NULL ) { // don't realloc + r->rgate = (pthread_mutex_t *) malloc( sizeof( *r->rgate ) ); + if( r->rgate == NULL ) { + return 0; + } + + pthread_mutex_init( r->rgate, NULL ); + } + } + + return 1; +} + /* Ditch the ring. The caller is responsible for extracting any remaining pointers and freeing them as needed. @@ -108,6 +156,11 @@ static void uta_ring_free( void* vr ) { /* Pull the next data pointer from the ring; null if there isn't anything to be pulled. + + If the read lock exists for the ring, then this will BLOCK until + it gets the lock. There is always a chance that once the lock + is obtained that the ring is empty, so the caller MUST handle + a nil pointer as the return. */ static inline void* uta_ring_extract( void* vr ) { ring_t* r; @@ -122,10 +175,17 @@ static inline void* uta_ring_extract( void* vr ) { r = (ring_t*) vr; } - if( r->tail == r->head ) { // empty ring + if( r->tail == r->head ) { // empty ring we can bail out quickly return NULL; } + if( r->rgate != NULL ) { // if lock exists we must honour it + pthread_mutex_lock( r->rgate ); + if( r->tail == r->head ) { // ensure ring didn't go empty while waiting + return NULL; + } + } + ti = r->tail; r->tail++; if( r->tail >= r->nelements ) { @@ -138,6 +198,10 @@ future -- investigate if it's possible only to set/clear when empty or going to if( r->tail == r->head ) { // if this emptied the ring, turn off ready } */ + + if( r->rgate != NULL ) { // if locked above... + pthread_mutex_unlock( r->rgate ); + } return r->data[ti]; } @@ -157,7 +221,14 @@ static inline int uta_ring_insert( void* vr, void* new_data ) { r = (ring_t*) vr; } + if( r->wgate != NULL ) { // if lock exists we must honour it + pthread_mutex_lock( r->wgate ); + } + if( r->head+1 == r->tail || (r->head+1 >= r->nelements && !r->tail) ) { // ring is full + if( r->wgate != NULL ) { // ensure released if needed + pthread_mutex_unlock( r->wgate ); + } return 0; } @@ -174,6 +245,9 @@ future -- investigate if it's possible only to set/clear when empty or going to r->head = 0; } + if( r->wgate != NULL ) { // if lock exists we must unlock before going + pthread_mutex_unlock( r->wgate ); + } return 1; } diff --git a/src/rmr/si/src/rmr_si.c b/src/rmr/si/src/rmr_si.c index 6c08e83..c8c3b4a 100644 --- a/src/rmr/si/src/rmr_si.c +++ b/src/rmr/si/src/rmr_si.c @@ -570,9 +570,16 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { ctx->max_ibm = max_msg_size; // default to user supplied message size ctx->mring = uta_mk_ring( 4096 ); // message ring is always on for si + ctx->zcb_mring = uta_mk_ring( 128 ); // zero copy buffer mbuf ring to reduce malloc/free calls + + if( ! (flags & RMRFL_NOLOCK) ) { // user did not specifically ask that it be off; turn it on + uta_ring_config( ctx->mring, RING_RLOCK ); // concurrent rcv calls require read lock + uta_ring_config( ctx->zcb_mring, RING_WLOCK ); // concurrent free calls from userland require write lock + } else { + fprintf( stderr, "[INFO] receive ring locking disabled by user application\n" ); + } init_mtcall( ctx ); // set up call chutes - ctx->zcb_mring = uta_mk_ring( 128 ); // zero copy buffer mbuf ring ctx->max_plen = RMR_MAX_RCV_BYTES; // max user payload lengh if( max_msg_size > 0 ) { diff --git a/test/ring_static_test.c b/test/ring_static_test.c index bd2b428..b194928 100644 --- a/test/ring_static_test.c +++ b/test/ring_static_test.c @@ -78,6 +78,8 @@ static int ring_test( ) { int data[20]; int* dp; int size = 18; + int pfd = -1; // pollable file descriptor for the ring + int errors = 0; r = uta_mk_ring( 0 ); // should return nil if( r != NULL ) { @@ -96,6 +98,19 @@ static int ring_test( ) { return 1; } + pfd = uta_ring_getpfd( r ); // get pollable file descriptor + if( pfd < 0 ) { + fprintf( stderr, " expected a pollable file descriptor >= 0, but got: %d\n", pfd ); + errors++; + } + + pfd = uta_ring_config( r, 0x03 ); // turn on locking for reads and writes + if( pfd != 1 ) { + fprintf( stderr, " config attempt to enable locking failed\n" ); + errors++; + } + + for( i = 0; i < 20; i++ ) { // test to ensure it reports full when head/tail start at 0 data[i] = i; if( ! uta_ring_insert( r, &data[i] ) ) { @@ -161,5 +176,5 @@ static int ring_test( ) { } fprintf( stderr, " all ring tests pass\n" ); - return 0; + return errors; } -- 2.16.6