Changes to fd2ep for locking and disconnect locking 02/2602/1
authorE. Scott Daniels <daniels@research.att.com>
Wed, 26 Feb 2020 14:20:45 +0000 (09:20 -0500)
committerE. Scott Daniels <daniels@research.att.com>
Wed, 26 Feb 2020 14:20:45 +0000 (09:20 -0500)
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Change-Id: I4c3d9d4191d0c994246c076861b52973413404a6

src/rmr/si/include/rmr_si_private.h
src/rmr/si/src/mt_call_si_static.c
src/rmr/si/src/rmr_si.c
src/rmr/si/src/rtable_si_static.c

index 99b5b20..ddc9c3f 100644 (file)
@@ -150,7 +150,8 @@ struct uta_ctx {
        river_t*        rivers;                 // inbound flows (index is the socket fd)
        int                     max_ibm;                // max size of an inbound message (river accum alloc size)
        void*           zcb_mring;              // zero copy buffer mbuf ring
-       void*           fd2ep;                  // the symtab mapping file des to endpoints for cleanup on disconnect
+       void*           fd2ep;                          // the symtab mapping file des to endpoints for cleanup on disconnect
+       pthread_mutex_t *fd2ep_gate;    // we must gate add/deletes to the fd2 symtab
 };
 
 typedef uta_ctx_t uta_ctx;
@@ -167,7 +168,8 @@ static void free_ctx( uta_ctx_t* ctx );
 
 // --- rt table things ---------------------------
 static void uta_ep_failed( endpoint_t* ep );
-static int uta_link2( si_ctx_t* si_ctx, endpoint_t* ep );
+static int uta_link2( uta_ctx_t *ctx, endpoint_t* ep );
+
 static int rt_link2_ep( void* vctx, endpoint_t* ep );
 static rtable_ent_t* uta_get_rte( route_table_t *rt, int sid, int mtype, int try_alt );
 static inline int xlate_si_state( int state, int def_state );
index 862b554..2894cdb 100644 (file)
@@ -118,7 +118,7 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
                if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
                        return SI_RET_OK;
                }
