Add first set of SI95 unit tests and health check
[ric-plt/lib/rmr.git] / src / rmr / si / src / mt_call_si_static.c
index 55636bb..d18a8e0 100644 (file)
@@ -38,7 +38,7 @@ static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) {
        if( ! uta_ring_insert( ctx->mring, mbuf ) ) {
                rmr_free_msg( mbuf );                                                           // drop if ring is full
                if( !warned ) {
-                       fprintf( stderr, "[WARN] rmr_mt_receive: application is not receiving fast enough; messages dropping\n" );
+                       rmr_vlog( RMR_VL_ERR, "rmr_mt_receive: application is not receiving fast enough; messages dropping\n" );
                        warned++;
                }
 
@@ -118,9 +118,9 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
                if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
                        return SI_RET_OK;
                }
-       
+
                if( fd >= ctx->nrivers || fd < 0 ) {
-                       if( DEBUG ) fprintf( stderr, "[DBUG] callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
+                       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
                        return SI_RET_OK;
                }
        }
@@ -134,7 +134,7 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
                if( river->state == RS_NEW ) {
                        memset( river, 0, sizeof( *river ) );
                        //river->nbytes = sizeof( char ) * (8 * 1024);
-                       river->nbytes = sizeof( char ) * ctx->max_ibm;                  // max inbound message size
+                       river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024);         // max inbound message size
                        river->accum = (char *) malloc( river->nbytes );
                        river->ipt = 0;
                } else {
@@ -144,28 +144,19 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
        }
 
        river->state = RS_GOOD;
