Add first set of SI95 unit tests and health check
[ric-plt/lib/rmr.git] / src / rmr / si / src / mt_call_si_static.c
index 7a8b666..d18a8e0 100644 (file)
@@ -118,7 +118,7 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
                if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
                        return SI_RET_OK;
                }
-       
+
                if( fd >= ctx->nrivers || fd < 0 ) {
                        if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
                        return SI_RET_OK;
@@ -134,7 +134,7 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
                if( river->state == RS_NEW ) {
                        memset( river, 0, sizeof( *river ) );
                        //river->nbytes = sizeof( char ) * (8 * 1024);
-                       river->nbytes = sizeof( char ) * ctx->max_ibm;                  // max inbound message size
+                       river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024);         // max inbound message size
                        river->accum = (char *) malloc( river->nbytes );
                        river->ipt = 0;
                } else {
@@ -144,20 +144,11 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
        }
 
        river->state = RS_GOOD;
-
-/*
-fprintf( stderr, "\n>>>>> data callback for %d bytes from %d\n", buflen, fd );
-for( i = 0; i < 40; i++ ) {
-       fprintf( stderr, "%02x ", (unsigned char) *(buf+i) );
-}
-fprintf( stderr, "\n" );
-*/
-
        remain = buflen;
        while( remain > 0 ) {                                                           // until we've done something with all bytes passed in
                if( DEBUG )  rmr_vlog( RMR_VL_DEBUG, "====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain );
 
-               // FIX ME: size in the message  needs to be network byte order  
+               // FIX ME: size in the message  needs to be network byte order
                if( river->msg_size <= 0 ) {                            // don't have a size yet
                                                                                                        // FIX ME: we need a frame indicator to ensure alignment
                        need = sizeof( int ) - river->ipt;                                                      // what we need from transport buffer
@@ -174,11 +165,14 @@ fprintf( stderr, "\n" );
                                river->ipt += need;
                                bidx += need;
                                remain -= need;
-                               river->msg_size = *((int *) river->accum);                              
-                               if( DEBUG > 1 ) {
+                               river->msg_size = *((int *) river->accum);              
+                               if( DEBUG ) {
                                        rmr_vlog( RMR_VL_DEBUG, "size from accumulator =%d\n", river->msg_size );
-                                       if( river->msg_size > 500 ) {
-                                               dump_40( river->accum, "msg size way too large accum:"  );
+                                       if( DEBUG > 1 ) {
+                                               dump_40( river->accum, "from accumulator:"  );
+                                               if( river->msg_size > 100 ) {
+                                                       dump_40( river->accum + 50, "from rmr buf:"  );
+                                               }
                                        }
                                }
                        } else {
@@ -203,10 +197,10 @@ fprintf( stderr, "\n" );
                        if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback enough in the buffer size=%d need=%d remain=%d\n", river->msg_size, need, remain );
                        if( (river->flags & RF_DROP) == 0  ) {
                                memcpy( &river->accum[river->ipt], buf+bidx, need );                            // grab just what is needed (might be more)
-                               buf2mbuf( ctx, river->accum, river->msg_size, fd );                                     // build an RMR mbuf and queue
+                               buf2mbuf( ctx, river->accum, river->nbytes, fd );                                       // build an RMR mbuf and queue
                                river->accum = (char *) malloc( sizeof( char ) *  river->nbytes );      // fresh accumulator
                        } else {
-                               if( !(river->flags & RF_NOTIFIED) ) {   
+                               if( !(river->flags & RF_NOTIFIED) ) {
                                        rmr_vlog( RMR_VL_WARN, "message larger than max (%d) have arrived on fd %d\n", river->nbytes, fd );
                                        river->flags |= RF_NOTIFIED;
                                }
@@ -215,7 +209,7 @@ fprintf( stderr, "\n" );
                        river->msg_size = -1;
                        river->ipt = 0;
                        bidx += need;
-                       remain -= need; 
+                       remain -= need;
                }
        }
 
@@ -224,8 +218,8 @@ fprintf( stderr, "\n" );
 }
 
 /*
-       Callback driven on a disconnect notification. We will attempt to find the related 
-       endpoint via the fd2ep hash maintained in the context. If we find it, then we 
+       Callback driven on a disconnect notification. We will attempt to find the related
+       endpoint via the fd2ep hash maintained in the context. If we find it, then we
        remove it from the hash, and mark the endpoint as closed so that the next attempt
        to send forces a reconnect attempt.
 
@@ -239,10 +233,12 @@ static int mt_disc_cb( void* vctx, int fd ) {
                return SI_RET_OK;
        }
 
-       ep = fd2ep_del( ctx, fd );              // find ep and remote the fd from the hash
-       if( ep ) {
+       ep = fd2ep_del( ctx, fd );              // find ep and remove the fd from the hash
+       if( ep != NULL ) {
+       pthread_mutex_lock( &ep->gate );            // wise to lock this
                ep->open = FALSE;
                ep->nn_sock = -1;
+       pthread_mutex_unlock( &ep->gate );
        }
 
        return SI_RET_OK;