-       
+
                if( fd >= ctx->nrivers || fd < 0 ) {
                        if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
                        return SI_RET_OK;
@@ -148,7 +148,7 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
        while( remain > 0 ) {                                                           // until we've done something with all bytes passed in
                if( DEBUG )  rmr_vlog( RMR_VL_DEBUG, "====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain );
 
-               // FIX ME: size in the message  needs to be network byte order  
+               // FIX ME: size in the message  needs to be network byte order
                if( river->msg_size <= 0 ) {                            // don't have a size yet
                                                                                                        // FIX ME: we need a frame indicator to ensure alignment
                        need = sizeof( int ) - river->ipt;                                                      // what we need from transport buffer
@@ -165,7 +165,7 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
                                river->ipt += need;
                                bidx += need;
                                remain -= need;
-                               river->msg_size = *((int *) river->accum);                              
+                               river->msg_size = *((int *) river->accum);              
                                if( DEBUG > 1 ) {
                                        rmr_vlog( RMR_VL_DEBUG, "size from accumulator =%d\n", river->msg_size );
                                        if( river->msg_size > 500 ) {
@@ -197,7 +197,7 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
                                buf2mbuf( ctx, river->accum, river->nbytes, fd );                                       // build an RMR mbuf and queue
                                river->accum = (char *) malloc( sizeof( char ) *  river->nbytes );      // fresh accumulator
                        } else {
-                               if( !(river->flags & RF_NOTIFIED) ) {   
+                               if( !(river->flags & RF_NOTIFIED) ) {
                                        rmr_vlog( RMR_VL_WARN, "message larger than max (%d) have arrived on fd %d\n", river->nbytes, fd );
                                        river->flags |= RF_NOTIFIED;
                                }
@@ -206,7 +206,7 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
                        river->msg_size = -1;
                        river->ipt = 0;
                        bidx += need;
-                       remain -= need; 
+                       remain -= need;
                }
        }
 
@@ -215,8 +215,8 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
 }
 
 /*
-       Callback driven on a disconnect notification. We will attempt to find the related 
-       endpoint via the fd2ep hash maintained in the context. If we find it, then we 
+       Callback driven on a disconnect notification. We will attempt to find the related
+       endpoint via the fd2ep hash maintained in the context. If we find it, then we
        remove it from the hash, and mark the endpoint as closed so that the next attempt
        to send forces a reconnect attempt.
 
@@ -230,10 +230,12 @@ static int mt_disc_cb( void* vctx, int fd ) {
                return SI_RET_OK;
        }
 
-       ep = fd2ep_del( ctx, fd );              // find ep and remote the fd from the hash
-       if( ep ) {
+       ep = fd2ep_del( ctx, fd );              // find ep and remove the fd from the hash
+       if( ep != NULL ) {
+       pthread_mutex_lock( &ep->gate );            // wise to lock this
                ep->open = FALSE;
                ep->nn_sock = -1;
+       pthread_mutex_unlock( &ep->gate );
        }
 
        return SI_RET_OK;
index 9c67dc3..6be4ffb 100644 (file)
@@ -555,7 +555,7 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        rmr_set_vlevel( RMR_VL_INFO );          // we WILL announce our version etc
 
        if( ! announced ) {
-               rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95/f mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
+               rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95/g mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
                        RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
                announced = 1;
        }
@@ -697,6 +697,9 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
                return NULL;
        }
 
+                                                                                               // finish all flag setting before threads to keep helgrind quiet
+       ctx->flags |= CFL_MTC_ENABLED;                          // for SI threaded receiver is the only way
+
        if( flags & FL_NOTHREAD ) {                                     // thread set to off; no rout table collector started (could be called by the rtc thread itself)
                ctx->rtable = rt_clone_space( NULL, NULL, 0 );          // creates an empty route table so that wormholes still can be used
        } else {
@@ -711,7 +714,6 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
                }
        }
 
-       ctx->flags |= CFL_MTC_ENABLED;                                                                                          // for SI threaded receiver is the only way
        if( pthread_create( &ctx->mtc_th,  NULL, mt_receive, (void *) ctx ) ) {         // so kick it
                rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
        }
index 117cfc9..0f796e3 100644 (file)
@@ -69,9 +69,9 @@ static void uta_ep_failed( endpoint_t* ep ) {
        get things into a bad state if we allow a collision here.  The lock grab
        only happens on the intial session setup.
 */
-static int uta_link2( si_ctx_t* si_ctx, endpoint_t* ep ) {
+//static int uta_link2( si_ctx_t* si_ctx, endpoint_t* ep ) {
+static int uta_link2( uta_ctx_t *ctx, endpoint_t* ep ) {
        static int      flags = 0;
-
        char*           target;
        char            conn_info[SI_MAX_ADDR_LEN];     // string to give to nano to make the connection
        char*           addr;
@@ -101,7 +101,7 @@ static int uta_link2( si_ctx_t* si_ctx, endpoint_t* ep ) {
        snprintf( conn_info, sizeof( conn_info ), "%s", target );
        errno = 0;
        if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "link2 attempting connection with: %s\n", conn_info );
-       if( (ep->nn_sock = SIconnect( si_ctx, conn_info )) < 0 ) {
+       if( (ep->nn_sock = SIconnect( ctx->si_ctx, conn_info )) < 0 ) {
                pthread_mutex_unlock( &ep->gate );
 
                if( ep->notify ) {                                                      // need to notify if set
@@ -114,6 +114,7 @@ static int uta_link2( si_ctx_t* si_ctx, endpoint_t* ep ) {
        if( DEBUG ) rmr_vlog( RMR_VL_INFO, "rmr_si_link2: connection was successful to: %s\n", target );
 
        ep->open = TRUE;                                                // set open/notify before giving up lock
+       fd2ep_add( ctx, ep->nn_sock, ep );              // map fd to ep for disc cleanup (while we have the lock)
 
        if( ! ep->notify ) {                                            // if we yammered about a failure, indicate finally good
                rmr_vlog( RMR_VL_INFO, "rmr: link2: connection finally establisehd with target: %s\n", target );
@@ -143,10 +144,7 @@ static int rt_link2_ep( void* vctx, endpoint_t* ep ) {
                return FALSE;
        }
 
-       uta_link2( ctx->si_ctx, ep );
-       if( ep->open ) {
-               fd2ep_add( ctx, ep->nn_sock, ep );              // map fd to ep for disc cleanup
-       }
+       uta_link2( ctx, ep );
        return ep->open;
 }
 
@@ -220,7 +218,7 @@ extern endpoint_t*  uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_n
        endpoint cannot be found false (0) is returned.
 */
 static int uta_epsock_byname( uta_ctx_t* ctx, char* ep_name, int* nn_sock, endpoint_t** uepp ) {
-       route_table_t*  rt; 
+       route_table_t*  rt;
        si_ctx_t*               si_ctx;
        endpoint_t*             ep;
        int                             state = FALSE;
@@ -250,7 +248,7 @@ static int uta_epsock_byname( uta_ctx_t* ctx, char* ep_name, int* nn_sock, endpo
                if( ep->addr == NULL ) {                                        // name didn't resolve before, try again
                        ep->addr = strdup( ep->name );                  // use the name directly; if not IP then transport will do dns lookup
                }
-               if( uta_link2( si_ctx, ep ) ) {                                                                                 // find entry in table and create link
+               if( uta_link2( ctx, ep ) ) {                                                                                    // find entry in table and create link
                        state = TRUE;
                        ep->open = TRUE;
                        *nn_sock = ep->nn_sock;                                                 // pass socket back to caller
@@ -369,7 +367,7 @@ static int uta_epsock_rr( uta_ctx_t* ctx, rtable_ent_t* rte, int group, int* mor
                                ep->addr = strdup( ep->name );                  // use the name directly; if not IP then transport will do dns lookup
                        }
 
-                       if( uta_link2( si_ctx, ep ) ) {                                                                                 // find entry in table and create link
+                       if( uta_link2( ctx, ep ) ) {                                                                                    // find entry in table and create link
                                ep->open = TRUE;
                                *nn_sock = ep->nn_sock;                                                 // pass socket back to caller
                                fd2ep_add( ctx, ep->nn_sock, ep );                              // map fd to ep for disc cleanup
@@ -435,7 +433,7 @@ static int epsock_meid( uta_ctx_t* ctx, route_table_t *rtable, rmr_mbuf_t* msg,
                        ep->addr = strdup( ep->name );                  // use the name directly; if not IP then transport will do dns lookup
                }
 
-               if( uta_link2( si_ctx, ep ) ) {                         // find entry in table and create link
+               if( uta_link2( ctx, ep ) ) {                            // find entry in table and create link
                        ep->open = TRUE;
                        *nn_sock = ep->nn_sock;                                 // pass socket back to caller
                } else {
@@ -451,7 +449,7 @@ static int epsock_meid( uta_ctx_t* ctx, route_table_t *rtable, rmr_mbuf_t* msg,
 
 /*
        Finds the rtable entry which matches the key. Returns a nil pointer if
-       no entry is found. If try_alternate is set, then we will attempt 
+       no entry is found. If try_alternate is set, then we will attempt
        to find the entry with a key based only on the message type.
 */
 static inline rtable_ent_t*  uta_get_rte( route_table_t *rt, int sid, int mtype, int try_alt ) {
@@ -511,26 +509,44 @@ static inline char* get_ep_counts( endpoint_t* ep, char* ubuf, int ubuf_len ) {
 
 /*
        Create the hash which maps file descriptors to endpoints. We need this
-       to easily mark an endpoint as disconnected when we are notified.
+       to easily mark an endpoint as disconnected when we are notified. Thus we
+       expect these to be driven very seldomly; locking should not be an issue.
+       Locking is needed to prevent problems when the user application is multi-
+       threaded and attempting to (re)connect from concurrent threads.
 */
 static void fd2ep_init( uta_ctx_t* ctx ) {
+
        if( ctx  && ! ctx->fd2ep ) {
-               ctx->fd2ep = rmr_sym_alloc( 129 );              
+               ctx->fd2ep = rmr_sym_alloc( 129 );
+
+               if( ctx->fd2ep_gate == NULL ) {
+                       ctx->fd2ep_gate = (pthread_mutex_t *) malloc( sizeof( *ctx->fd2ep_gate ) );
+                       if( ctx->fd2ep_gate != NULL ) {
+                               pthread_mutex_init( ctx->fd2ep_gate, NULL );
+                       }
+               }
        }
 }
 
 /*
-       Add an entry into the fd2ep hash which points to the given ep.
+       Add an entry into the fd2ep hash to map the FD to the endpoint.
 */
 static void fd2ep_add( uta_ctx_t* ctx, int fd, endpoint_t* ep ) {
        if( ctx && ctx->fd2ep ) {
+               pthread_mutex_lock( ctx->fd2ep_gate );
+
                rmr_sym_map( ctx->fd2ep, (uint64_t) fd, (void *) ep );
+
+               pthread_mutex_unlock( ctx->fd2ep_gate );
        }
 }
 
 /*
-       Given a file descriptor fetches the related endpoint from the hash and 
-       deletes the entry from the hash (when we detect a disconnect).  
+       Given a file descriptor this fetches the related endpoint from the hash and
+       deletes the entry from the hash (when we detect a disconnect).
+
+       This will also set the state on the ep open to false, and revoke the
+       FD (nn_socket).
 */
 static endpoint_t*  fd2ep_del( uta_ctx_t* ctx, int fd ) {
        endpoint_t* ep = NULL;
@@ -538,7 +554,11 @@ static endpoint_t*  fd2ep_del( uta_ctx_t* ctx, int fd ) {
        if( ctx && ctx->fd2ep ) {
                ep = rmr_sym_pull(  ctx->fd2ep, (uint64_t) fd );
                if( ep ) {
+                       pthread_mutex_lock( ctx->fd2ep_gate );
+
                        rmr_sym_ndel(  ctx->fd2ep, (uint64_t) fd );
+
+                       pthread_mutex_unlock( ctx->fd2ep_gate );
                }
        }
 
@@ -547,13 +567,17 @@ static endpoint_t*  fd2ep_del( uta_ctx_t* ctx, int fd ) {
 
 /*
        Given a file descriptor fetches the related endpoint from the hash.
-       Returns nil if there is no map.
+       Returns nil if there is no reference in the hash.
 */
 static endpoint_t*  fd2ep_get( uta_ctx_t* ctx, int fd ) {
        endpoint_t* ep = NULL;
 
        if( ctx && ctx->fd2ep ) {
+               pthread_mutex_lock( ctx->fd2ep_gate );
+
                ep = rmr_sym_pull(  ctx->fd2ep, (uint64_t) fd );
+
+               pthread_mutex_unlock( ctx->fd2ep_gate );
        }
 
        return ep;