-
-/*
-fprintf( stderr, "\n>>>>> data callback for %d bytes from %d\n", buflen, fd );
-for( i = 0; i < 40; i++ ) {
-       fprintf( stderr, "%02x ", (unsigned char) *(buf+i) );
-}
-fprintf( stderr, "\n" );
-*/
-
        remain = buflen;
        while( remain > 0 ) {                                                           // until we've done something with all bytes passed in
-               if( DEBUG )  fprintf( stderr, "[DBUG] ====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain );
+               if( DEBUG )  rmr_vlog( RMR_VL_DEBUG, "====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain );
 
-               // FIX ME: size in the message  needs to be network byte order  
+               // FIX ME: size in the message  needs to be network byte order
                if( river->msg_size <= 0 ) {                            // don't have a size yet
                                                                                                        // FIX ME: we need a frame indicator to ensure alignment
                        need = sizeof( int ) - river->ipt;                                                      // what we need from transport buffer
                        if( need > remain ) {                                                                           // the whole size isn't there
-                               if( DEBUG > 1 ) fprintf( stderr, "[DBUG] need more for size than we have: need=%d rmain=%d ipt=%d\n", need, remain, river->ipt );
+                               if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "need more for size than we have: need=%d rmain=%d ipt=%d\n", need, remain, river->ipt );
                                memcpy( &river->accum[river->ipt], buf+bidx, remain );                  // grab what we can and depart
                                river->ipt += remain;
-                               if( DEBUG > 1 ) fprintf( stderr, "[DBUG] data callback not enough bytes to compute size; need=%d have=%d\n", need, remain );
+                               if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "data callback not enough bytes to compute size; need=%d have=%d\n", need, remain );
                                return SI_RET_OK;
                        }
 
@@ -174,17 +165,20 @@ fprintf( stderr, "\n" );
                                river->ipt += need;
                                bidx += need;
                                remain -= need;
-                               river->msg_size = *((int *) river->accum);                              
-                               if( DEBUG > 1 ) {
-                                       fprintf( stderr, "[DBUG] size from accumulator =%d\n", river->msg_size );
-                                       if( river->msg_size > 500 ) {
-                                               dump_40( river->accum, "msg size way too large accum:"  );
+                               river->msg_size = *((int *) river->accum);              
+                               if( DEBUG ) {
+                                       rmr_vlog( RMR_VL_DEBUG, "size from accumulator =%d\n", river->msg_size );
+                                       if( DEBUG > 1 ) {
+                                               dump_40( river->accum, "from accumulator:"  );
+                                               if( river->msg_size > 100 ) {
+                                                       dump_40( river->accum + 50, "from rmr buf:"  );
+                                               }
                                        }
                                }
                        } else {
                                river->msg_size = *((int *) &buf[bidx]);                                        // snarf directly and copy with rest later
                        }
-                       if( DEBUG ) fprintf( stderr, "[DBUG] data callback setting msg size: %d\n", river->msg_size );
+                       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback setting msg size: %d\n", river->msg_size );
 
                        if( river->msg_size > river->nbytes ) {                         // message is too big, we will drop it
                                river->flags |= RF_DROP;
@@ -192,7 +186,7 @@ fprintf( stderr, "\n" );
                }
 
                if( river->msg_size > (river->ipt + remain) ) {                                 // need more than is left in buffer
-                       if( DEBUG > 1 ) fprintf( stderr, "[DBUG] data callback not enough in the buffer size=%d remain=%d\n", river->msg_size, remain );
+                       if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "data callback not enough in the buffer size=%d remain=%d\n", river->msg_size, remain );
                        if( (river->flags & RF_DROP) == 0  ) {
                                memcpy( &river->accum[river->ipt], buf+bidx, remain );          // buffer and go wait for more
                        }
@@ -200,14 +194,14 @@ fprintf( stderr, "\n" );
                        remain = 0;
                } else {
                        need = river->msg_size - river->ipt;                                            // bytes from transport we need to have complete message
-                       if( DEBUG ) fprintf( stderr, "[DBUG] data callback enough in the buffer size=%d need=%d remain=%d\n", river->msg_size, need, remain );
+                       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback enough in the buffer size=%d need=%d remain=%d\n", river->msg_size, need, remain );
                        if( (river->flags & RF_DROP) == 0  ) {
                                memcpy( &river->accum[river->ipt], buf+bidx, need );                            // grab just what is needed (might be more)
-                               buf2mbuf( ctx, river->accum, river->msg_size, fd );                                     // build an RMR mbuf and queue
+                               buf2mbuf( ctx, river->accum, river->nbytes, fd );                                       // build an RMR mbuf and queue
                                river->accum = (char *) malloc( sizeof( char ) *  river->nbytes );      // fresh accumulator
                        } else {
-                               if( !(river->flags & RF_NOTIFIED) ) {   
-                                       fprintf( stderr, "[WRN] message larger than max (%d) have arrived on fd %d\n", river->nbytes, fd );
+                               if( !(river->flags & RF_NOTIFIED) ) {
+                                       rmr_vlog( RMR_VL_WARN, "message larger than max (%d) have arrived on fd %d\n", river->nbytes, fd );
                                        river->flags |= RF_NOTIFIED;
                                }
                        }
@@ -215,11 +209,38 @@ fprintf( stderr, "\n" );
                        river->msg_size = -1;
                        river->ipt = 0;
                        bidx += need;
-                       remain -= need; 
+                       remain -= need;
                }
        }
 
-       if( DEBUG >2 ) fprintf( stderr, "[DBUG] ##### data callback finished\n" );
+       if( DEBUG >2 ) rmr_vlog( RMR_VL_DEBUG, "##### data callback finished\n" );
+       return SI_RET_OK;
+}
+
+/*
+       Callback driven on a disconnect notification. We will attempt to find the related
+       endpoint via the fd2ep hash maintained in the context. If we find it, then we
+       remove it from the hash, and mark the endpoint as closed so that the next attempt
+       to send forces a reconnect attempt.
+
+       Future: put the ep on a queue to automatically attempt to reconnect.
+*/
+static int mt_disc_cb( void* vctx, int fd ) {
+       uta_ctx_t*      ctx;
+       endpoint_t*     ep;
+
+       if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+               return SI_RET_OK;
+       }
+
+       ep = fd2ep_del( ctx, fd );              // find ep and remove the fd from the hash
+       if( ep != NULL ) {
+       pthread_mutex_lock( &ep->gate );            // wise to lock this
+               ep->open = FALSE;
+               ep->nn_sock = -1;
+       pthread_mutex_unlock( &ep->gate );
+       }
+
        return SI_RET_OK;
 }
 
@@ -248,12 +269,15 @@ static void* mt_receive( void* vctx ) {
        uta_ctx_t*      ctx;
 
        if( (ctx = (uta_ctx_t*) vctx) == NULL ) {
-               fprintf( stderr, "[CRI], unable to start mt-receive: ctx was nil\n" );
+               rmr_vlog( RMR_VL_CRIT, "unable to start mt-receive: ctx was nil\n" );
                return NULL;
        }
 
-       if( DEBUG ) fprintf( stderr, "[DBUG] mt_receive: registering SI95 data callback and waiting\n" );
+       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mt_receive: registering SI95 data callback and waiting\n" );
+
        SIcbreg( ctx->si_ctx, SI_CB_CDATA, mt_data_cb, vctx );                  // our callback called only for "cooked" (tcp) data
+       SIcbreg( ctx->si_ctx, SI_CB_DISC, mt_disc_cb, vctx );                   // our callback for handling disconnects
+
        SIwait( ctx->si_ctx );
 
        return NULL;            // keep the compiler happy though never can be reached as SI wait doesn't return