Enable multi-thread receiver support 09/2309/2 3.0.3
authorE. Scott Daniels <daniels@research.att.com>
Wed, 22 Jan 2020 17:31:07 +0000 (12:31 -0500)
committerE. Scott Daniels <daniels@research.att.com>
Wed, 22 Jan 2020 18:22:10 +0000 (13:22 -0500)
This change implements locking on the receive and free
message rings such that multiple user threads can concurrently
invoke rmr receive functions.

Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Change-Id: If012c5699e071f1d85f604c79baf8c4e8b77e94a

CHANGES
CMakeLists.txt
src/rmr/common/include/rmr.h
src/rmr/common/include/rmr_agnostic.h
src/rmr/common/src/ring_static.c
src/rmr/si/src/rmr_si.c
test/ring_static_test.c

diff --git a/CHANGES b/CHANGES
index 976169f..f5d970d 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -2,6 +2,9 @@
 API and build change  and fix summaries. Doc correctsions
 and/or changes are not mentioned here; see the commit messages.
 
+2020 January 22; verison 3.0.3
+       Enable thread support for multiple receive threads.
+
 2020 January 21; verison 3.0.2
        Fix bug in SI95 (missing reallocate payload function).
 
index ef8308b..2714703 100644 (file)
@@ -38,7 +38,7 @@ cmake_minimum_required( VERSION 3.5 )
 
 set( major_version "3" )               # should be automatically populated from git tag later, but until CI process sets a tag we use this
 set( minor_version "0" )
-set( patch_level "2" )
+set( patch_level "3" )
 
 set( install_root "${CMAKE_INSTALL_PREFIX}" )
 set( install_inc "include/rmr" )
