Changes to fd2ep for locking and disconnect locking
[ric-plt/lib/rmr.git] / src / rmr / si / src / mt_call_si_static.c
index 862b554..2894cdb 100644 (file)
@@ -118,7 +118,7 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
                if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
                        return SI_RET_OK;
                }
-       
+
                if( fd >= ctx->nrivers || fd < 0 ) {
                        if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
                        return SI_RET_OK;
@@ -148,7 +148,7 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
        while( remain > 0 ) {                                                           // until we've done something with all bytes passed in
                if( DEBUG )  rmr_vlog( RMR_VL_DEBUG, "====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain );
 
-               // FIX ME: size in the message  needs to be network byte order  
+               // FIX ME: size in the message  needs to be network byte order
                if( river->msg_size <= 0 ) {                            // don't have a size yet
                                                                                                        // FIX ME: we need a frame indicator to ensure alignment
                        need = sizeof( int ) - river->ipt;                                                      // what we need from transport buffer
@@ -165,7 +165,7 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
                                river->ipt += need;
                                bidx += need;
                                remain -= need;
-                               river->msg_size = *((int *) river->accum);                              
+                               river->msg_size = *((int *) river->accum);              
                                if( DEBUG > 1 ) {
                                        rmr_vlog( RMR_VL_DEBUG, "size from accumulator =%d\n", river->msg_size );
                                        if( river->msg_size > 500 ) {
@@ -197,7 +197,7 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
                                buf2mbuf( ctx, river->accum, river->nbytes, fd );                                       // build an RMR mbuf and queue
                                river->accum = (char *) malloc( sizeof( char ) *  river->nbytes );      // fresh accumulator
                        } else {
-                               if( !(river->flags & RF_NOTIFIED) ) {   
+                               if( !(river->flags & RF_NOTIFIED) ) {
                                        rmr_vlog( RMR_VL_WARN, "message larger than max (%d) have arrived on fd %d\n", river->nbytes, fd );
                                        river->flags |= RF_NOTIFIED;
                                }
@@ -206,7 +206,7 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
                        river->msg_size = -1;
                        river->ipt = 0;
                        bidx += need;
-                       remain -= need; 
+                       remain -= need;
                }
        }
 
@@ -215,8 +215,8 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
 }
 
 /*
-       Callback driven on a disconnect notification. We will attempt to find the related 
-       endpoint via the fd2ep hash maintained in the context. If we find it, then we 
+       Callback driven on a disconnect notification. We will attempt to find the related
+       endpoint via the fd2ep hash maintained in the context. If we find it, then we
        remove it from the hash, and mark the endpoint as closed so that the next attempt
        to send forces a reconnect attempt.
 
@@ -230,10 +230,12 @@ static int mt_disc_cb( void* vctx, int fd ) {
                return SI_RET_OK;
        }
 
-       ep = fd2ep_del( ctx, fd );              // find ep and remote the fd from the hash
-       if( ep ) {
+       ep = fd2ep_del( ctx, fd );              // find ep and remove the fd from the hash
+       if( ep != NULL ) {
+       pthread_mutex_lock( &ep->gate );            // wise to lock this
                ep->open = FALSE;
                ep->nn_sock = -1;
+       pthread_mutex_unlock( &ep->gate );
        }
 
        return SI_RET_OK;