X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;ds=sidebyside;f=src%2Frmr%2Fnng%2Fsrc%2Frtable_nng_static.c;h=875cfe62eaf5f46978228fba2cb44a87c12f2687;hb=f68c2ced7de2bdfc475a9282cde91a67d83325de;hp=d903de3ed570e88f4d7b81a0587a232805fb88a8;hpb=412d53dfa2f9b5b56a448797d0dfec3b0f11f666;p=ric-plt%2Flib%2Frmr.git diff --git a/src/rmr/nng/src/rtable_nng_static.c b/src/rmr/nng/src/rtable_nng_static.c index d903de3..875cfe6 100644 --- a/src/rmr/nng/src/rtable_nng_static.c +++ b/src/rmr/nng/src/rtable_nng_static.c @@ -54,15 +54,15 @@ user supplied pointer so that a success/fail code is returned directly. Return value is 0 (false) on failure, 1 (true) on success. - In order to support multi-threaded user applications we must hold a lock before - we attempt to create a dialer and connect. NNG is thread safe, but we can + In order to support multi-threaded user applications we must hold a lock before + we attempt to create a dialer and connect. NNG is thread safe, but we can get things into a bad state if we allow a collision here. The lock grab only happens on the intial session setup. */ //static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer, pthread_mutex* gate ) { static int uta_link2( endpoint_t* ep ) { - char* target; - nng_socket* nn_sock; + char* target; + nng_socket* nn_sock; nng_dialer* dialer; char conn_info[NNG_MAXADDRLEN]; // string to give to nano to make the connection char* addr; @@ -91,7 +91,7 @@ static int uta_link2( endpoint_t* ep ) { pthread_mutex_unlock( &ep->gate ); return TRUE; } - + if( nng_push0_open( nn_sock ) != 0 ) { // and assign the mode pthread_mutex_unlock( &ep->gate ); @@ -102,7 +102,7 @@ static int uta_link2( endpoint_t* ep ) { snprintf( conn_info, sizeof( conn_info ), "tcp://%s", target ); if( (state = nng_dialer_create( dialer, *nn_sock, conn_info )) != 0 ) { pthread_mutex_unlock( &ep->gate ); - fprintf( stderr, "[WARN] rmr: link2: unable to create dialer for link to target: %s: %d\n", target, errno ); + fprintf( stderr, "[WRN] rmr: link2: unable to create dialer for link to target: %s: %d\n", target, errno ); nng_close( *nn_sock ); return FALSE; } @@ -112,7 +112,7 @@ static int uta_link2( endpoint_t* ep ) { if( (state = nng_dialer_start( *dialer, NO_FLAGS )) != 0 ) { // can fail immediatly (unlike nanomsg) pthread_mutex_unlock( &ep->gate ); - fprintf( stderr, "[WARN] rmr: unable to create link to target: %s: %s\n", target, nng_strerror( state ) ); + fprintf( stderr, "[WRN] rmr: unable to create link to target: %s: %s\n", target, nng_strerror( state ) ); nng_close( *nn_sock ); return FALSE; } @@ -154,24 +154,24 @@ extern endpoint_t* uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_n rrgroup_t* rrg; // pointer at group to update if( ! rte || ! rt ) { - fprintf( stderr, "[WARN] uda_add_ep didn't get a valid rt and/or rte pointer\n" ); + fprintf( stderr, "[WRN] uda_add_ep didn't get a valid rt and/or rte pointer\n" ); return NULL; } if( rte->nrrgroups <= group ) { - fprintf( stderr, "[WARN] uda_add_ep group out of range: %d (max == %d)\n", group, rte->nrrgroups ); + fprintf( stderr, "[WRN] uda_add_ep group out of range: %d (max == %d)\n", group, rte->nrrgroups ); return NULL; } if( (rrg = rte->rrgroups[group]) == NULL ) { if( (rrg = (rrgroup_t *) malloc( sizeof( *rrg ) )) == NULL ) { - fprintf( stderr, "[WARN] rmr_add_ep: malloc failed for round robin group: group=%d\n", group ); + fprintf( stderr, "[WRN] rmr_add_ep: malloc failed for round robin group: group=%d\n", group ); return NULL; } memset( rrg, 0, sizeof( *rrg ) ); if( (rrg->epts = (endpoint_t **) malloc( sizeof( endpoint_t ) * MAX_EP_GROUP )) == NULL ) { - fprintf( stderr, "[WARN] rmr_add_ep: malloc failed for group endpoint array: group=%d\n", group ); + fprintf( stderr, "[WRN] rmr_add_ep: malloc failed for group endpoint array: group=%d\n", group ); return NULL; } memset( rrg->epts, 0, sizeof( endpoint_t ) * MAX_EP_GROUP ); @@ -188,7 +188,7 @@ extern endpoint_t* uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_n if( rrg != NULL ) { if( rrg->nused >= rrg->nendpts ) { // future: reallocate - fprintf( stderr, "[WARN] endpoint array for mtype/group %d/%d is full!\n", rte->mtype, group ); + fprintf( stderr, "[WRN] endpoint array for mtype/group %d/%d is full!\n", rte->mtype, group ); return NULL; } @@ -225,7 +225,7 @@ static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_s if( ! ep->open ) { // not open -- connect now if( DEBUG ) fprintf( stderr, "[DBUG] get ep by name for %s session not started... starting\n", ep_name ); if( ep->addr == NULL ) { // name didn't resolve before, try again - ep->addr = uta_h2ip( ep->name ); + ep->addr = strdup( ep->name ); // use the name directly; if not IP then transport will do dns lookup } if( uta_link2( ep ) ) { // find entry in table and create link state = TRUE; @@ -259,6 +259,13 @@ static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_s We return the index+1 from the round robin table on success so that we can verify during test that different entries are being seleted; we cannot depend on the nng socket being different as we could with nano. + + NOTE: The round robin selection index increment might collide with other + threads if multiple threads are attempting to send to the same round + robin group; the consequences are small and avoid locking. The only side + effect is either sending two messages in a row to, or skipping, an endpoint. + Both of these, in the grand scheme of things, is minor compared to the + overhead of grabbing a lock on each call. */ static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more, nng_socket* nn_sock ) { rtable_ent_t* rte; // matching rt entry @@ -266,9 +273,10 @@ static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more, int state = FALSE; // processing state int dummy; rrgroup_t* rrg; + int idx; - if( ! more ) { // eliminate cheks each time we need to user + if( ! more ) { // eliminate cheks each time we need to use more = &dummy; } @@ -316,19 +324,18 @@ static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more, break; default: // need to pick one and adjust rr counts - ep = rrg->epts[rrg->ep_idx++]; // select next endpoint + + idx = rrg->ep_idx++ % rrg->nused; // see note above + ep = rrg->epts[idx]; // select next endpoint //if( DEBUG ) fprintf( stderr, ">>>> _rr returning socket with multiple choices in group idx=%d \n", rrg->ep_idx ); - if( rrg->ep_idx >= rrg->nused ) { - rrg->ep_idx = 0; - } - state = rrg->ep_idx+1; + state = idx + 1; // unit test checks to see that we're cycling through, so must not just be TRUE break; } if( state ) { // end point selected, open if not, get socket either way if( ! ep->open ) { // not connected if( ep->addr == NULL ) { // name didn't resolve before, try again - ep->addr = uta_h2ip( ep->name ); + ep->addr = strdup( ep->name ); // use the name directly; if not IP then transport will do dns lookup } if( uta_link2( ep ) ) { // find entry in table and create link