index e3d08d9..a9805a8 100644 (file)
@@ -47,6 +47,7 @@ extern "C" {
 #define RMRFL_MTCALL           0x02    // set up multi-threaded call support (rmr_init)
 #define RMRFL_AUTO_ALLOC       0x03    // send auto allocates a zerocopy buffer
 #define RMRFL_NAME_ONLY                0x04    // only the hostname:ip is provided as source information for rts() calls
+#define RMRFL_NOLOCK           0x08    // disable receive ring locking (user app ensures single thread or provides collision protection)
 
 #define RMR_DEF_SIZE           0               // pass as size to have msg allocation use the default msg size
 
index 86f4660..6ef3aee 100644 (file)
@@ -238,12 +238,18 @@ typedef struct {
 
 
 // --------------- ring things  -------------------------------------------------
+#define RING_NONE      0                       // no options
+#define RING_RLOCK     0x01            // create/destroy the read lock on the ring
+#define RING_WLOCK     0x02            // create/destroy the write lockk on the ring
+
 typedef struct ring {
        uint16_t head;                          // index of the head of the ring (insert point)
        uint16_t tail;                          // index of the tail (extract point)
        uint16_t nelements;                     // number of elements in the ring
        void**  data;                           // the ring data (pointers to blobs of stuff)
        int             pfd;                            // event fd for the ring for epoll
+       pthread_mutex_t*        rgate;  // read lock if used
+       pthread_mutex_t*        wgate;  // write lock if used
 } ring_t;
 
 
@@ -273,6 +279,7 @@ static char* get_default_ip( if_addrs_t* iplist );
 
 // --- message ring --------------------------
 static void* uta_mk_ring( int size );
+static int uta_ring_config( void* vr, int options );
 static void uta_ring_free( void* vr );
 static inline void* uta_ring_extract( void* vr );
 static inline int uta_ring_insert( void* vr, void* new_data );
index 93dfdb7..37bb4db 100644 (file)
@@ -62,7 +62,10 @@ static int uta_ring_getpfd( void* vr ) {
 }
 
 /*
-       Make a new ring.
+       Make a new ring. The default is to NOT create a lock; if the user 
+       wants read locking then uta_config_ring() can be used to setup the
+       mutex. (We use several rings internally and the assumption is that
+       there is no locking for these.)
 */
 static void* uta_mk_ring( int size ) {
        ring_t* r;
@@ -72,6 +75,8 @@ static void* uta_mk_ring( int size ) {
                return NULL;
        }
 
+       r->rgate = NULL;
+       r->wgate = NULL;
        r->head = r->tail = 0;
 
        max = (r->head - 1);
@@ -90,6 +95,49 @@ static void* uta_mk_ring( int size ) {
        return (void *) r;
 }
 
+/*
+       Allows for configuration of a ring after it has been allocated.
+       Options are RING_* options that allow for things like setting/clearing
+       read locking. Returns 0 for failure 1 on success.
+
+       Options can be ORd together and all made effective at the same time, but
+       it will be impossible to determine a specific failure if invoked this
+       way.  Control is returned on the first error, and no provision is made
+       to "undo" previously set options if an error occurs.
+*/
+static int uta_ring_config( void* vr, int options ) {
+       ring_t* r;
+
+       if( (r = (ring_t*) vr) == NULL ) {
+               errno = EINVAL;
+               return 0;
+       }
+
+       if( options & RING_WLOCK ) {
+               if( r->wgate == NULL ) {                // don't realloc
+                       r->wgate = (pthread_mutex_t *) malloc( sizeof( *r->wgate ) );
+                       if( r->wgate == NULL ) {
+                               return 0;
+                       }
+       
+                       pthread_mutex_init( r->wgate, NULL );
+               }
+       }
+
+       if( options & RING_RLOCK ) {
+               if( r->rgate == NULL ) {                // don't realloc
+                       r->rgate = (pthread_mutex_t *) malloc( sizeof( *r->rgate ) );
+                       if( r->rgate == NULL ) {
+                               return 0;
+                       }
+       
+                       pthread_mutex_init( r->rgate, NULL );
+               }
+       }
+
+       return 1;
+}
+
 /*
        Ditch the ring. The caller is responsible for extracting any remaining
        pointers and freeing them as needed.
@@ -108,6 +156,11 @@ static void uta_ring_free( void* vr ) {
 /*
        Pull the next data pointer from the ring; null if there isn't
        anything to be pulled.
+
+       If the read lock exists for the ring, then this will BLOCK until
+       it gets the lock.  There is always a chance that once the lock
+       is obtained that the ring is empty, so the caller MUST handle
+       a nil pointer as the return.
 */
 static inline void* uta_ring_extract( void* vr ) {
        ring_t*         r;
@@ -122,10 +175,17 @@ static inline void* uta_ring_extract( void* vr ) {
                r = (ring_t*) vr;
        }
 
-       if( r->tail == r->head ) {                      // empty ring
+       if( r->tail == r->head ) {                                              // empty ring we can bail out quickly
                return NULL;
        }
 
+       if( r->rgate != NULL ) {                                                // if lock exists we must honour it
+               pthread_mutex_lock( r->rgate );
+               if( r->tail == r->head ) {                                      // ensure ring didn't go empty while waiting
+                       return NULL;
+               }
+       }
+
        ti = r->tail;
        r->tail++;
        if( r->tail >= r->nelements ) {
@@ -138,6 +198,10 @@ future -- investigate if it's possible only to set/clear when empty or going to
        if( r->tail == r->head ) {                                                              // if this emptied the ring, turn off ready
        }
 */
+
+       if( r->rgate != NULL ) {                                                        // if locked above...
+               pthread_mutex_unlock( r->rgate );
+       }
        return r->data[ti];
 }
 
@@ -157,7 +221,14 @@ static inline int uta_ring_insert( void* vr, void* new_data ) {
                r = (ring_t*) vr;
        }
 
+       if( r->wgate != NULL ) {                                                // if lock exists we must honour it
+               pthread_mutex_lock( r->wgate );
+       }
+
        if( r->head+1 == r->tail || (r->head+1 >= r->nelements && !r->tail) ) {         // ring is full
+               if( r->wgate != NULL ) {                                        // ensure released if needed
+                       pthread_mutex_unlock( r->wgate );
+               }
                return 0;
        }
 
@@ -174,6 +245,9 @@ future -- investigate if it's possible only to set/clear when empty or going to
                r->head = 0;
        }
 
+       if( r->wgate != NULL ) {                                                // if lock exists we must unlock before going
+               pthread_mutex_unlock( r->wgate );
+       }
        return 1;
 }
 
index 6c08e83..c8c3b4a 100644 (file)
@@ -570,9 +570,16 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        ctx->max_ibm = max_msg_size;                                    // default to user supplied message size
 
        ctx->mring = uta_mk_ring( 4096 );                               // message ring is always on for si
+       ctx->zcb_mring = uta_mk_ring( 128 );                    // zero copy buffer mbuf ring to reduce malloc/free calls
+
+       if( ! (flags & RMRFL_NOLOCK) ) {                                // user did not specifically ask that it be off; turn it on
+               uta_ring_config( ctx->mring, RING_RLOCK );                      // concurrent rcv calls require read lock
+               uta_ring_config( ctx->zcb_mring, RING_WLOCK );          // concurrent free calls from userland require write lock
+       } else {
+               fprintf( stderr, "[INFO] receive ring locking disabled by user application\n" );
+       }
        init_mtcall( ctx );                                                             // set up call chutes
 
-       ctx->zcb_mring = uta_mk_ring( 128 );                    // zero copy buffer mbuf ring
 
        ctx->max_plen = RMR_MAX_RCV_BYTES;                              // max user payload lengh
        if( max_msg_size > 0 ) {
index bd2b428..b194928 100644 (file)
@@ -78,6 +78,8 @@ static int ring_test( ) {
        int     data[20];
        int*    dp;
        int size = 18;
+       int     pfd = -1;                                       // pollable file descriptor for the ring
+       int     errors = 0;
 
        r = uta_mk_ring( 0 );                   // should return nil
        if( r != NULL ) {
@@ -96,6 +98,19 @@ static int ring_test( ) {
                return 1;
        }
 
+       pfd = uta_ring_getpfd( r );             // get pollable file descriptor
+       if( pfd < 0 ) {
+               fprintf( stderr, "<FAIL> expected a pollable file descriptor >= 0, but got: %d\n", pfd );
+               errors++;
+       }
+
+       pfd = uta_ring_config( r, 0x03 );               // turn on locking for reads and writes
+       if( pfd != 1 ) {
+               fprintf( stderr, "<FAIL> config attempt to enable locking failed\n" );
+               errors++;
+       }
+       
+
        for( i = 0; i < 20; i++ ) {             // test to ensure it reports full when head/tail start at 0
                data[i] = i;
                if( ! uta_ring_insert( r, &data[i] ) ) {
@@ -161,5 +176,5 @@ static int ring_test( ) {
        }
 
        fprintf( stderr, "<INFO> all ring tests pass\n" );
-       return 0;
+       return errors;
 }