fix(internal): Correct potential nil pointer err 34/234/1
author      E. Scott Daniels <daniels@research.att.com>
            Fri, 31 May 2019 18:50:57 +0000 (18:50 +0000)
committer   E. Scott Daniels <daniels@research.att.com>
            Fri, 31 May 2019 19:03:44 +0000 (19:03 +0000)
With multiple user threads concurrently sending to the same
round robin group, there was a potential for a nil pointer
to be selected as the endpoint.

Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Change-Id: I1ee57ff5d17998a5d097e583d497158f19eb300b
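
For context, the following is a minimal, self-contained C sketch of the failure mode described above; it is an illustration only, not RMR source, and the structs are simplified stand-ins for rrgroup_t and endpoint_t. With the old read/increment/reset sequence, a second thread can leave ep_idx sitting at nused between the array read and the wrap check, so the read lands on an unpopulated (NULL) slot of the endpoint array:

    /* illustration only: simplified stand-ins for the real rrgroup_t and endpoint_t */
    #include <stdio.h>
    #include <stdlib.h>

    typedef struct {
        char* name;
    } endpoint_t;

    typedef struct {
        int ep_idx;                  // next endpoint to send to (old, signed form)
        int nused;                   // number of endpoints in the list
        endpoint_t** epts;           // allocated larger than nused; unused slots are NULL
    } rrgroup_t;

    static endpoint_t* old_select( rrgroup_t* rrg ) {
        endpoint_t* ep;

        ep = rrg->epts[rrg->ep_idx];          // (1) read using the shared index
        rrg->ep_idx++;                        // (2) not atomic with (1)
        if( rrg->ep_idx >= rrg->nused ) {     // (3) wrap check happens after the read
            rrg->ep_idx = 0;
        }
        return ep;                            // NULL if another thread's (2) ran before our (1)
    }

    int main( void ) {
        endpoint_t a = { "ep-a" };
        endpoint_t b = { "ep-b" };
        rrgroup_t rrg;

        rrg.nused = 2;
        rrg.epts = calloc( 4, sizeof( endpoint_t* ) );   // slots 2 and 3 stay NULL
        rrg.epts[0] = &a;
        rrg.epts[1] = &b;
        rrg.ep_idx = 2;                       // state another thread can leave behind briefly

        endpoint_t* ep = old_select( &rrg );  // indexes slot 2: a nil endpoint
        printf( "selected: %s\n", ep ? ep->name : "(nil)" );

        free( rrg.epts );
        return 0;
    }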

CMakeLists.txt
src/rmr/common/include/rmr_agnostic.h
src/rmr/nanomsg/src/rtable_static.c
src/rmr/nng/src/mt_call_nng_static.c
src/rmr/nng/src/rmr_nng.c
src/rmr/nng/src/rtable_nng_static.c

index 4199147..b30c364 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -23,7 +23,7 @@ cmake_minimum_required( VERSION 3.5 )
 
 set( major_version "1" )               # should be automatically populated from git tag later, but until CI process sets a tag we use this
 set( minor_version "0" )
-set( patch_level "26" )
+set( patch_level "27" )
 
 set( install_root "${CMAKE_INSTALL_PREFIX}" )
 set( install_lib "lib" )
index 0d4458a..c037900 100644
--- a/src/rmr/common/include/rmr_agnostic.h
+++ b/src/rmr/common/include/rmr_agnostic.h
@@ -165,7 +165,7 @@ typedef struct {                                            // old (inflexible) v1 header
        Round robin group.
 */
 typedef struct {
-       int     ep_idx;                         // next endpoint to send to
+       uint16_t        ep_idx;         // next endpoint to send to
        int nused;                              // number of endpoints in the list
        int nendpts;                    // number allocated
        endpoint_t **epts;              // the list of endpoints that we RR over
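
One plausible reason for switching ep_idx from int to uint16_t, paired with the modulo selection added in the hunks below: unsigned wrap-around is well defined, so the computed slot can never go negative, whereas overflowing a signed counter is undefined behaviour. A tiny stand-alone illustration (not RMR source):

    #include <stdio.h>
    #include <stdint.h>

    int main( void ) {
        uint16_t ep_idx = 65534;        // counter just below the 16-bit wrap point
        int nused = 3;                  // endpoints in the group

        for( int i = 0; i < 6; i++ ) {
            int idx = ep_idx++ % nused; // uint16_t promotes to int; never negative, always < nused
            printf( "idx=%d\n", idx );
        }
        return 0;
    }

The wrap itself can repeat or skip a slot when nused does not divide 65536, but that is the same "send twice or skip" side effect the new comment already accepts.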
index cfcb27e..643f00a 100644
--- a/src/rmr/nanomsg/src/rtable_static.c
+++ b/src/rmr/nanomsg/src/rtable_static.c
@@ -203,6 +203,13 @@ static int uta_epsock_byname( route_table_t* rt, char* ep_name ) {
        with group 0. If more is set, the caller may increase the group number and
        invoke this function again to make a selection against that group. If there
        are no more groups, more is set to 0.
+
+       NOTE:   The round robin selection index increment might collide with other
+               threads if multiple threads are attempting to send to the same round
+               robin group; the consequences are small, and accepting them avoids locking.
+               The only side effect is either sending two messages in a row to, or
+               skipping, an endpoint. Both of these, in the grand scheme of things, are
+               minor compared to the overhead of grabbing a lock on each call.
 */
 static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more ) {
        rtable_ent_t* rte;                      // matching rt entry
@@ -210,6 +217,7 @@ static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more
        int nn_sock = -2;
        int dummy;
        rrgroup_t* rrg;
+       int     idx;
 
 
        if( ! more ) {                          // eliminate checks each time we need to use
@@ -252,11 +260,9 @@ static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more
                        break;
 
                default:                                                                                // need to pick one and adjust rr counts
-                       ep = rrg->epts[rrg->ep_idx];
-                       nn_sock = rrg->epts[rrg->ep_idx++]->nn_sock;
-                       if( rrg->ep_idx >= rrg->nused ) {
-                               rrg->ep_idx = 0;
-                       }
+                       idx = rrg->ep_idx++ % rrg->nused;                       // see note above
+                       ep = rrg->epts[idx];
+                       nn_sock = ep->nn_sock;
                        break;
        }
 
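
To make the note above concrete, here is a deterministic, single-threaded walk-through (illustration only, not RMR source) of the worst case two unsynchronized senders can produce with the new idx = ep_idx++ % nused selection: both read the same counter value, one increment is lost, and the same endpoint is chosen twice, but the index never leaves the valid range:

    #include <stdio.h>
    #include <stdint.h>

    int main( void ) {
        uint16_t ep_idx = 0;            // shared counter in the round robin group
        int nused = 3;                  // endpoints in the group

        uint16_t seen_a = ep_idx;       // thread A performs the load half of ep_idx++
        uint16_t seen_b = ep_idx;       // thread B loads before A stores back: same value
        ep_idx = seen_a + 1;            // A stores
        ep_idx = seen_b + 1;            // B stores, overwriting A: one increment is lost

        printf( "A used idx=%d, B used idx=%d, counter is now %u\n",
            seen_a % nused, seen_b % nused, (unsigned) ep_idx );
        return 0;
    }

Endpoint 0 gets two messages and the rotation falls one step behind, which is exactly the minor side effect the note trades for not taking a lock on every send.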
index 2ef21f6..325b313 100644
--- a/src/rmr/nng/src/mt_call_nng_static.c
+++ b/src/rmr/nng/src/mt_call_nng_static.c
@@ -91,13 +91,8 @@ static void* mt_receive( void* vctx ) {
                                                queue_normal( ctx, mbuf );
                                        } else {
                                                chute = &ctx->chutes[call_id];
-                                               if( memcmp( mbuf->xaction, chute->expect, RMR_MAX_XID ) == 0 ) {                // match
-                                                       chute->mbuf = mbuf;
-                                                       sem_post( &chute->barrier );
-                                               } else {
-                                                       rmr_free_msg( mbuf );
-                                                       mbuf = NULL;
-                                               }
+                                               chute->mbuf = mbuf;
+                                               sem_post( &chute->barrier );                            // the call function can vet the xaction id in its own thread
                                        }
                                }
                        }
index 582cbb0..77f539e 100644
--- a/src/rmr/nng/src/rmr_nng.c
+++ b/src/rmr/nng/src/rmr_nng.c
@@ -978,6 +978,14 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m
                if( state < 0 && errno == EINTR ) {                                                             // interrupted go back and wait; all other errors cause exit
                        errno = 0;
                }
+
+               if( chute->mbuf != NULL ) {                                                                             // offload receiver thread and check xaction buffer here
+                       if( memcmp( chute->expect, chute->mbuf->xaction, RMR_MAX_XID ) != 0 ) {
+                               rmr_free_msg( chute->mbuf );
+                               chute->mbuf = NULL;
+                               errno = 0;
+                       }
+               }
        }
 
        if( state < 0 ) {
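
Taken together, the two hunks above move the transaction-id check out of the shared receiver thread and into the thread that made the call. The sketch below shows the intended hand-off; the types and helper names are hypothetical stand-ins, not the RMR API, and the real code uses a timed semaphore wait with EINTR retry rather than a plain sem_wait. The receiver just parks the buffer on the chute and posts, and the caller compares xaction ids in its own thread, dropping mismatches and waiting again:

    #include <semaphore.h>
    #include <string.h>
    #include <stdio.h>
    #include <stdlib.h>

    #define XID_LEN 32                            /* stand-in for RMR_MAX_XID */

    typedef struct {                              /* hypothetical message buffer */
        char xaction[XID_LEN];
    } msg_t;

    typedef struct {                              /* hypothetical chute: one per outstanding call */
        sem_t  barrier;
        char   expect[XID_LEN];
        msg_t* mbuf;
    } chute_t;

    /* receiver side after the change: hand the message over and wake the caller; no memcmp here */
    static void receiver_deliver( chute_t* chute, msg_t* mbuf ) {
        chute->mbuf = mbuf;
        sem_post( &chute->barrier );
    }

    /* caller side: vet the transaction id in the calling thread */
    static msg_t* caller_wait( chute_t* chute ) {
        while( 1 ) {
            sem_wait( &chute->barrier );                        /* real code uses sem_timedwait */
            if( chute->mbuf != NULL ) {
                if( memcmp( chute->expect, chute->mbuf->xaction, XID_LEN ) != 0 ) {
                    free( chute->mbuf );                        /* stale or foreign reply: drop it */
                    chute->mbuf = NULL;
                    continue;                                   /* go back and wait again */
                }
                return chute->mbuf;                             /* matching reply */
            }
        }
    }

    int main( void ) {
        chute_t chute;
        msg_t*  reply;
        msg_t*  got;

        sem_init( &chute.barrier, 0, 0 );
        memset( chute.expect, 0, XID_LEN );
        memcpy( chute.expect, "call-0001", 9 );
        chute.mbuf = NULL;

        reply = calloc( 1, sizeof( *reply ) );
        memcpy( reply->xaction, "call-0001", 9 );
        receiver_deliver( &chute, reply );        /* normally done by the receiver thread */

        got = caller_wait( &chute );
        printf( "got reply for xaction %s\n", got->xaction );

        free( got );
        sem_destroy( &chute.barrier );
        return 0;
    }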
index d903de3..9f3092a 100644
--- a/src/rmr/nng/src/rtable_nng_static.c
+++ b/src/rmr/nng/src/rtable_nng_static.c
        user supplied pointer so that a success/fail code is returned directly.
        Return value is 0 (false) on failure, 1 (true)  on success.
 
-       In order to support multi-threaded user applications we must hold a lock before 
-       we attempt to create a dialer and connect. NNG is thread safe, but we can 
+       In order to support multi-threaded user applications we must hold a lock before
+       we attempt to create a dialer and connect. NNG is thread safe, but we can
        get things into a bad state if we allow a collision here.  The lock grab
        only happens on the initial session setup.
 */
 //static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer, pthread_mutex* gate ) {
 static int uta_link2( endpoint_t* ep ) {
-       char*           target; 
-       nng_socket*     nn_sock; 
+       char*           target;
+       nng_socket*     nn_sock;
        nng_dialer*     dialer;
        char            conn_info[NNG_MAXADDRLEN];      // string to give to nano to make the connection
        char*           addr;
@@ -91,7 +91,7 @@ static int uta_link2( endpoint_t* ep ) {
                pthread_mutex_unlock( &ep->gate );
                return TRUE;
        }
-       
+
 
        if( nng_push0_open( nn_sock ) != 0 ) {                  // and assign the mode
                pthread_mutex_unlock( &ep->gate );
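
The comment at the top of uta_link2 describes lock-protected, one-time session setup per endpoint. A rough stand-alone sketch of that pattern follows; it is an illustration only, and connect_session(), the endpoint fields, and the address shown are hypothetical placeholders rather than the NNG or RMR API:

    #include <pthread.h>
    #include <stdio.h>

    #define TRUE  1
    #define FALSE 0

    typedef struct {                      /* simplified endpoint; the real one carries nng state */
        pthread_mutex_t gate;
        int             open;             /* TRUE once the session has been established */
        char*           addr;
    } endpoint_t;

    static int connect_session( endpoint_t* ep ) {   /* placeholder for dialer create + start */
        printf( "connecting to %s\n", ep->addr );
        return TRUE;
    }

    static int link2( endpoint_t* ep ) {
        if( ep->open ) {                             /* normal case: session already up, no lock */
            return TRUE;
        }

        pthread_mutex_lock( &ep->gate );             /* grabbed only during initial setup */
        if( ep->open ) {                             /* another thread connected while we waited */
            pthread_mutex_unlock( &ep->gate );
            return TRUE;
        }

        if( ! connect_session( ep ) ) {
            pthread_mutex_unlock( &ep->gate );
            return FALSE;
        }

        ep->open = TRUE;
        pthread_mutex_unlock( &ep->gate );
        return TRUE;
    }

    int main( void ) {
        endpoint_t ep = { .open = FALSE, .addr = "tcp://localhost:4560" };   /* example address */

        pthread_mutex_init( &ep.gate, NULL );
        printf( "first call:  %d\n", link2( &ep ) );
        printf( "second call: %d\n", link2( &ep ) );   /* already open: no second connect */
        pthread_mutex_destroy( &ep.gate );
        return 0;
    }

Without the re-check under the lock, two threads racing through first use could both create a dialer, which is the collision the original comment warns about.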
@@ -259,6 +259,13 @@ static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_s
        We return the index+1 from the round robin table on success so that we can verify
        during test that different entries are being selected; we cannot depend on the nng
        socket being different as we could with nano.
+
+       NOTE:   The round robin selection index increment might collide with other
+               threads if multiple threads are attempting to send to the same round
+               robin group; the consequences are small, and accepting them avoids locking.
+               The only side effect is either sending two messages in a row to, or
+               skipping, an endpoint. Both of these, in the grand scheme of things, are
+               minor compared to the overhead of grabbing a lock on each call.
 */
 static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more, nng_socket* nn_sock ) {
        rtable_ent_t* rte;                      // matching rt entry
@@ -266,9 +273,10 @@ static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more,
        int  state = FALSE;                     // processing state
        int dummy;
        rrgroup_t* rrg;
+       int     idx;
 
 
-       if( ! more ) {                          // eliminate cheks each time we need to user
+       if( ! more ) {                          // eliminate checks each time we need to use
                more = &dummy;
        }
 
@@ -316,12 +324,11 @@ static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more,
                        break;
 
                default:                                                                                // need to pick one and adjust rr counts
-                       ep = rrg->epts[rrg->ep_idx++];                          // select next endpoint
+
+                       idx = rrg->ep_idx++ % rrg->nused;                       // see note above
+                       ep = rrg->epts[idx];                                            // select next endpoint
                        //if( DEBUG ) fprintf( stderr, ">>>> _rr returning socket with multiple choices in group idx=%d \n", rrg->ep_idx );
-                       if( rrg->ep_idx >= rrg->nused ) {
-                               rrg->ep_idx = 0;
-                       }
-                       state = rrg->ep_idx+1;
+                       state = idx + 1;                                                        // unit test checks to see that we're cycling through, so must not just be TRUE
                        break;
        }