fix(internal): Correct potential nil pointer err 34/234/1
authorE. Scott Daniels <daniels@research.att.com>
Fri, 31 May 2019 18:50:57 +0000 (18:50 +0000)
committerE. Scott Daniels <daniels@research.att.com>
Fri, 31 May 2019 19:03:44 +0000 (19:03 +0000)
With multiple user threads, if concurrently sending to the same
round robin group, there was a potential for a nil pointer
to be selected representing an endpoint.

Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Change-Id: I1ee57ff5d17998a5d097e583d497158f19eb300b

CMakeLists.txt
src/rmr/common/include/rmr_agnostic.h
src/rmr/nanomsg/src/rtable_static.c
src/rmr/nng/src/mt_call_nng_static.c
src/rmr/nng/src/rmr_nng.c
src/rmr/nng/src/rtable_nng_static.c

index 4199147..b30c364 100644 (file)
@@ -23,7 +23,7 @@ cmake_minimum_required( VERSION 3.5 )
 
 set( major_version "1" )               # should be automatically populated from git tag later, but until CI process sets a tag we use this
 set( minor_version "0" )
 
 set( major_version "1" )               # should be automatically populated from git tag later, but until CI process sets a tag we use this
 set( minor_version "0" )
-set( patch_level "26" )
+set( patch_level "27" )
 
 set( install_root "${CMAKE_INSTALL_PREFIX}" )
 set( install_lib "lib" )
 
 set( install_root "${CMAKE_INSTALL_PREFIX}" )
 set( install_lib "lib" )
index 0d4458a..c037900 100644 (file)
@@ -165,7 +165,7 @@ typedef struct {                                            // old (inflexible) v1 header
        Round robin group.
 */
 typedef struct {
        Round robin group.
 */
 typedef struct {
-       int     ep_idx;                         // next endpoint to send to
+       uint16_t        ep_idx;         // next endpoint to send to
        int nused;                              // number of endpoints in the list
        int nendpts;                    // number allocated
        endpoint_t **epts;              // the list of endpoints that we RR over
        int nused;                              // number of endpoints in the list
        int nendpts;                    // number allocated
        endpoint_t **epts;              // the list of endpoints that we RR over
index cfcb27e..643f00a 100644 (file)
@@ -203,6 +203,13 @@ static int uta_epsock_byname( route_table_t* rt, char* ep_name ) {
        with group 0. If more is set, the caller may increase the group number and
        invoke this function again to make a selection against that group. If there
        are no more groups, more is set to 0.
        with group 0. If more is set, the caller may increase the group number and
        invoke this function again to make a selection against that group. If there
        are no more groups, more is set to 0.
+
+       NOTE:   The round robin selection index increment might collide with other
+               threads if multiple threads are attempting to send to the same round
+               robin group; the consequences are small and avoid locking. The only side
+               effect is either sending two messages in a row to, or skipping, an endpoint.
+               Both of these, in the grand scheme of things, is minor compared to the
+               overhead of grabbing a lock on each call.
 */
 static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more ) {
        rtable_ent_t* rte;                      // matching rt entry
 */
 static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more ) {
        rtable_ent_t* rte;                      // matching rt entry
@@ -210,6 +217,7 @@ static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more
        int nn_sock = -2;
        int dummy;
        rrgroup_t* rrg;
        int nn_sock = -2;
        int dummy;
        rrgroup_t* rrg;
+       int     idx;
 
 
        if( ! more ) {                          // eliminate checks each time we need to use
 
 
        if( ! more ) {                          // eliminate checks each time we need to use
@@ -252,11 +260,9 @@ static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more
                        break;
 
                default:                                                                                // need to pick one and adjust rr counts
                        break;
 
                default:                                                                                // need to pick one and adjust rr counts
-                       ep = rrg->epts[rrg->ep_idx];
-                       nn_sock = rrg->epts[rrg->ep_idx++]->nn_sock;
-                       if( rrg->ep_idx >= rrg->nused ) {
-                               rrg->ep_idx = 0;
-                       }
+                       idx = rrg->ep_idx++ % rrg->nused;                       // see note above
+                       ep = rrg->epts[idx];
+                       nn_sock = ep->nn_sock;
                        break;
        }
 
                        break;
        }
 
index 2ef21f6..325b313 100644 (file)
@@ -91,13 +91,8 @@ static void* mt_receive( void* vctx ) {
                                                queue_normal( ctx, mbuf );
                                        } else {
                                                chute = &ctx->chutes[call_id];
                                                queue_normal( ctx, mbuf );
                                        } else {
                                                chute = &ctx->chutes[call_id];
-                                               if( memcmp( mbuf->xaction, chute->expect, RMR_MAX_XID ) == 0 ) {                // match
-                                                       chute->mbuf = mbuf;
-                                                       sem_post( &chute->barrier );
-                                               } else {
-                                                       rmr_free_msg( mbuf );
-                                                       mbuf = NULL;
-                                               }
+                                               chute->mbuf = mbuf;
+                                               sem_post( &chute->barrier );                            // the call function can vet xaction id in their own thread
                                        }
                                }
                        }
                                        }
                                }
                        }
