X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Frmr%2Fsi%2Fsrc%2Frtable_si_static.c;h=86f50fe7062e1dfac2c807db836f4ef65f6bda42;hb=refs%2Ftags%2F3.3.0;hp=6215176a87d3dc47223c0d76bf77fc43702b013d;hpb=99584a241c64d29fc20e74a4b4e01427d0f00e73;p=ric-plt%2Flib%2Frmr.git diff --git a/src/rmr/si/src/rtable_si_static.c b/src/rmr/si/src/rtable_si_static.c index 6215176..86f50fe 100644 --- a/src/rmr/si/src/rtable_si_static.c +++ b/src/rmr/si/src/rtable_si_static.c @@ -69,9 +69,9 @@ static void uta_ep_failed( endpoint_t* ep ) { get things into a bad state if we allow a collision here. The lock grab only happens on the intial session setup. */ -static int uta_link2( si_ctx_t* si_ctx, endpoint_t* ep ) { +//static int uta_link2( si_ctx_t* si_ctx, endpoint_t* ep ) { +static int uta_link2( uta_ctx_t *ctx, endpoint_t* ep ) { static int flags = 0; - char* target; char conn_info[SI_MAX_ADDR_LEN]; // string to give to nano to make the connection char* addr; @@ -101,7 +101,7 @@ static int uta_link2( si_ctx_t* si_ctx, endpoint_t* ep ) { snprintf( conn_info, sizeof( conn_info ), "%s", target ); errno = 0; if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "link2 attempting connection with: %s\n", conn_info ); - if( (ep->nn_sock = SIconnect( si_ctx, conn_info )) < 0 ) { + if( (ep->nn_sock = SIconnect( ctx->si_ctx, conn_info )) < 0 ) { pthread_mutex_unlock( &ep->gate ); if( ep->notify ) { // need to notify if set @@ -114,6 +114,7 @@ static int uta_link2( si_ctx_t* si_ctx, endpoint_t* ep ) { if( DEBUG ) rmr_vlog( RMR_VL_INFO, "rmr_si_link2: connection was successful to: %s\n", target ); ep->open = TRUE; // set open/notify before giving up lock + fd2ep_add( ctx, ep->nn_sock, ep ); // map fd to ep for disc cleanup (while we have the lock) if( ! ep->notify ) { // if we yammered about a failure, indicate finally good rmr_vlog( RMR_VL_INFO, "rmr: link2: connection finally establisehd with target: %s\n", target ); @@ -143,10 +144,7 @@ static int rt_link2_ep( void* vctx, endpoint_t* ep ) { return FALSE; } - uta_link2( ctx->si_ctx, ep ); - if( ep->open ) { - fd2ep_add( ctx, ep->nn_sock, ep ); // map fd to ep for disc cleanup - } + uta_link2( ctx, ep ); return ep->open; } @@ -220,13 +218,14 @@ extern endpoint_t* uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_n endpoint cannot be found false (0) is returned. */ static int uta_epsock_byname( uta_ctx_t* ctx, char* ep_name, int* nn_sock, endpoint_t** uepp ) { - route_table_t* rt; + route_table_t* rt; si_ctx_t* si_ctx; endpoint_t* ep; int state = FALSE; if( PARINOID_CHECKS ) { if( ctx == NULL || (rt = ctx->rtable) == NULL || (si_ctx = ctx->si_ctx) == NULL ) { + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_byname: parinoia check pop ctx=%p rt=%p\n", ctx, rt ); return FALSE; } } else { @@ -235,6 +234,7 @@ static int uta_epsock_byname( uta_ctx_t* ctx, char* ep_name, int* nn_sock, endpo } ep = rmr_sym_get( rt->hash, ep_name, 1 ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_byname: ep not found: %s\n", ep_name ); if( uepp != NULL ) { // caller needs endpoint too, give it back *uepp = ep; } @@ -250,7 +250,7 @@ static int uta_epsock_byname( uta_ctx_t* ctx, char* ep_name, int* nn_sock, endpo if( ep->addr == NULL ) { // name didn't resolve before, try again ep->addr = strdup( ep->name ); // use the name directly; if not IP then transport will do dns lookup } - if( uta_link2( si_ctx, ep ) ) { // find entry in table and create link + if( uta_link2( ctx, ep ) ) { // find entry in table and create link state = TRUE; ep->open = TRUE; *nn_sock = ep->nn_sock; // pass socket back to caller @@ -369,7 +369,7 @@ static int uta_epsock_rr( uta_ctx_t* ctx, rtable_ent_t* rte, int group, int* mor ep->addr = strdup( ep->name ); // use the name directly; if not IP then transport will do dns lookup } - if( uta_link2( si_ctx, ep ) ) { // find entry in table and create link + if( uta_link2( ctx, ep ) ) { // find entry in table and create link ep->open = TRUE; *nn_sock = ep->nn_sock; // pass socket back to caller fd2ep_add( ctx, ep->nn_sock, ep ); // map fd to ep for disc cleanup @@ -394,6 +394,8 @@ static int uta_epsock_rr( uta_ctx_t* ctx, rtable_ent_t* rte, int group, int* mor We've been told that the meid is a string, thus we count on it being a nil terminated set of bytes. + + If we return false the caller's ep reference may NOT be valid or even nil. */ static int epsock_meid( uta_ctx_t* ctx, route_table_t *rtable, rmr_mbuf_t* msg, int* nn_sock, endpoint_t** uepp ) { endpoint_t* ep; // seected end point @@ -417,11 +419,12 @@ static int epsock_meid( uta_ctx_t* ctx, route_table_t *rtable, rmr_mbuf_t* msg, meid = ((uta_mhdr_t *) msg->header)->meid; - if( (ep = get_meid_owner( rtable, meid )) == NULL ) { - if( uepp != NULL ) { // caller needs refernce to endpoint too - *uepp = NULL; - } + ep = get_meid_owner( rtable, meid ); + if( uepp != NULL ) { // caller needs refernce to endpoint too + *uepp = ep; + } + if( ep == NULL ) { if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_meid: no ep in hash for (%s)\n", meid ); return FALSE; } @@ -432,7 +435,7 @@ static int epsock_meid( uta_ctx_t* ctx, route_table_t *rtable, rmr_mbuf_t* msg, ep->addr = strdup( ep->name ); // use the name directly; if not IP then transport will do dns lookup } - if( uta_link2( si_ctx, ep ) ) { // find entry in table and create link + if( uta_link2( ctx, ep ) ) { // find entry in table and create link ep->open = TRUE; *nn_sock = ep->nn_sock; // pass socket back to caller } else { @@ -448,7 +451,7 @@ static int epsock_meid( uta_ctx_t* ctx, route_table_t *rtable, rmr_mbuf_t* msg, /* Finds the rtable entry which matches the key. Returns a nil pointer if - no entry is found. If try_alternate is set, then we will attempt + no entry is found. If try_alternate is set, then we will attempt to find the entry with a key based only on the message type. */ static inline rtable_ent_t* uta_get_rte( route_table_t *rt, int sid, int mtype, int try_alt ) { @@ -508,26 +511,44 @@ static inline char* get_ep_counts( endpoint_t* ep, char* ubuf, int ubuf_len ) { /* Create the hash which maps file descriptors to endpoints. We need this - to easily mark an endpoint as disconnected when we are notified. + to easily mark an endpoint as disconnected when we are notified. Thus we + expect these to be driven very seldomly; locking should not be an issue. + Locking is needed to prevent problems when the user application is multi- + threaded and attempting to (re)connect from concurrent threads. */ static void fd2ep_init( uta_ctx_t* ctx ) { + if( ctx && ! ctx->fd2ep ) { - ctx->fd2ep = rmr_sym_alloc( 129 ); + ctx->fd2ep = rmr_sym_alloc( 129 ); + + if( ctx->fd2ep_gate == NULL ) { + ctx->fd2ep_gate = (pthread_mutex_t *) malloc( sizeof( *ctx->fd2ep_gate ) ); + if( ctx->fd2ep_gate != NULL ) { + pthread_mutex_init( ctx->fd2ep_gate, NULL ); + } + } } } /* - Add an entry into the fd2ep hash which points to the given ep. + Add an entry into the fd2ep hash to map the FD to the endpoint. */ static void fd2ep_add( uta_ctx_t* ctx, int fd, endpoint_t* ep ) { if( ctx && ctx->fd2ep ) { + pthread_mutex_lock( ctx->fd2ep_gate ); + rmr_sym_map( ctx->fd2ep, (uint64_t) fd, (void *) ep ); + + pthread_mutex_unlock( ctx->fd2ep_gate ); } } /* - Given a file descriptor fetches the related endpoint from the hash and - deletes the entry from the hash (when we detect a disconnect). + Given a file descriptor this fetches the related endpoint from the hash and + deletes the entry from the hash (when we detect a disconnect). + + This will also set the state on the ep open to false, and revoke the + FD (nn_socket). */ static endpoint_t* fd2ep_del( uta_ctx_t* ctx, int fd ) { endpoint_t* ep = NULL; @@ -535,7 +556,11 @@ static endpoint_t* fd2ep_del( uta_ctx_t* ctx, int fd ) { if( ctx && ctx->fd2ep ) { ep = rmr_sym_pull( ctx->fd2ep, (uint64_t) fd ); if( ep ) { + pthread_mutex_lock( ctx->fd2ep_gate ); + rmr_sym_ndel( ctx->fd2ep, (uint64_t) fd ); + + pthread_mutex_unlock( ctx->fd2ep_gate ); } } @@ -544,13 +569,17 @@ static endpoint_t* fd2ep_del( uta_ctx_t* ctx, int fd ) { /* Given a file descriptor fetches the related endpoint from the hash. - Returns nil if there is no map. + Returns nil if there is no reference in the hash. */ static endpoint_t* fd2ep_get( uta_ctx_t* ctx, int fd ) { endpoint_t* ep = NULL; if( ctx && ctx->fd2ep ) { + pthread_mutex_lock( ctx->fd2ep_gate ); + ep = rmr_sym_pull( ctx->fd2ep, (uint64_t) fd ); + + pthread_mutex_unlock( ctx->fd2ep_gate ); } return ep;