Changes to fd2ep for locking and disconnect locking
[ric-plt/lib/rmr.git] / src / rmr / si / src / rtable_si_static.c
index 117cfc9..0f796e3 100644 (file)
@@ -69,9 +69,9 @@ static void uta_ep_failed( endpoint_t* ep ) {
        get things into a bad state if we allow a collision here.  The lock grab
        only happens on the intial session setup.
 */
-static int uta_link2( si_ctx_t* si_ctx, endpoint_t* ep ) {
+//static int uta_link2( si_ctx_t* si_ctx, endpoint_t* ep ) {
+static int uta_link2( uta_ctx_t *ctx, endpoint_t* ep ) {
        static int      flags = 0;
-
        char*           target;
        char            conn_info[SI_MAX_ADDR_LEN];     // string to give to nano to make the connection
        char*           addr;
@@ -101,7 +101,7 @@ static int uta_link2( si_ctx_t* si_ctx, endpoint_t* ep ) {
        snprintf( conn_info, sizeof( conn_info ), "%s", target );
        errno = 0;
        if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "link2 attempting connection with: %s\n", conn_info );
-       if( (ep->nn_sock = SIconnect( si_ctx, conn_info )) < 0 ) {
+       if( (ep->nn_sock = SIconnect( ctx->si_ctx, conn_info )) < 0 ) {
                pthread_mutex_unlock( &ep->gate );
 
                if( ep->notify ) {                                                      // need to notify if set
@@ -114,6 +114,7 @@ static int uta_link2( si_ctx_t* si_ctx, endpoint_t* ep ) {
        if( DEBUG ) rmr_vlog( RMR_VL_INFO, "rmr_si_link2: connection was successful to: %s\n", target );
 
        ep->open = TRUE;                                                // set open/notify before giving up lock
+       fd2ep_add( ctx, ep->nn_sock, ep );              // map fd to ep for disc cleanup (while we have the lock)
 
        if( ! ep->notify ) {                                            // if we yammered about a failure, indicate finally good
                rmr_vlog( RMR_VL_INFO, "rmr: link2: connection finally establisehd with target: %s\n", target );
@@ -143,10 +144,7 @@ static int rt_link2_ep( void* vctx, endpoint_t* ep ) {
                return FALSE;
        }
 
-       uta_link2( ctx->si_ctx, ep );
-       if( ep->open ) {
-               fd2ep_add( ctx, ep->nn_sock, ep );              // map fd to ep for disc cleanup
-       }
+       uta_link2( ctx, ep );
        return ep->open;
 }
 
@@ -220,7 +218,7 @@ extern endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_n
        endpoint cannot be found false (0) is returned.
 */
 static int uta_epsock_byname( uta_ctx_t* ctx, char* ep_name, int* nn_sock, endpoint_t** uepp ) {
-       route_table_t*  rt; 
+       route_table_t*  rt;
        si_ctx_t*               si_ctx;
        endpoint_t*             ep;
        int                             state = FALSE;
@@ -250,7 +248,7 @@ static int uta_epsock_byname( uta_ctx_t* ctx, char* ep_name, int* nn_sock, endpo
                if( ep->addr == NULL ) {                                        // name didn't resolve before, try again
                        ep->addr = strdup( ep->name );                  // use the name directly; if not IP then transport will do dns lookup
                }
-               if( uta_link2( si_ctx, ep ) ) {                                                                                 // find entry in table and create link
+               if( uta_link2( ctx, ep ) ) {                                                                                    // find entry in table and create link
                        state = TRUE;
                        ep->open = TRUE;
                        *nn_sock = ep->nn_sock;                                                 // pass socket back to caller
@@ -369,7 +367,7 @@ static int uta_epsock_rr( uta_ctx_t* ctx, rtable_ent_t* rte, int group, int* mor
                                ep->addr = strdup( ep->name );                  // use the name directly; if not IP then transport will do dns lookup
                        }
 
-                       if( uta_link2( si_ctx, ep ) ) {                                                                                 // find entry in table and create link
+                       if( uta_link2( ctx, ep ) ) {                                                                                    // find entry in table and create link
                                ep->open = TRUE;
                                *nn_sock = ep->nn_sock;                                                 // pass socket back to caller
                                fd2ep_add( ctx, ep->nn_sock, ep );                              // map fd to ep for disc cleanup
@@ -435,7 +433,7 @@ static int epsock_meid( uta_ctx_t* ctx, route_table_t *rtable, rmr_mbuf_t* msg,
                        ep->addr = strdup( ep->name );                  // use the name directly; if not IP then transport will do dns lookup
                }
 
-               if( uta_link2( si_ctx, ep ) ) {                         // find entry in table and create link
+               if( uta_link2( ctx, ep ) ) {                            // find entry in table and create link
                        ep->open = TRUE;
                        *nn_sock = ep->nn_sock;                                 // pass socket back to caller
                } else {
@@ -451,7 +449,7 @@ static int epsock_meid( uta_ctx_t* ctx, route_table_t *rtable, rmr_mbuf_t* msg,
 
 /*
        Finds the rtable entry which matches the key. Returns a nil pointer if
-       no entry is found. If try_alternate is set, then we will attempt 
+       no entry is found. If try_alternate is set, then we will attempt
        to find the entry with a key based only on the message type.
 */
 static inline rtable_ent_t*  uta_get_rte( route_table_t *rt, int sid, int mtype, int try_alt ) {
@@ -511,26 +509,44 @@ static inline char* get_ep_counts( endpoint_t* ep, char* ubuf, int ubuf_len ) {
 
 /*
        Create the hash which maps file descriptors to endpoints. We need this
-       to easily mark an endpoint as disconnected when we are notified.
+       to easily mark an endpoint as disconnected when we are notified. Thus we
+       expect these to be driven very seldomly; locking should not be an issue.
+       Locking is needed to prevent problems when the user application is multi-
+       threaded and attempting to (re)connect from concurrent threads.
 */
 static void fd2ep_init( uta_ctx_t* ctx ) {
+
        if( ctx  && ! ctx->fd2ep ) {
-               ctx->fd2ep = rmr_sym_alloc( 129 );              
+               ctx->fd2ep = rmr_sym_alloc( 129 );
+
+               if( ctx->fd2ep_gate == NULL ) {
+                       ctx->fd2ep_gate = (pthread_mutex_t *) malloc( sizeof( *ctx->fd2ep_gate ) );
+                       if( ctx->fd2ep_gate != NULL ) {
+                               pthread_mutex_init( ctx->fd2ep_gate, NULL );
+                       }
+               }
        }
 }
 
 /*
-       Add an entry into the fd2ep hash which points to the given ep.
+       Add an entry into the fd2ep hash to map the FD to the endpoint.
 */
 static void fd2ep_add( uta_ctx_t* ctx, int fd, endpoint_t* ep ) {
        if( ctx && ctx->fd2ep ) {
+               pthread_mutex_lock( ctx->fd2ep_gate );
+
                rmr_sym_map( ctx->fd2ep, (uint64_t) fd, (void *) ep );
+
+               pthread_mutex_unlock( ctx->fd2ep_gate );
        }
 }
 
 /*
-       Given a file descriptor fetches the related endpoint from the hash and 
-       deletes the entry from the hash (when we detect a disconnect).  
+       Given a file descriptor this fetches the related endpoint from the hash and
+       deletes the entry from the hash (when we detect a disconnect).
+
+       This will also set the state on the ep open to false, and revoke the
+       FD (nn_socket).
 */
 static endpoint_t*  fd2ep_del( uta_ctx_t* ctx, int fd ) {
        endpoint_t* ep = NULL;
@@ -538,7 +554,11 @@ static endpoint_t*  fd2ep_del( uta_ctx_t* ctx, int fd ) {
        if( ctx && ctx->fd2ep ) {
                ep = rmr_sym_pull(  ctx->fd2ep, (uint64_t) fd );
                if( ep ) {
+                       pthread_mutex_lock( ctx->fd2ep_gate );
+
                        rmr_sym_ndel(  ctx->fd2ep, (uint64_t) fd );
+
+                       pthread_mutex_unlock( ctx->fd2ep_gate );
                }
        }
 
@@ -547,13 +567,17 @@ static endpoint_t*  fd2ep_del( uta_ctx_t* ctx, int fd ) {
 
 /*
        Given a file descriptor fetches the related endpoint from the hash.
-       Returns nil if there is no map.
+       Returns nil if there is no reference in the hash.
 */
 static endpoint_t*  fd2ep_get( uta_ctx_t* ctx, int fd ) {
        endpoint_t* ep = NULL;
 
        if( ctx && ctx->fd2ep ) {
+               pthread_mutex_lock( ctx->fd2ep_gate );
+
                ep = rmr_sym_pull(  ctx->fd2ep, (uint64_t) fd );
+
+               pthread_mutex_unlock( ctx->fd2ep_gate );
        }
 
        return ep;