X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Frmr%2Fnng%2Fsrc%2Frtable_nng_static.c;h=fefa63de4d935a05af2df8ffc81e6d584cfb79ed;hb=3925774e739509bb51df2c81addb3ab742c1801f;hp=6122f6930fdab4e1599bca91ec85d0ef5eff8c2f;hpb=68c1ab2191d9959fde0bd275a560f7c9cf6df485;p=ric-plt%2Flib%2Frmr.git diff --git a/src/rmr/nng/src/rtable_nng_static.c b/src/rmr/nng/src/rtable_nng_static.c index 6122f69..fefa63d 100644 --- a/src/rmr/nng/src/rtable_nng_static.c +++ b/src/rmr/nng/src/rtable_nng_static.c @@ -53,12 +53,29 @@ Target assumed to be address:port. The new socket is returned via the user supplied pointer so that a success/fail code is returned directly. Return value is 0 (false) on failure, 1 (true) on success. + + In order to support multi-threaded user applications we must hold a lock before + we attempt to create a dialer and connect. NNG is thread safe, but we can + get things into a bad state if we allow a collision here. The lock grab + only happens on the intial session setup. */ -static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer ) { +//static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer, pthread_mutex* gate ) { +static int uta_link2( endpoint_t* ep ) { + char* target; + nng_socket* nn_sock; + nng_dialer* dialer; char conn_info[NNG_MAXADDRLEN]; // string to give to nano to make the connection char* addr; int state = FALSE; + if( ep == NULL ) { + return FALSE; + } + + target = ep->addr; + nn_sock = &ep->nn_sock; + dialer = &ep->dialer; + if( target == NULL || (addr = strchr( target, ':' )) == NULL ) { // bad address:port fprintf( stderr, "rmr: link2: unable to create link: bad target: %s\n", target == NULL ? "" : target ); return FALSE; @@ -69,13 +86,22 @@ static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer ) { return FALSE; } + pthread_mutex_lock( &ep->gate ); // grab the lock + if( ep->open ) { + pthread_mutex_unlock( &ep->gate ); + return TRUE; + } + + if( nng_push0_open( nn_sock ) != 0 ) { // and assign the mode + pthread_mutex_unlock( &ep->gate ); fprintf( stderr, "[CRI] rmr: link2: unable to initialise nanomsg push socket to: %s\n", target ); return FALSE; } snprintf( conn_info, sizeof( conn_info ), "tcp://%s", target ); if( (state = nng_dialer_create( dialer, *nn_sock, conn_info )) != 0 ) { + pthread_mutex_unlock( &ep->gate ); fprintf( stderr, "[WARN] rmr: link2: unable to create dialer for link to target: %s: %d\n", target, errno ); nng_close( *nn_sock ); return FALSE; @@ -85,6 +111,7 @@ static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer ) { nng_dialer_setopt_ms( *dialer, NNG_OPT_RECONNMINT, 100 ); // start retry 100m after last failure with 2s cap if( (state = nng_dialer_start( *dialer, NO_FLAGS )) != 0 ) { // can fail immediatly (unlike nanomsg) + pthread_mutex_unlock( &ep->gate ); fprintf( stderr, "[WARN] rmr: unable to create link to target: %s: %s\n", target, nng_strerror( state ) ); nng_close( *nn_sock ); return FALSE; @@ -92,6 +119,8 @@ static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer ) { if( DEBUG ) fprintf( stderr, "[INFO] rmr_link2l: dial was successful: %s\n", target ); + ep->open = TRUE; // must set before release + pthread_mutex_unlock( &ep->gate ); return TRUE; } @@ -108,7 +137,7 @@ static int rt_link2_ep( endpoint_t* ep ) { return TRUE; } - ep->open = uta_link2( ep->addr, &ep->nn_sock, &ep->dialer ); + uta_link2( ep ); return ep->open; } @@ -196,9 +225,9 @@ static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_s if( ! ep->open ) { // not open -- connect now if( DEBUG ) fprintf( stderr, "[DBUG] get ep by name for %s session not started... starting\n", ep_name ); if( ep->addr == NULL ) { // name didn't resolve before, try again - ep->addr = uta_h2ip( ep->name ); + ep->addr = strdup( ep->name ); // use the name directly; if not IP then transport will do dns lookup } - if( uta_link2( ep->addr, &ep->nn_sock, &ep->dialer ) ) { // find entry in table and create link + if( uta_link2( ep ) ) { // find entry in table and create link state = TRUE; ep->open = TRUE; *nn_sock = ep->nn_sock; // pass socket back to caller @@ -230,6 +259,13 @@ static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_s We return the index+1 from the round robin table on success so that we can verify during test that different entries are being seleted; we cannot depend on the nng socket being different as we could with nano. + + NOTE: The round robin selection index increment might collide with other + threads if multiple threads are attempting to send to the same round + robin group; the consequences are small and avoid locking. The only side + effect is either sending two messages in a row to, or skipping, an endpoint. + Both of these, in the grand scheme of things, is minor compared to the + overhead of grabbing a lock on each call. */ static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more, nng_socket* nn_sock ) { rtable_ent_t* rte; // matching rt entry @@ -237,9 +273,10 @@ static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more, int state = FALSE; // processing state int dummy; rrgroup_t* rrg; + int idx; - if( ! more ) { // eliminate cheks each time we need to user + if( ! more ) { // eliminate cheks each time we need to use more = &dummy; } @@ -287,22 +324,21 @@ static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more, break; default: // need to pick one and adjust rr counts - ep = rrg->epts[rrg->ep_idx++]; // select next endpoint + + idx = rrg->ep_idx++ % rrg->nused; // see note above + ep = rrg->epts[idx]; // select next endpoint //if( DEBUG ) fprintf( stderr, ">>>> _rr returning socket with multiple choices in group idx=%d \n", rrg->ep_idx ); - if( rrg->ep_idx >= rrg->nused ) { - rrg->ep_idx = 0; - } - state = rrg->ep_idx+1; + state = idx + 1; // unit test checks to see that we're cycling through, so must not just be TRUE break; } if( state ) { // end point selected, open if not, get socket either way if( ! ep->open ) { // not connected if( ep->addr == NULL ) { // name didn't resolve before, try again - ep->addr = uta_h2ip( ep->name ); + ep->addr = strdup( ep->name ); // use the name directly; if not IP then transport will do dns lookup } - if( uta_link2( ep->addr, &ep->nn_sock, &ep->dialer ) ) { // find entry in table and create link + if( uta_link2( ep ) ) { // find entry in table and create link ep->open = TRUE; *nn_sock = ep->nn_sock; // pass socket back to caller } else {