X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Frmr%2Fnng%2Fsrc%2Frtable_nng_static.c;h=4f413be6b64a297b9ed1a16bcdba9e86cab11a76;hb=e21dbee1d382e73d1897f96c0bd450f216389b74;hp=6122f6930fdab4e1599bca91ec85d0ef5eff8c2f;hpb=68c1ab2191d9959fde0bd275a560f7c9cf6df485;p=ric-plt%2Flib%2Frmr.git diff --git a/src/rmr/nng/src/rtable_nng_static.c b/src/rmr/nng/src/rtable_nng_static.c index 6122f69..4f413be 100644 --- a/src/rmr/nng/src/rtable_nng_static.c +++ b/src/rmr/nng/src/rtable_nng_static.c @@ -53,11 +53,40 @@ Target assumed to be address:port. The new socket is returned via the user supplied pointer so that a success/fail code is returned directly. Return value is 0 (false) on failure, 1 (true) on success. + + In order to support multi-threaded user applications we must hold a lock before + we attempt to create a dialer and connect. NNG is thread safe, but we can + get things into a bad state if we allow a collision here. The lock grab + only happens on the intial session setup. */ -static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer ) { +//static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer, pthread_mutex* gate ) { +static int uta_link2( endpoint_t* ep ) { + static int flags = -1; + + char* target; + nng_socket* nn_sock; + nng_dialer* dialer; char conn_info[NNG_MAXADDRLEN]; // string to give to nano to make the connection char* addr; int state = FALSE; + char* tok; + + if( ep == NULL ) { + return FALSE; + } + + if( flags < 0 ) { + tok = getenv( "RMR_ASYNC_CONN" ); + if( tok == NULL || *tok == '1' ) { + flags = NNG_FLAG_NONBLOCK; // start dialer asynch + } else { + flags = NO_FLAGS; + } + } + + target = ep->name; // always give name to transport so chaning dest IP does not break reconnect + nn_sock = &ep->nn_sock; + dialer = &ep->dialer; if( target == NULL || (addr = strchr( target, ':' )) == NULL ) { // bad address:port fprintf( stderr, "rmr: link2: unable to create link: bad target: %s\n", target == NULL ? "" : target ); @@ -69,14 +98,23 @@ static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer ) { return FALSE; } + pthread_mutex_lock( &ep->gate ); // grab the lock + if( ep->open ) { + pthread_mutex_unlock( &ep->gate ); + return TRUE; + } + + if( nng_push0_open( nn_sock ) != 0 ) { // and assign the mode + pthread_mutex_unlock( &ep->gate ); fprintf( stderr, "[CRI] rmr: link2: unable to initialise nanomsg push socket to: %s\n", target ); return FALSE; } snprintf( conn_info, sizeof( conn_info ), "tcp://%s", target ); if( (state = nng_dialer_create( dialer, *nn_sock, conn_info )) != 0 ) { - fprintf( stderr, "[WARN] rmr: link2: unable to create dialer for link to target: %s: %d\n", target, errno ); + pthread_mutex_unlock( &ep->gate ); + fprintf( stderr, "[WRN] rmr: link2: unable to create dialer for link to target: %s: %d\n", target, errno ); nng_close( *nn_sock ); return FALSE; } @@ -84,14 +122,17 @@ static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer ) { nng_dialer_setopt_ms( *dialer, NNG_OPT_RECONNMAXT, 2000 ); // cap backoff on retries to reasonable amount (2s) nng_dialer_setopt_ms( *dialer, NNG_OPT_RECONNMINT, 100 ); // start retry 100m after last failure with 2s cap - if( (state = nng_dialer_start( *dialer, NO_FLAGS )) != 0 ) { // can fail immediatly (unlike nanomsg) - fprintf( stderr, "[WARN] rmr: unable to create link to target: %s: %s\n", target, nng_strerror( state ) ); + if( (state = nng_dialer_start( *dialer, flags )) != 0 ) { // can fail immediatly (unlike nanomsg) + pthread_mutex_unlock( &ep->gate ); + fprintf( stderr, "[WRN] rmr: unable to create link to target: %s: %s\n", target, nng_strerror( state ) ); nng_close( *nn_sock ); return FALSE; } if( DEBUG ) fprintf( stderr, "[INFO] rmr_link2l: dial was successful: %s\n", target ); + ep->open = TRUE; // must set before release + pthread_mutex_unlock( &ep->gate ); return TRUE; } @@ -108,7 +149,7 @@ static int rt_link2_ep( endpoint_t* ep ) { return TRUE; } - ep->open = uta_link2( ep->addr, &ep->nn_sock, &ep->dialer ); + uta_link2( ep ); return ep->open; } @@ -125,24 +166,24 @@ extern endpoint_t* uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_n rrgroup_t* rrg; // pointer at group to update if( ! rte || ! rt ) { - fprintf( stderr, "[WARN] uda_add_ep didn't get a valid rt and/or rte pointer\n" ); + fprintf( stderr, "[WRN] uda_add_ep didn't get a valid rt and/or rte pointer\n" ); return NULL; } if( rte->nrrgroups <= group ) { - fprintf( stderr, "[WARN] uda_add_ep group out of range: %d (max == %d)\n", group, rte->nrrgroups ); + fprintf( stderr, "[WRN] uda_add_ep group out of range: %d (max == %d)\n", group, rte->nrrgroups ); return NULL; } if( (rrg = rte->rrgroups[group]) == NULL ) { if( (rrg = (rrgroup_t *) malloc( sizeof( *rrg ) )) == NULL ) { - fprintf( stderr, "[WARN] rmr_add_ep: malloc failed for round robin group: group=%d\n", group ); + fprintf( stderr, "[WRN] rmr_add_ep: malloc failed for round robin group: group=%d\n", group ); return NULL; } memset( rrg, 0, sizeof( *rrg ) ); if( (rrg->epts = (endpoint_t **) malloc( sizeof( endpoint_t ) * MAX_EP_GROUP )) == NULL ) { - fprintf( stderr, "[WARN] rmr_add_ep: malloc failed for group endpoint array: group=%d\n", group ); + fprintf( stderr, "[WRN] rmr_add_ep: malloc failed for group endpoint array: group=%d\n", group ); return NULL; } memset( rrg->epts, 0, sizeof( endpoint_t ) * MAX_EP_GROUP ); @@ -159,7 +200,7 @@ extern endpoint_t* uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_n if( rrg != NULL ) { if( rrg->nused >= rrg->nendpts ) { // future: reallocate - fprintf( stderr, "[WARN] endpoint array for mtype/group %d/%d is full!\n", rte->mtype, group ); + fprintf( stderr, "[WRN] endpoint array for mtype/group %d/%d is full!\n", rte->mtype, group ); return NULL; } @@ -196,9 +237,9 @@ static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_s if( ! ep->open ) { // not open -- connect now if( DEBUG ) fprintf( stderr, "[DBUG] get ep by name for %s session not started... starting\n", ep_name ); if( ep->addr == NULL ) { // name didn't resolve before, try again - ep->addr = uta_h2ip( ep->name ); + ep->addr = strdup( ep->name ); // use the name directly; if not IP then transport will do dns lookup } - if( uta_link2( ep->addr, &ep->nn_sock, &ep->dialer ) ) { // find entry in table and create link + if( uta_link2( ep ) ) { // find entry in table and create link state = TRUE; ep->open = TRUE; *nn_sock = ep->nn_sock; // pass socket back to caller @@ -215,7 +256,7 @@ static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_s /* Make a round robin selection within a round robin group for a route table entry. Returns the nanomsg socket if there is a rte for the message - type, and group is defined. Socket is returned via pointer in the parm + key, and group is defined. Socket is returned via pointer in the parm list (nn_sock). The group is the group number to select from. @@ -230,16 +271,24 @@ static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_s We return the index+1 from the round robin table on success so that we can verify during test that different entries are being seleted; we cannot depend on the nng socket being different as we could with nano. + + NOTE: The round robin selection index increment might collide with other + threads if multiple threads are attempting to send to the same round + robin group; the consequences are small and avoid locking. The only side + effect is either sending two messages in a row to, or skipping, an endpoint. + Both of these, in the grand scheme of things, is minor compared to the + overhead of grabbing a lock on each call. */ -static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more, nng_socket* nn_sock ) { - rtable_ent_t* rte; // matching rt entry +static int uta_epsock_rr( rtable_ent_t *rte, int group, int* more, nng_socket* nn_sock ) { + //rtable_ent_t* rte; // matching rt entry endpoint_t* ep; // seected end point int state = FALSE; // processing state int dummy; rrgroup_t* rrg; + int idx; - if( ! more ) { // eliminate cheks each time we need to user + if( ! more ) { // eliminate cheks each time we need to use more = &dummy; } @@ -249,25 +298,19 @@ static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more, return FALSE; } - if( rt == NULL ) { + if( rte == NULL ) { *more = 0; return FALSE; } - if( (rte = rmr_sym_pull( rt->hash, key )) == NULL ) { - *more = 0; - //if( DEBUG ) fprintf( stderr, ">>>> rte not found for type = %lu\n", key ); - return FALSE; - } - if( group < 0 || group >= rte->nrrgroups ) { - //if( DEBUG ) fprintf( stderr, ">>>> group out of range: key=%lu group=%d max=%d\n", key, group, rte->nrrgroups ); + //if( DEBUG ) fprintf( stderr, ">>>> group out of range: group=%d max=%d\n", group, rte->nrrgroups ); *more = 0; return FALSE; } if( (rrg = rte->rrgroups[group]) == NULL ) { - //if( DEBUG ) fprintf( stderr, ">>>> rrg not found for type key=%lu\n", key ); + //if( DEBUG ) fprintf( stderr, ">>>> rrg not found for group \n", group ); *more = 0; // groups are inserted contig, so nothing should be after a nil pointer return FALSE; } @@ -279,30 +322,28 @@ static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more, //if( DEBUG ) fprintf( stderr, ">>>> nothing allocated for the rrg\n" ); return FALSE; - case 1: // exactly one, no rr to deal with and more is not possible even if fanout > 1 - //*nn_sock = rrg->epts[0]->nn_sock; + case 1: // exactly one, no rr to deal with ep = rrg->epts[0]; //if( DEBUG ) fprintf( stderr, ">>>> _rr returning socket with one choice in group \n" ); state = TRUE; break; default: // need to pick one and adjust rr counts - ep = rrg->epts[rrg->ep_idx++]; // select next endpoint + + idx = rrg->ep_idx++ % rrg->nused; // see note above + ep = rrg->epts[idx]; // select next endpoint //if( DEBUG ) fprintf( stderr, ">>>> _rr returning socket with multiple choices in group idx=%d \n", rrg->ep_idx ); - if( rrg->ep_idx >= rrg->nused ) { - rrg->ep_idx = 0; - } - state = rrg->ep_idx+1; + state = idx + 1; // unit test checks to see that we're cycling through, so must not just be TRUE break; } if( state ) { // end point selected, open if not, get socket either way if( ! ep->open ) { // not connected if( ep->addr == NULL ) { // name didn't resolve before, try again - ep->addr = uta_h2ip( ep->name ); + ep->addr = strdup( ep->name ); // use the name directly; if not IP then transport will do dns lookup } - if( uta_link2( ep->addr, &ep->nn_sock, &ep->dialer ) ) { // find entry in table and create link + if( uta_link2( ep ) ) { // find entry in table and create link ep->open = TRUE; *nn_sock = ep->nn_sock; // pass socket back to caller } else { @@ -317,4 +358,30 @@ static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more, return state; } +/* + Finds the rtable entry which matches the key. Returns a nil pointer if + no entry is found. If try_alternate is set, then we will attempt + to find the entry with a key based only on the message type. +*/ +static inline rtable_ent_t* uta_get_rte( route_table_t *rt, int sid, int mtype, int try_alt ) { + uint64_t key; // key is sub id and mtype banged together + rtable_ent_t* rte; // the entry we found + + if( rt == NULL || rt->hash == NULL ) { + return NULL; + } + + key = build_rt_key( sid, mtype ); // first try with a 'full' key + if( ((rte = rmr_sym_pull( rt->hash, key )) != NULL) || ! try_alt ) { // found or not allowed to try the alternate, return what we have + return rte; + } + + if( sid != UNSET_SUBID ) { // not found, and allowed to try alternate; and the sub_id was set + key = build_rt_key( UNSET_SUBID, mtype ); // rebuild key + rte = rmr_sym_pull( rt->hash, key ); // see what we get with this + } + + return rte; +} + #endif