With multiple user threads concurrently sending to the same round robin
group, there was a potential for a nil endpoint pointer to be selected
from the group's endpoint list.
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Change-Id: I1ee57ff5d17998a5d097e583d497158f19eb300b
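
For context, the pre-patch selection logic (the lines removed in the hunks
below) left a window between the index increment and the wrap check. Two
threads interleaving in that window can step ep_idx past nused and index an
unpopulated slot; an annotated reconstruction, with the interleaving comments
added for illustration:

    ep = rrg->epts[rrg->ep_idx];                    // thread A reads idx == nused-1
    nn_sock = rrg->epts[rrg->ep_idx++]->nn_sock;    // A bumps idx to nused ...
                                                    // ... thread B now indexes
                                                    // epts[nused], which may be nil
    if( rrg->ep_idx >= rrg->nused ) {               // wrap check runs too late for B
        rrg->ep_idx = 0;
    }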
set( major_version "1" ) # should be automatically populated from git tag later, but until CI process sets a tag we use this
set( minor_version "0" )
set( install_root "${CMAKE_INSTALL_PREFIX}" )
set( install_lib "lib" )
Round robin group.
*/
typedef struct {
- int ep_idx; // next endpoint to send to
+ uint16_t ep_idx; // next endpoint to send to
int nused; // number of endpoints in the list
int nendpts; // number allocated
endpoint_t **epts; // the list of endpoints that we RR over
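
The type change from int to uint16_t above pairs with the modulo selection
introduced later in this patch: unsigned arithmetic wraps with defined
behaviour, so ep_idx++ % nused always lands in 0..nused-1 even when the
counter overflows, whereas overflow of a signed int is undefined. A
standalone illustration (not from the patch):

    uint16_t counter = 65535;       // about to wrap
    counter++;                      // defined: wraps to 0 (mod 65536)
    int pick = counter % 3;         // always 0..2, never negative

At the wrap point the rotation is only perfectly even when nused divides
65536; the occasional doubled or skipped turn this causes falls under the
same tolerance described in the NOTE below.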
with group 0. If more is set, the caller may increase the group number and
invoke this function again to make a selection against that group. If there
are no more groups, more is set to 0.
+
+ NOTE: The round robin selection index increment may collide when multiple
+ threads attempt to send to the same round robin group; the consequences
+ are small, and tolerating them avoids locking. The only side effect is
+ either sending two messages in a row to an endpoint, or skipping one.
+ Both of these, in the grand scheme of things, are minor compared to the
+ overhead of grabbing a lock on each call.
*/
static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more ) {
rtable_ent_t* rte; // matching rt entry
int nn_sock = -2;
int dummy;
rrgroup_t* rrg;
if( ! more ) { // eliminate checks each time we need to use
break;
default: // need to pick one and adjust rr counts
- ep = rrg->epts[rrg->ep_idx];
- nn_sock = rrg->epts[rrg->ep_idx++]->nn_sock;
- if( rrg->ep_idx >= rrg->nused ) {
- rrg->ep_idx = 0;
- }
+ idx = rrg->ep_idx++ % rrg->nused; // see note above
+ ep = rrg->epts[idx];
+ nn_sock = ep->nn_sock;
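
For contrast, if the occasional doubled or skipped endpoint were not
acceptable, a C11 atomic fetch-and-add would make the increment itself race
free while still avoiding a mutex. This is a hypothetical alternative for
illustration only, not what the patch does; rrgroup_atomic_t and rr_pick are
invented names, and endpoint_t is the structure used above:

    #include <stdatomic.h>
    #include <stdint.h>

    typedef struct {
        atomic_uint_fast32_t ep_idx;    // atomic tick; increments are never lost
        int nused;                      // number of endpoints in the list
        endpoint_t **epts;              // the list of endpoints we RR over
    } rrgroup_atomic_t;

    static endpoint_t* rr_pick( rrgroup_atomic_t* rrg ) {
        uint32_t tick = atomic_fetch_add( &rrg->ep_idx, 1 );  // unique per caller
        return rrg->epts[tick % rrg->nused];                  // always in range
    }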
queue_normal( ctx, mbuf );
} else {
chute = &ctx->chutes[call_id];
- if( memcmp( mbuf->xaction, chute->expect, RMR_MAX_XID ) == 0 ) { // match
- chute->mbuf = mbuf;
- sem_post( &chute->barrier );
- } else {
- rmr_free_msg( mbuf );
- mbuf = NULL;
- }
+ chute->mbuf = mbuf;
+ sem_post( &chute->barrier ); // the call function can vet xaction id in their own thread
if( state < 0 && errno == EINTR ) { // interrupted go back and wait; all other errors cause exit
errno = 0;
}
+
+ if( chute->mbuf != NULL ) { // offload receiver thread and check xaction buffer here
+ if( memcmp( chute->expect, chute->mbuf->xaction, RMR_MAX_XID ) != 0 ) {
+ rmr_free_msg( chute->mbuf );
+ chute->mbuf = NULL;
+ errno = 0;
+ }
+ }
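
Read together with the receiver-side hunk above, the rendezvous now works in
two halves: the receiver thread stores the message and posts the semaphore
unconditionally, and the blocked caller does the transaction-id vetting
itself. A condensed sketch of the caller side, assuming the chute fields
(mbuf, expect, barrier) visible in these hunks; the fragment shape and NULL
returns are illustrative:

    while( sem_wait( &chute->barrier ) < 0 ) {      // wait for receiver to post
        if( errno != EINTR ) {                      // interrupted waits retry;
            return NULL;                            // any other error gives up
        }
        errno = 0;
    }

    if( chute->mbuf != NULL ) {                     // vet here, off the receiver thread
        if( memcmp( chute->expect, chute->mbuf->xaction, RMR_MAX_XID ) != 0 ) {
            rmr_free_msg( chute->mbuf );            // not the reply we expected
            chute->mbuf = NULL;
        }
    }
    return chute->mbuf;                             // NULL when vetting failed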
user supplied pointer so that a success/fail code is returned directly.
Return value is 0 (false) on failure, 1 (true) on success.
- In order to support multi-threaded user applications we must hold a lock before
- we attempt to create a dialer and connect. NNG is thread safe, but we can
+ In order to support multi-threaded user applications we must hold a lock before
+ we attempt to create a dialer and connect. NNG is thread safe, but we can
get things into a bad state if we allow a collision here. The lock grab
only happens on the initial session setup.
*/
//static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer, pthread_mutex* gate ) {
static int uta_link2( endpoint_t* ep ) {
- char* target;
- nng_socket* nn_sock;
+ char* target;
+ nng_socket* nn_sock;
nng_dialer* dialer;
char conn_info[NNG_MAXADDRLEN]; // string to give to nano to make the connection
char* addr;
pthread_mutex_unlock( &ep->gate );
return TRUE;
}
if( nng_push0_open( nn_sock ) != 0 ) { // and assign the mode
pthread_mutex_unlock( &ep->gate );
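
The shape of the function is the classic check-under-lock, one-time
initialisation pattern. A condensed sketch with error handling trimmed; the
open flag and the dialer/connect step are assumptions inferred from the
early-return and nng_push0_open paths above:

    static int uta_link2( endpoint_t* ep ) {
        pthread_mutex_lock( &ep->gate );             // serialise first-time setup
        if( ep->open ) {                             // another thread already connected
            pthread_mutex_unlock( &ep->gate );
            return TRUE;
        }

        if( nng_push0_open( &ep->nn_sock ) != 0 ) {  // create the socket in push mode
            pthread_mutex_unlock( &ep->gate );
            return FALSE;
        }

        // ... create the dialer and start the connection here ...

        ep->open = TRUE;                             // future callers take the fast path
        pthread_mutex_unlock( &ep->gate );
        return TRUE;
    }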
We return the index+1 from the round robin table on success so that we can verify
during test that different entries are being selected; we cannot depend on the nng
socket being different as we could with nano.
+
+ NOTE: The round robin selection index increment may collide when multiple
+ threads attempt to send to the same round robin group; the consequences
+ are small, and tolerating them avoids locking. The only side effect is
+ either sending two messages in a row to an endpoint, or skipping one.
+ Both of these, in the grand scheme of things, are minor compared to the
+ overhead of grabbing a lock on each call.
*/
static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more, nng_socket* nn_sock ) {
rtable_ent_t* rte; // matching rt entry
int state = FALSE; // processing state
int dummy;
rrgroup_t* rrg;
- if( ! more ) { // eliminate cheks each time we need to user
+ if( ! more ) { // eliminate checks each time we need to use
break;
default: // need to pick one and adjust rr counts
- ep = rrg->epts[rrg->ep_idx++]; // select next endpoint
+
+ idx = rrg->ep_idx++ % rrg->nused; // see note above
+ ep = rrg->epts[idx]; // select next endpoint
//if( DEBUG ) fprintf( stderr, ">>>> _rr returning socket with multiple choices in group idx=%d \n", rrg->ep_idx );
- if( rrg->ep_idx >= rrg->nused ) {
- rrg->ep_idx = 0;
- }
- state = rrg->ep_idx+1;
+ state = idx + 1; // unit test checks to see that we're cycling through, so must not just be TRUE
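
Since index+1 is returned only so tests can observe the rotation, a unit
check might exercise it like this (hypothetical test fragment; assumes a
group with at least two endpoints so consecutive picks must differ):

    int more = 0;
    nng_socket sock;

    int s1 = uta_epsock_rr( rt, key, 0, &more, &sock );
    int s2 = uta_epsock_rr( rt, key, 0, &more, &sock );
    if( s1 == s2 ) {
        fprintf( stderr, "<FAIL> round robin selection did not advance\n" );
    }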