X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Frmr%2Fsi%2Fsrc%2Frtable_si_static.c;h=719053a7acc37eda5fb43825ce7a27d1ce0c2ee8;hb=9c923bcc9322c22220b574671c7b46f10008c614;hp=cf83590e50841d6c42ad0c0e8786caa95595868e;hpb=ec88d3c0563eeb6ae5f73427edb0b3c4d7acf299;p=ric-plt%2Flib%2Frmr.git diff --git a/src/rmr/si/src/rtable_si_static.c b/src/rmr/si/src/rtable_si_static.c index cf83590..719053a 100644 --- a/src/rmr/si/src/rtable_si_static.c +++ b/src/rmr/si/src/rtable_si_static.c @@ -53,8 +53,8 @@ */ static void uta_ep_failed( endpoint_t* ep ) { if( ep != NULL ) { - if( DEBUG ) fprintf( stderr, "[DBUG] connection to %s was closed\n", ep->name ); - ep->open = 0; + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "connection to %s was closed\n", ep->name ); + ep->open = FALSE; } } @@ -69,9 +69,9 @@ static void uta_ep_failed( endpoint_t* ep ) { get things into a bad state if we allow a collision here. The lock grab only happens on the intial session setup. */ -static int uta_link2( si_ctx_t* si_ctx, endpoint_t* ep ) { +//static int uta_link2( si_ctx_t* si_ctx, endpoint_t* ep ) { +static int uta_link2( uta_ctx_t *ctx, endpoint_t* ep ) { static int flags = 0; - char* target; char conn_info[SI_MAX_ADDR_LEN]; // string to give to nano to make the connection char* addr; @@ -79,14 +79,14 @@ static int uta_link2( si_ctx_t* si_ctx, endpoint_t* ep ) { char* tok; if( ep == NULL ) { - if( DEBUG ) fprintf( stderr, "[DBUG] link2 ep was nil!\n" ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "link2 ep was nil!\n" ); return FALSE; } target = ep->name; // always give name to transport so changing dest IP does not break reconnect if( target == NULL || (addr = strchr( target, ':' )) == NULL ) { // bad address:port if( ep->notify ) { - fprintf( stderr, "[WARN] rmr: link2: unable to create link: bad target: %s\n", target == NULL ? "" : target ); + rmr_vlog( RMR_VL_WARN, "rmr: link2: unable to create link: bad target: %s\n", target == NULL ? "" : target ); ep->notify = 0; } return FALSE; @@ -100,24 +100,24 @@ static int uta_link2( si_ctx_t* si_ctx, endpoint_t* ep ) { snprintf( conn_info, sizeof( conn_info ), "%s", target ); errno = 0; - if( DEBUG > 1 ) fprintf( stderr, "[DBUG] link2 attempting connection with: %s\n", conn_info ); - if( (ep->nn_sock = SIconnect( si_ctx, conn_info )) < 0 ) { + if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "link2 attempting connection with: %s\n", conn_info ); + if( (ep->nn_sock = SIconnect( ctx->si_ctx, conn_info )) < 0 ) { pthread_mutex_unlock( &ep->gate ); if( ep->notify ) { // need to notify if set - fprintf( stderr, "[WRN] rmr: link2: unable to connect to target: %s: %d %s\n", target, errno, strerror( errno ) ); + rmr_vlog( RMR_VL_WARN, "rmr: link2: unable to connect to target: %s: %d %s\n", target, errno, strerror( errno ) ); ep->notify = 0; } - //nng_close( *nn_sock ); return FALSE; } - if( DEBUG ) fprintf( stderr, "[INFO] rmr_si_link2: connection was successful to: %s\n", target ); + if( DEBUG ) rmr_vlog( RMR_VL_INFO, "rmr_si_link2: connection was successful to: %s\n", target ); ep->open = TRUE; // set open/notify before giving up lock + fd2ep_add( ctx, ep->nn_sock, ep ); // map fd to ep for disc cleanup (while we have the lock) if( ! ep->notify ) { // if we yammered about a failure, indicate finally good - fprintf( stderr, "[INFO] rmr: link2: connection finally establisehd with target: %s\n", target ); + rmr_vlog( RMR_VL_INFO, "rmr: link2: connection finally establisehd with target: %s\n", target ); ep->notify = 1; } @@ -144,7 +144,7 @@ static int rt_link2_ep( void* vctx, endpoint_t* ep ) { return FALSE; } - uta_link2( ctx->si_ctx, ep ); + uta_link2( ctx, ep ); return ep->open; } @@ -161,25 +161,25 @@ extern endpoint_t* uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_n rrgroup_t* rrg; // pointer at group to update if( ! rte || ! rt ) { - fprintf( stderr, "[WRN] uda_add_ep didn't get a valid rt and/or rte pointer\n" ); + rmr_vlog( RMR_VL_WARN, "uda_add_ep didn't get a valid rt and/or rte pointer\n" ); return NULL; } if( rte->nrrgroups <= group || group < 0 ) { - fprintf( stderr, "[WRN] uda_add_ep group out of range: %d (max == %d)\n", group, rte->nrrgroups ); + rmr_vlog( RMR_VL_WARN, "uda_add_ep group out of range: %d (max == %d)\n", group, rte->nrrgroups ); return NULL; } //fprintf( stderr, ">>>> add ep grp=%d to rte @ 0x%p rrg=%p\n", group, rte, rte->rrgroups[group] ); if( (rrg = rte->rrgroups[group]) == NULL ) { if( (rrg = (rrgroup_t *) malloc( sizeof( *rrg ) )) == NULL ) { - fprintf( stderr, "[WRN] rmr_add_ep: malloc failed for round robin group: group=%d\n", group ); + rmr_vlog( RMR_VL_WARN, "rmr_add_ep: malloc failed for round robin group: group=%d\n", group ); return NULL; } memset( rrg, 0, sizeof( *rrg ) ); if( (rrg->epts = (endpoint_t **) malloc( sizeof( endpoint_t ) * MAX_EP_GROUP )) == NULL ) { - fprintf( stderr, "[WRN] rmr_add_ep: malloc failed for group endpoint array: group=%d\n", group ); + rmr_vlog( RMR_VL_WARN, "rmr_add_ep: malloc failed for group endpoint array: group=%d\n", group ); return NULL; } memset( rrg->epts, 0, sizeof( endpoint_t ) * MAX_EP_GROUP ); @@ -191,7 +191,7 @@ extern endpoint_t* uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_n rrg->nused = 0; // number populated rrg->nendpts = MAX_EP_GROUP; // number allocated - if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rrg added to rte: mtype=%d group=%d\n", rte->mtype, group ); + if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "rrg added to rte: mtype=%d group=%d\n", rte->mtype, group ); } ep = rt_ensure_ep( rt, ep_name ); // get the ep and create one if not known @@ -199,7 +199,7 @@ extern endpoint_t* uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_n if( rrg != NULL ) { if( rrg->nused >= rrg->nendpts ) { // future: reallocate - fprintf( stderr, "[WRN] endpoint array for mtype/group %d/%d is full!\n", rte->mtype, group ); + rmr_vlog( RMR_VL_WARN, "endpoint array for mtype/group %d/%d is full!\n", rte->mtype, group ); return NULL; } @@ -207,7 +207,7 @@ extern endpoint_t* uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_n rrg->nused++; } - if( DEBUG > 1 ) fprintf( stderr, "[DBUG] endpoint added to mtype/group: %d/%d %s nused=%d\n", rte->mtype, group, ep_name, rrg->nused ); + if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "endpoint added to mtype/group: %d/%d %s nused=%d\n", rte->mtype, group, ep_name, rrg->nused ); return ep; } @@ -217,36 +217,46 @@ extern endpoint_t* uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_n the user pointer passed in and sets the return value to true (1). If the endpoint cannot be found false (0) is returned. */ -static int uta_epsock_byname( route_table_t* rt, char* ep_name, int* nn_sock, endpoint_t** uepp, si_ctx_t* si_ctx ) { - endpoint_t* ep; - int state = FALSE; - - if( rt == NULL ) { - return FALSE; +static int uta_epsock_byname( uta_ctx_t* ctx, char* ep_name, int* nn_sock, endpoint_t** uepp ) { + route_table_t* rt = NULL; + si_ctx_t* si_ctx; + endpoint_t* ep; + int state = FALSE; + + if( PARANOID_CHECKS ) { + if( ctx == NULL || (rt = ctx->rtable) == NULL || (si_ctx = ctx->si_ctx) == NULL ) { + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_byname: parinoia check pop ctx=%p rt=%p\n", ctx, rt ); + return FALSE; + } + } else { + rt = ctx->rtable; // faster but more risky + si_ctx = ctx->si_ctx; } ep = rmr_sym_get( rt->hash, ep_name, 1 ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_byname: ep not found: %s\n", ep_name ); if( uepp != NULL ) { // caller needs endpoint too, give it back *uepp = ep; } if( ep == NULL ) { - if( DEBUG ) fprintf( stderr, "[DBUG] get ep by name for %s not in hash!\n", ep_name ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "get ep by name for %s not in hash!\n", ep_name ); if( ! ep_name || (ep = rt_ensure_ep( rt, ep_name)) == NULL ) { // create one if not in rt (support rts without entry in our table) return FALSE; } } if( ! ep->open ) { // not open -- connect now - if( DEBUG ) fprintf( stderr, "[DBUG] get ep by name for %s session not started... starting\n", ep_name ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "get ep by name for %s session not started... starting\n", ep_name ); if( ep->addr == NULL ) { // name didn't resolve before, try again ep->addr = strdup( ep->name ); // use the name directly; if not IP then transport will do dns lookup } - if( uta_link2( si_ctx, ep ) ) { // find entry in table and create link + if( uta_link2( ctx, ep ) ) { // find entry in table and create link state = TRUE; ep->open = TRUE; *nn_sock = ep->nn_sock; // pass socket back to caller + fd2ep_add( ctx, ep->nn_sock, ep ); // map fd to this ep for disc cleanup } - if( DEBUG ) fprintf( stderr, "[DBUG] epsock_bn: connection state: %s %s\n", state ? "[OK]" : "[FAIL]", ep->name ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_bn: connection state: %s %s\n", state ? "[OK]" : "[FAIL]", ep->name ); } else { *nn_sock = ep->nn_sock; state = TRUE; @@ -281,13 +291,22 @@ static int uta_epsock_byname( route_table_t* rt, char* ep_name, int* nn_sock, en Both of these, in the grand scheme of things, is minor compared to the overhead of grabbing a lock on each call. */ -static int uta_epsock_rr( rtable_ent_t *rte, int group, int* more, int* nn_sock, endpoint_t** uepp, si_ctx_t* si_ctx ) { - endpoint_t* ep; // seected end point +static int uta_epsock_rr( uta_ctx_t* ctx, rtable_ent_t* rte, int group, int* more, int* nn_sock, endpoint_t** uepp ) { + si_ctx_t* si_ctx; + endpoint_t* ep; // selected end point int state = FALSE; // processing state int dummy; rrgroup_t* rrg; int idx; + if( PARANOID_CHECKS ) { + if( ctx == NULL || (si_ctx = ctx->si_ctx) == NULL ) { + return FALSE; + } + } else { + si_ctx = ctx->si_ctx; + } + //fprintf( stderr, ">>>> epsock_rr selecting: grp=%d mtype=%d ngrps=%d\n", group, rte->mtype, rte->nrrgroups ); if( ! more ) { // eliminate cheks each time we need to use @@ -295,26 +314,26 @@ static int uta_epsock_rr( rtable_ent_t *rte, int group, int* more, int* nn_sock, } if( ! nn_sock ) { // user didn't supply a pointer - if( DEBUG ) fprintf( stderr, "[DBUG] epsock_rr invalid nnsock pointer\n" ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_rr invalid nnsock pointer\n" ); errno = EINVAL; *more = 0; return FALSE; } if( rte == NULL ) { - if( DEBUG ) fprintf( stderr, "[DBUG] epsock_rr rte was nil; nothing selected\n" ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_rr rte was nil; nothing selected\n" ); *more = 0; return FALSE; } if( group < 0 || group >= rte->nrrgroups ) { - if( DEBUG > 1 ) fprintf( stderr, "[DBUG] group out of range: group=%d max=%d\n", group, rte->nrrgroups ); + if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "group out of range: group=%d max=%d\n", group, rte->nrrgroups ); *more = 0; return FALSE; } if( (rrg = rte->rrgroups[group]) == NULL ) { - if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rrg not found for group %d (ptr rrgroups[g] == nil)\n", group ); + if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "rrg not found for group %d (ptr rrgroups[g] == nil)\n", group ); *more = 0; // groups are inserted contig, so nothing should be after a nil pointer return FALSE; } @@ -323,19 +342,19 @@ static int uta_epsock_rr( rtable_ent_t *rte, int group, int* more, int* nn_sock, switch( rrg->nused ) { case 0: // nothing allocated, just punt - if( DEBUG > 1 ) fprintf( stderr, "[DBUG] nothing allocated for the rrg\n" ); + if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "nothing allocated for the rrg\n" ); return FALSE; case 1: // exactly one, no rr to deal with ep = rrg->epts[0]; - if( DEBUG > 1 ) fprintf( stderr, "[DBUG] _rr returning socket with one choice in group \n" ); + if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "_rr returning socket with one choice in group \n" ); state = TRUE; break; default: // need to pick one and adjust rr counts idx = rrg->ep_idx++ % rrg->nused; // see note above ep = rrg->epts[idx]; // select next endpoint - if( DEBUG > 1 ) fprintf( stderr, "[DBUG] _rr returning socket with multiple choices in group idx=%d \n", rrg->ep_idx ); + if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "_rr returning socket with multiple choices in group idx=%d \n", rrg->ep_idx ); state = idx + 1; // unit test checks to see that we're cycling through, so must not just be TRUE break; } @@ -345,30 +364,94 @@ static int uta_epsock_rr( rtable_ent_t *rte, int group, int* more, int* nn_sock, } if( state ) { // end point selected, open if not, get socket either way if( ! ep->open ) { // not connected - if( DEBUG ) fprintf( stderr, "[DBUG] epsock_rr selected endpoint not yet open; opening %s\n", ep->name ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_rr selected endpoint not yet open; opening %s\n", ep->name ); if( ep->addr == NULL ) { // name didn't resolve before, try again ep->addr = strdup( ep->name ); // use the name directly; if not IP then transport will do dns lookup } - if( uta_link2( si_ctx, ep ) ) { // find entry in table and create link + if( uta_link2( ctx, ep ) ) { // find entry in table and create link ep->open = TRUE; *nn_sock = ep->nn_sock; // pass socket back to caller + fd2ep_add( ctx, ep->nn_sock, ep ); // map fd to ep for disc cleanup } else { state = FALSE; } - if( DEBUG ) fprintf( stderr, "[DBUG] epsock_rr: connection attempted with %s: %s\n", ep->name, state ? "[OK]" : "[FAIL]" ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_rr: connection attempted with %s: %s\n", ep->name, state ? "[OK]" : "[FAIL]" ); } else { *nn_sock = ep->nn_sock; } } - if( DEBUG > 1 ) fprintf( stderr, "[DBUG] epsock_rr returns state=%d\n", state ); + if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "epsock_rr returns state=%d\n", state ); + return state; +} + +/* + Given a message, use the meid field to find the owner endpoint for the meid. + The owner ep is then used to extract the socket through which the message + is sent. This returns TRUE if we found a socket and it was written to the + nn_sock pointer; false if we didn't. + + We've been told that the meid is a string, thus we count on it being a nil + terminated set of bytes. + + If we return false the caller's ep reference may NOT be valid or even nil. +*/ +static int epsock_meid( uta_ctx_t* ctx, route_table_t *rtable, rmr_mbuf_t* msg, int* nn_sock, endpoint_t** uepp ) { + endpoint_t* ep; // seected end point + int state = FALSE; // processing state + char* meid; + si_ctx_t* si_ctx; + + if( PARANOID_CHECKS ) { + if( ctx == NULL || (si_ctx = ctx->si_ctx) == NULL ) { + return FALSE; + } + } else { + si_ctx = ctx->si_ctx; + } + + errno = 0; + if( ! nn_sock || msg == NULL || rtable == NULL ) { // missing stuff; bail fast + errno = EINVAL; + return FALSE; + } + + meid = ((uta_mhdr_t *) msg->header)->meid; + + ep = get_meid_owner( rtable, meid ); + if( uepp != NULL ) { // caller needs refernce to endpoint too + *uepp = ep; + } + + if( ep == NULL ) { + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_meid: no ep in hash for (%s)\n", meid ); + return FALSE; + } + + state = TRUE; + if( ! ep->open ) { // not connected + if( ep->addr == NULL ) { // name didn't resolve before, try again + ep->addr = strdup( ep->name ); // use the name directly; if not IP then transport will do dns lookup + } + + if( uta_link2( ctx, ep ) ) { // find entry in table and create link + ep->open = TRUE; + *nn_sock = ep->nn_sock; // pass socket back to caller + } else { + state = FALSE; + } + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_meid: connection attempted with %s: %s\n", ep->name, state ? "[OK]" : "[FAIL]" ); + } else { + *nn_sock = ep->nn_sock; + } + return state; } /* Finds the rtable entry which matches the key. Returns a nil pointer if - no entry is found. If try_alternate is set, then we will attempt + no entry is found. If try_alternate is set, then we will attempt to find the entry with a key based only on the message type. */ static inline rtable_ent_t* uta_get_rte( route_table_t *rt, int sid, int mtype, int try_alt ) { @@ -423,4 +506,84 @@ static inline char* get_ep_counts( endpoint_t* ep, char* ubuf, int ubuf_len ) { return rs; } + +// ---- fd to ep functions -------------------------------------------------------------------------- + +/* + Create the hash which maps file descriptors to endpoints. We need this + to easily mark an endpoint as disconnected when we are notified. Thus we + expect these to be driven very seldomly; locking should not be an issue. + Locking is needed to prevent problems when the user application is multi- + threaded and attempting to (re)connect from concurrent threads. +*/ +static void fd2ep_init( uta_ctx_t* ctx ) { + + if( ctx && ! ctx->fd2ep ) { + ctx->fd2ep = rmr_sym_alloc( 129 ); + + if( ctx->fd2ep_gate == NULL ) { + ctx->fd2ep_gate = (pthread_mutex_t *) malloc( sizeof( *ctx->fd2ep_gate ) ); + if( ctx->fd2ep_gate != NULL ) { + pthread_mutex_init( ctx->fd2ep_gate, NULL ); + } + } + } +} + +/* + Add an entry into the fd2ep hash to map the FD to the endpoint. +*/ +static void fd2ep_add( uta_ctx_t* ctx, int fd, endpoint_t* ep ) { + if( ctx && ctx->fd2ep ) { + pthread_mutex_lock( ctx->fd2ep_gate ); + + rmr_sym_map( ctx->fd2ep, (uint64_t) fd, (void *) ep ); + + pthread_mutex_unlock( ctx->fd2ep_gate ); + } +} + +/* + Given a file descriptor this fetches the related endpoint from the hash and + deletes the entry from the hash (when we detect a disconnect). + + This will also set the state on the ep open to false, and revoke the + FD (nn_socket). +*/ +static endpoint_t* fd2ep_del( uta_ctx_t* ctx, int fd ) { + endpoint_t* ep = NULL; + + if( ctx && ctx->fd2ep ) { + ep = rmr_sym_pull( ctx->fd2ep, (uint64_t) fd ); + if( ep ) { + pthread_mutex_lock( ctx->fd2ep_gate ); + + rmr_sym_ndel( ctx->fd2ep, (uint64_t) fd ); + + pthread_mutex_unlock( ctx->fd2ep_gate ); + } + } + + return ep; +} + +/* + Given a file descriptor fetches the related endpoint from the hash. + Returns nil if there is no reference in the hash. +*/ +static endpoint_t* fd2ep_get( uta_ctx_t* ctx, int fd ) { + endpoint_t* ep = NULL; + + if( ctx && ctx->fd2ep ) { + pthread_mutex_lock( ctx->fd2ep_gate ); + + ep = rmr_sym_pull( ctx->fd2ep, (uint64_t) fd ); + + pthread_mutex_unlock( ctx->fd2ep_gate ); + } + + return ep; +} + + #endif