From a7193022969bafe1ade8ee5032f1f41d79018404 Mon Sep 17 00:00:00 2001 From: "E. Scott Daniels" Date: Fri, 31 May 2019 18:50:57 +0000 Subject: [PATCH] fix(internal): Correct potential nil pointer err With multiple user threads, if concurrently sending to the same round robin group, there was a potential for a nil pointer to be selected representing an endpoint. Signed-off-by: E. Scott Daniels Change-Id: I1ee57ff5d17998a5d097e583d497158f19eb300b --- CMakeLists.txt | 2 +- src/rmr/common/include/rmr_agnostic.h | 2 +- src/rmr/nanomsg/src/rtable_static.c | 16 +++++++++++----- src/rmr/nng/src/mt_call_nng_static.c | 9 ++------- src/rmr/nng/src/rmr_nng.c | 8 ++++++++ src/rmr/nng/src/rtable_nng_static.c | 29 ++++++++++++++++++----------- 6 files changed, 41 insertions(+), 25 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 4199147..b30c364 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -23,7 +23,7 @@ cmake_minimum_required( VERSION 3.5 ) set( major_version "1" ) # should be automatically populated from git tag later, but until CI process sets a tag we use this set( minor_version "0" ) -set( patch_level "26" ) +set( patch_level "27" ) set( install_root "${CMAKE_INSTALL_PREFIX}" ) set( install_lib "lib" ) diff --git a/src/rmr/common/include/rmr_agnostic.h b/src/rmr/common/include/rmr_agnostic.h index 0d4458a..c037900 100644 --- a/src/rmr/common/include/rmr_agnostic.h +++ b/src/rmr/common/include/rmr_agnostic.h @@ -165,7 +165,7 @@ typedef struct { // old (inflexible) v1 header Round robin group. */ typedef struct { - int ep_idx; // next endpoint to send to + uint16_t ep_idx; // next endpoint to send to int nused; // number of endpoints in the list int nendpts; // number allocated endpoint_t **epts; // the list of endpoints that we RR over diff --git a/src/rmr/nanomsg/src/rtable_static.c b/src/rmr/nanomsg/src/rtable_static.c index cfcb27e..643f00a 100644 --- a/src/rmr/nanomsg/src/rtable_static.c +++ b/src/rmr/nanomsg/src/rtable_static.c @@ -203,6 +203,13 @@ static int uta_epsock_byname( route_table_t* rt, char* ep_name ) { with group 0. If more is set, the caller may increase the group number and invoke this function again to make a selection against that group. If there are no more groups, more is set to 0. + + NOTE: The round robin selection index increment might collide with other + threads if multiple threads are attempting to send to the same round + robin group; the consequences are small and avoid locking. The only side + effect is either sending two messages in a row to, or skipping, an endpoint. + Both of these, in the grand scheme of things, is minor compared to the + overhead of grabbing a lock on each call. */ static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more ) { rtable_ent_t* rte; // matching rt entry @@ -210,6 +217,7 @@ static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more int nn_sock = -2; int dummy; rrgroup_t* rrg; + int idx; if( ! more ) { // eliminate checks each time we need to use @@ -252,11 +260,9 @@ static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more break; default: // need to pick one and adjust rr counts - ep = rrg->epts[rrg->ep_idx]; - nn_sock = rrg->epts[rrg->ep_idx++]->nn_sock; - if( rrg->ep_idx >= rrg->nused ) { - rrg->ep_idx = 0; - } + idx = rrg->ep_idx++ % rrg->nused; // see note above + ep = rrg->epts[idx]; + nn_sock = ep->nn_sock; break; } diff --git a/src/rmr/nng/src/mt_call_nng_static.c b/src/rmr/nng/src/mt_call_nng_static.c index 2ef21f6..325b313 100644 --- a/src/rmr/nng/src/mt_call_nng_static.c +++ b/src/rmr/nng/src/mt_call_nng_static.c @@ -91,13 +91,8 @@ static void* mt_receive( void* vctx ) { queue_normal( ctx, mbuf ); } else { chute = &ctx->chutes[call_id]; - if( memcmp( mbuf->xaction, chute->expect, RMR_MAX_XID ) == 0 ) { // match - chute->mbuf = mbuf; - sem_post( &chute->barrier ); - } else { - rmr_free_msg( mbuf ); - mbuf = NULL; - } + chute->mbuf = mbuf; + sem_post( &chute->barrier ); // the call function can vet xaction id in their own thread } } } diff --git a/src/rmr/nng/src/rmr_nng.c b/src/rmr/nng/src/rmr_nng.c index 582cbb0..77f539e 100644 --- a/src/rmr/nng/src/rmr_nng.c +++ b/src/rmr/nng/src/rmr_nng.c @@ -978,6 +978,14 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m if( state < 0 && errno == EINTR ) { // interrupted go back and wait; all other errors cause exit errno = 0; } + + if( chute->mbuf != NULL ) { // offload receiver thread and check xaction buffer here + if( memcmp( chute->expect, chute->mbuf->xaction, RMR_MAX_XID ) != 0 ) { + rmr_free_msg( chute->mbuf ); + chute->mbuf = NULL; + errno = 0; + } + } } if( state < 0 ) { diff --git a/src/rmr/nng/src/rtable_nng_static.c b/src/rmr/nng/src/rtable_nng_static.c index d903de3..9f3092a 100644 --- a/src/rmr/nng/src/rtable_nng_static.c +++ b/src/rmr/nng/src/rtable_nng_static.c @@ -54,15 +54,15 @@ user supplied pointer so that a success/fail code is returned directly. Return value is 0 (false) on failure, 1 (true) on success. - In order to support multi-threaded user applications we must hold a lock before - we attempt to create a dialer and connect. NNG is thread safe, but we can + In order to support multi-threaded user applications we must hold a lock before + we attempt to create a dialer and connect. NNG is thread safe, but we can get things into a bad state if we allow a collision here. The lock grab only happens on the intial session setup. */ //static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer, pthread_mutex* gate ) { static int uta_link2( endpoint_t* ep ) { - char* target; - nng_socket* nn_sock; + char* target; + nng_socket* nn_sock; nng_dialer* dialer; char conn_info[NNG_MAXADDRLEN]; // string to give to nano to make the connection char* addr; @@ -91,7 +91,7 @@ static int uta_link2( endpoint_t* ep ) { pthread_mutex_unlock( &ep->gate ); return TRUE; } - + if( nng_push0_open( nn_sock ) != 0 ) { // and assign the mode pthread_mutex_unlock( &ep->gate ); @@ -259,6 +259,13 @@ static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_s We return the index+1 from the round robin table on success so that we can verify during test that different entries are being seleted; we cannot depend on the nng socket being different as we could with nano. + + NOTE: The round robin selection index increment might collide with other + threads if multiple threads are attempting to send to the same round + robin group; the consequences are small and avoid locking. The only side + effect is either sending two messages in a row to, or skipping, an endpoint. + Both of these, in the grand scheme of things, is minor compared to the + overhead of grabbing a lock on each call. */ static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more, nng_socket* nn_sock ) { rtable_ent_t* rte; // matching rt entry @@ -266,9 +273,10 @@ static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more, int state = FALSE; // processing state int dummy; rrgroup_t* rrg; + int idx; - if( ! more ) { // eliminate cheks each time we need to user + if( ! more ) { // eliminate cheks each time we need to use more = &dummy; } @@ -316,12 +324,11 @@ static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more, break; default: // need to pick one and adjust rr counts - ep = rrg->epts[rrg->ep_idx++]; // select next endpoint + + idx = rrg->ep_idx++ % rrg->nused; // see note above + ep = rrg->epts[idx]; // select next endpoint //if( DEBUG ) fprintf( stderr, ">>>> _rr returning socket with multiple choices in group idx=%d \n", rrg->ep_idx ); - if( rrg->ep_idx >= rrg->nused ) { - rrg->ep_idx = 0; - } - state = rrg->ep_idx+1; + state = idx + 1; // unit test checks to see that we're cycling through, so must not just be TRUE break; } -- 2.16.6