index 582cbb0..77f539e 100644 (file)
@@ -978,6 +978,14 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m
                if( state < 0 && errno == EINTR ) {                                                             // interrupted go back and wait; all other errors cause exit
                        errno = 0;
                }
                if( state < 0 && errno == EINTR ) {                                                             // interrupted go back and wait; all other errors cause exit
                        errno = 0;
                }
+
+               if( chute->mbuf != NULL ) {                                                                             // offload receiver thread and check xaction buffer here
+                       if( memcmp( chute->expect, chute->mbuf->xaction, RMR_MAX_XID ) != 0 ) {
+                               rmr_free_msg( chute->mbuf );
+                               chute->mbuf = NULL;
+                               errno = 0;
+                       }
+               }
        }
 
        if( state < 0 ) {
        }
 
        if( state < 0 ) {
index d903de3..9f3092a 100644 (file)
        user supplied pointer so that a success/fail code is returned directly.
        Return value is 0 (false) on failure, 1 (true)  on success.
 
        user supplied pointer so that a success/fail code is returned directly.
        Return value is 0 (false) on failure, 1 (true)  on success.
 
-       In order to support multi-threaded user applications we must hold a lock before 
-       we attempt to create a dialer and connect. NNG is thread safe, but we can 
+       In order to support multi-threaded user applications we must hold a lock before
+       we attempt to create a dialer and connect. NNG is thread safe, but we can
        get things into a bad state if we allow a collision here.  The lock grab
        only happens on the intial session setup.
 */
 //static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer, pthread_mutex* gate ) {
 static int uta_link2( endpoint_t* ep ) {
        get things into a bad state if we allow a collision here.  The lock grab
        only happens on the intial session setup.
 */
 //static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer, pthread_mutex* gate ) {
 static int uta_link2( endpoint_t* ep ) {
-       char*           target; 
-       nng_socket*     nn_sock; 
+       char*           target;
+       nng_socket*     nn_sock;
        nng_dialer*     dialer;
        char            conn_info[NNG_MAXADDRLEN];      // string to give to nano to make the connection
        char*           addr;
        nng_dialer*     dialer;
        char            conn_info[NNG_MAXADDRLEN];      // string to give to nano to make the connection
        char*           addr;
@@ -91,7 +91,7 @@ static int uta_link2( endpoint_t* ep ) {
                pthread_mutex_unlock( &ep->gate );
                return TRUE;
        }
                pthread_mutex_unlock( &ep->gate );
                return TRUE;
        }
-       
+
 
        if( nng_push0_open( nn_sock ) != 0 ) {                  // and assign the mode
                pthread_mutex_unlock( &ep->gate );
 
        if( nng_push0_open( nn_sock ) != 0 ) {                  // and assign the mode
                pthread_mutex_unlock( &ep->gate );
@@ -259,6 +259,13 @@ static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_s
        We return the index+1 from the round robin table on success so that we can verify
        during test that different entries are being seleted; we cannot depend on the nng
        socket being different as we could with nano.
        We return the index+1 from the round robin table on success so that we can verify
        during test that different entries are being seleted; we cannot depend on the nng
        socket being different as we could with nano.
+
+       NOTE:   The round robin selection index increment might collide with other
+               threads if multiple threads are attempting to send to the same round
+               robin group; the consequences are small and avoid locking. The only side
+               effect is either sending two messages in a row to, or skipping, an endpoint.
+               Both of these, in the grand scheme of things, is minor compared to the
+               overhead of grabbing a lock on each call.
 */
 static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more, nng_socket* nn_sock ) {
        rtable_ent_t* rte;                      // matching rt entry
 */
 static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more, nng_socket* nn_sock ) {
        rtable_ent_t* rte;                      // matching rt entry
@@ -266,9 +273,10 @@ static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more,
        int  state = FALSE;                     // processing state
        int dummy;
        rrgroup_t* rrg;
        int  state = FALSE;                     // processing state
        int dummy;
        rrgroup_t* rrg;
+       int     idx;
 
 
 
 
-       if( ! more ) {                          // eliminate cheks each time we need to user
+       if( ! more ) {                          // eliminate cheks each time we need to use
                more = &dummy;
        }
 
                more = &dummy;
        }
 
@@ -316,12 +324,11 @@ static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more,
                        break;
 
                default:                                                                                // need to pick one and adjust rr counts
                        break;
 
                default:                                                                                // need to pick one and adjust rr counts
-                       ep = rrg->epts[rrg->ep_idx++];                          // select next endpoint
+
+                       idx = rrg->ep_idx++ % rrg->nused;                       // see note above
+                       ep = rrg->epts[idx];                                            // select next endpoint
                        //if( DEBUG ) fprintf( stderr, ">>>> _rr returning socket with multiple choices in group idx=%d \n", rrg->ep_idx );
                        //if( DEBUG ) fprintf( stderr, ">>>> _rr returning socket with multiple choices in group idx=%d \n", rrg->ep_idx );
-                       if( rrg->ep_idx >= rrg->nused ) {
-                               rrg->ep_idx = 0;
-                       }
-                       state = rrg->ep_idx+1;
+                       state = idx + 1;                                                        // unit test checks to see that we're cycling through, so must not just be TRUE
                        break;
        }
 
                        break;
        }