X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Frmr%2Fnng%2Fsrc%2Frtable_nng_static.c;h=2cec490a0d469aac99b1b458ceaad68efbed1ea3;hb=99584a241c64d29fc20e74a4b4e01427d0f00e73;hp=da4101bccd34398bfa8e4d70032e154b4844c0aa;hpb=6511ac74cdc367a94bffeb3743624775acd52c5b;p=ric-plt%2Flib%2Frmr.git diff --git a/src/rmr/nng/src/rtable_nng_static.c b/src/rmr/nng/src/rtable_nng_static.c index da4101b..2cec490 100644 --- a/src/rmr/nng/src/rtable_nng_static.c +++ b/src/rmr/nng/src/rtable_nng_static.c @@ -1,4 +1,4 @@ -// : vi ts=4 sw=4 noet : +// vim: ts=4 sw=4 noet : /* ================================================================================== Copyright (c) 2019 Nokia @@ -59,20 +59,31 @@ get things into a bad state if we allow a collision here. The lock grab only happens on the intial session setup. */ -//static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer, pthread_mutex* gate ) { static int uta_link2( endpoint_t* ep ) { + static int flags = -1; + char* target; nng_socket* nn_sock; nng_dialer* dialer; char conn_info[NNG_MAXADDRLEN]; // string to give to nano to make the connection char* addr; int state = FALSE; + char* tok; if( ep == NULL ) { return FALSE; } - target = ep->addr; + if( flags < 0 ) { + tok = getenv( "RMR_ASYNC_CONN" ); + if( tok == NULL || *tok == '1' ) { + flags = NNG_FLAG_NONBLOCK; // start dialer asynch + } else { + flags = NO_FLAGS; + } + } + + target = ep->name; // always give name to transport so chaning dest IP does not break reconnect nn_sock = &ep->nn_sock; dialer = &ep->dialer; @@ -95,14 +106,14 @@ static int uta_link2( endpoint_t* ep ) { if( nng_push0_open( nn_sock ) != 0 ) { // and assign the mode pthread_mutex_unlock( &ep->gate ); - fprintf( stderr, "[CRI] rmr: link2: unable to initialise nanomsg push socket to: %s\n", target ); + rmr_vlog( RMR_VL_CRIT, "rmr: link2: unable to initialise nanomsg push socket to: %s\n", target ); return FALSE; } snprintf( conn_info, sizeof( conn_info ), "tcp://%s", target ); if( (state = nng_dialer_create( dialer, *nn_sock, conn_info )) != 0 ) { pthread_mutex_unlock( &ep->gate ); - fprintf( stderr, "[WRN] rmr: link2: unable to create dialer for link to target: %s: %d\n", target, errno ); + rmr_vlog( RMR_VL_WARN, "rmr: link2: unable to create dialer for link to target: %s: %d\n", target, errno ); nng_close( *nn_sock ); return FALSE; } @@ -110,14 +121,14 @@ static int uta_link2( endpoint_t* ep ) { nng_dialer_setopt_ms( *dialer, NNG_OPT_RECONNMAXT, 2000 ); // cap backoff on retries to reasonable amount (2s) nng_dialer_setopt_ms( *dialer, NNG_OPT_RECONNMINT, 100 ); // start retry 100m after last failure with 2s cap - if( (state = nng_dialer_start( *dialer, NO_FLAGS )) != 0 ) { // can fail immediatly (unlike nanomsg) + if( (state = nng_dialer_start( *dialer, flags )) != 0 ) { // can fail immediatly (unlike nanomsg) pthread_mutex_unlock( &ep->gate ); - fprintf( stderr, "[WRN] rmr: unable to create link to target: %s: %s\n", target, nng_strerror( state ) ); + rmr_vlog( RMR_VL_WARN, "rmr: unable to create link to target: %s: %s\n", target, nng_strerror( state ) ); nng_close( *nn_sock ); return FALSE; } - if( DEBUG ) fprintf( stderr, "[INFO] rmr_link2l: dial was successful: %s\n", target ); + if( DEBUG ) rmr_vlog( RMR_VL_INFO, "rmr_link2l: dial was successful: %s\n", target ); ep->open = TRUE; // must set before release pthread_mutex_unlock( &ep->gate ); @@ -127,8 +138,10 @@ static int uta_link2( endpoint_t* ep ) { /* This provides a protocol independent mechanism for establishing the connection to an endpoint. Return is true (1) if the link was opened; false on error. + + For some flavours, the context is needed by this function, but not for nng. */ -static int rt_link2_ep( endpoint_t* ep ) { +static int rt_link2_ep( void* vctx, endpoint_t* ep ) { if( ep == NULL ) { return FALSE; } @@ -154,24 +167,24 @@ extern endpoint_t* uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_n rrgroup_t* rrg; // pointer at group to update if( ! rte || ! rt ) { - fprintf( stderr, "[WRN] uda_add_ep didn't get a valid rt and/or rte pointer\n" ); + rmr_vlog( RMR_VL_WARN, "uda_add_ep didn't get a valid rt and/or rte pointer\n" ); return NULL; } if( rte->nrrgroups <= group ) { - fprintf( stderr, "[WRN] uda_add_ep group out of range: %d (max == %d)\n", group, rte->nrrgroups ); + rmr_vlog( RMR_VL_WARN, "uda_add_ep group out of range: %d (max == %d)\n", group, rte->nrrgroups ); return NULL; } if( (rrg = rte->rrgroups[group]) == NULL ) { if( (rrg = (rrgroup_t *) malloc( sizeof( *rrg ) )) == NULL ) { - fprintf( stderr, "[WRN] rmr_add_ep: malloc failed for round robin group: group=%d\n", group ); + rmr_vlog( RMR_VL_WARN, "rmr_add_ep: malloc failed for round robin group: group=%d\n", group ); return NULL; } memset( rrg, 0, sizeof( *rrg ) ); if( (rrg->epts = (endpoint_t **) malloc( sizeof( endpoint_t ) * MAX_EP_GROUP )) == NULL ) { - fprintf( stderr, "[WRN] rmr_add_ep: malloc failed for group endpoint array: group=%d\n", group ); + rmr_vlog( RMR_VL_WARN, "rmr_add_ep: malloc failed for group endpoint array: group=%d\n", group ); return NULL; } memset( rrg->epts, 0, sizeof( endpoint_t ) * MAX_EP_GROUP ); @@ -188,7 +201,7 @@ extern endpoint_t* uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_n if( rrg != NULL ) { if( rrg->nused >= rrg->nendpts ) { // future: reallocate - fprintf( stderr, "[WRN] endpoint array for mtype/group %d/%d is full!\n", rte->mtype, group ); + rmr_vlog( RMR_VL_WARN, "endpoint array for mtype/group %d/%d is full!\n", rte->mtype, group ); return NULL; } @@ -196,7 +209,7 @@ extern endpoint_t* uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_n rrg->nused++; } - if( DEBUG > 1 ) fprintf( stderr, "[DBUG] endpoint added to mtype/group: %d/%d %s\n", rte->mtype, group, ep_name ); + if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "endpoint added to mtype/group: %d/%d %s\n", rte->mtype, group, ep_name ); return ep; } @@ -206,7 +219,7 @@ extern endpoint_t* uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_n the user pointer passed in and sets the return value to true (1). If the endpoint cannot be found false (0) is returned. */ -static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_sock ) { +static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_sock, endpoint_t** uepp ) { endpoint_t* ep; int state = FALSE; @@ -215,15 +228,18 @@ static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_s } ep = rmr_sym_get( rt->hash, ep_name, 1 ); + if( uepp != NULL ) { // caller needs endpoint too, give it back + *uepp = ep; + } if( ep == NULL ) { - if( DEBUG ) fprintf( stderr, "[DBUG] get ep by name for %s not in hash!\n", ep_name ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "get ep by name for %s not in hash!\n", ep_name ); if( ! ep_name || (ep = rt_ensure_ep( rt, ep_name)) == NULL ) { // create one if not in rt (support rts without entry in our table) return FALSE; } } if( ! ep->open ) { // not open -- connect now - if( DEBUG ) fprintf( stderr, "[DBUG] get ep by name for %s session not started... starting\n", ep_name ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "get ep by name for %s session not started... starting\n", ep_name ); if( ep->addr == NULL ) { // name didn't resolve before, try again ep->addr = strdup( ep->name ); // use the name directly; if not IP then transport will do dns lookup } @@ -232,7 +248,7 @@ static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_s ep->open = TRUE; *nn_sock = ep->nn_sock; // pass socket back to caller } - if( DEBUG ) fprintf( stderr, "[DBUG] epsock_bn: connection state: %s %s\n", state ? "[OK]" : "[FAIL]", ep->name ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_bn: connection state: %s %s\n", state ? "[OK]" : "[FAIL]", ep->name ); } else { *nn_sock = ep->nn_sock; state = TRUE; @@ -267,8 +283,7 @@ static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_s Both of these, in the grand scheme of things, is minor compared to the overhead of grabbing a lock on each call. */ -static int uta_epsock_rr( rtable_ent_t *rte, int group, int* more, nng_socket* nn_sock ) { - //rtable_ent_t* rte; // matching rt entry +static int uta_epsock_rr( rtable_ent_t *rte, int group, int* more, nng_socket* nn_sock, endpoint_t** uepp ) { endpoint_t* ep; // seected end point int state = FALSE; // processing state int dummy; @@ -325,6 +340,9 @@ static int uta_epsock_rr( rtable_ent_t *rte, int group, int* more, nng_socket* n break; } + if( uepp != NULL ) { // caller needs refernce to endpoint too + *uepp = ep; + } if( state ) { // end point selected, open if not, get socket either way if( ! ep->open ) { // not connected if( ep->addr == NULL ) { // name didn't resolve before, try again @@ -337,7 +355,7 @@ static int uta_epsock_rr( rtable_ent_t *rte, int group, int* more, nng_socket* n } else { state = FALSE; } - if( DEBUG ) fprintf( stderr, "[DBUG] epsock_rr: connection attempted with %s: %s\n", ep->name, state ? "[OK]" : "[FAIL]" ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_rr: connection attempted with %s: %s\n", ep->name, state ? "[OK]" : "[FAIL]" ); } else { *nn_sock = ep->nn_sock; } @@ -372,4 +390,87 @@ static inline rtable_ent_t* uta_get_rte( route_table_t *rt, int sid, int mtype, return rte; } +/* + Return a string of count information. E.g.: + : + + Caller must free the string allocated if a buffer was not provided. + + Pointer returned is to a freshly allocated string, or the user buffer + for convenience. + + If the endpoint passed is a nil pointer, then we return a nil -- caller + must check! +*/ +static inline char* get_ep_counts( endpoint_t* ep, char* ubuf, int ubuf_len ) { + char* rs; // result string + + if( ep == NULL ) { + return NULL; + } + + if( ubuf != NULL ) { + rs = ubuf; + } else { + ubuf_len = 256; + rs = malloc( sizeof( char ) * ubuf_len ); + } + + snprintf( rs, ubuf_len, "%s %lld %lld %lld", ep->name, ep->scounts[EPSC_GOOD], ep->scounts[EPSC_FAIL], ep->scounts[EPSC_TRANS] ); + + return rs; +} + +/* + Given a message, use the meid field to find the owner endpoint for the meid. + The owner ep is then used to extract the socket through which the message + is sent. This returns TRUE if we found a socket and it was written to the + nn_sock pointer; false if we didn't. + + We've been told that the meid is a string, thus we count on it being a nil + terminated set of bytes. +*/ +static int epsock_meid( route_table_t *rtable, rmr_mbuf_t* msg, nng_socket* nn_sock, endpoint_t** uepp ) { + endpoint_t* ep; // seected end point + int state = FALSE; // processing state + char* meid; + + + errno = 0; + if( ! nn_sock || msg == NULL || rtable == NULL ) { // missing stuff; bail fast + errno = EINVAL; + return FALSE; + } + + meid = ((uta_mhdr_t *) msg->header)->meid; + + if( (ep = get_meid_owner( rtable, meid )) == NULL ) { + if( uepp != NULL ) { // caller needs refernce to endpoint too + *uepp = NULL; + } + + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_meid: no ep in hash for (%s)\n", meid ); + return FALSE; + } + + state = TRUE; + if( ! ep->open ) { // not connected + if( ep->addr == NULL ) { // name didn't resolve before, try again + ep->addr = strdup( ep->name ); // use the name directly; if not IP then transport will do dns lookup + } + + if( uta_link2( ep ) ) { // find entry in table and create link + ep->open = TRUE; + *nn_sock = ep->nn_sock; // pass socket back to caller + } else { + state = FALSE; + } + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_meid: connection attempted with %s: %s\n", ep->name, state ? "[OK]" : "[FAIL]" ); + } else { + *nn_sock = ep->nn_sock; + } + + return state; +} + #endif