Merge "Cmake, change files missing from previous commit"
[ric-plt/lib/rmr.git] / src / rmr / si / src / mt_call_si_static.c
index 6e2e8aa..7a8b666 100644 (file)
@@ -38,7 +38,7 @@ static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) {
        if( ! uta_ring_insert( ctx->mring, mbuf ) ) {
                rmr_free_msg( mbuf );                                                           // drop if ring is full
                if( !warned ) {
        if( ! uta_ring_insert( ctx->mring, mbuf ) ) {
                rmr_free_msg( mbuf );                                                           // drop if ring is full
                if( !warned ) {
-                       fprintf( stderr, "[WARN] rmr_mt_receive: application is not receiving fast enough; messages dropping\n" );
+                       rmr_vlog( RMR_VL_ERR, "rmr_mt_receive: application is not receiving fast enough; messages dropping\n" );
                        warned++;
                }
 
                        warned++;
                }
 
@@ -54,8 +54,6 @@ static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) {
        call ref to point to all of the various bits and set real len etc,
        then we queue it.  Raw_msg is expected to include the transport goo
        placed in front of the RMR header and payload.
        call ref to point to all of the various bits and set real len etc,
        then we queue it.  Raw_msg is expected to include the transport goo
        placed in front of the RMR header and payload.
-
-       done -- FIX ME?? can we eliminate the buffer copy here?
 */
 static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd ) {
        rmr_mbuf_t*             mbuf;
 */
 static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd ) {
        rmr_mbuf_t*             mbuf;
@@ -70,18 +68,10 @@ static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd
                }
        }
 
                }
        }
 
-/*
-       if( (mbuf = (rmr_mbuf_t *) malloc( sizeof( *mbuf ))) != NULL ) {                // alloc mbuf and point at various bits of payload
-               memset( mbuf, 0, sizeof( *mbuf ) );
-               mbuf->tp_buf = raw_msg;
-               mbuf->ring = ctx->zcb_mring;
-*/
        if( (mbuf = alloc_mbuf( ctx, RMR_ERR_UNSET )) != NULL ) {
                mbuf->tp_buf = raw_msg;
                mbuf->rts_fd = sender_fd;
 
        if( (mbuf = alloc_mbuf( ctx, RMR_ERR_UNSET )) != NULL ) {
                mbuf->tp_buf = raw_msg;
                mbuf->rts_fd = sender_fd;
 
-               // eliminated :)   memcpy( mbuf->tp_buf, river->accum + offset, river->msg_size );
-
                ref_tpbuf( mbuf, msg_size );                            // point mbuf at bits in the datagram
                hdr = mbuf->header;                                                     // convenience
                if( hdr->flags & HFL_CALL_MSG ) {                       // call generated message; ignore call-id etc and queue
                ref_tpbuf( mbuf, msg_size );                            // point mbuf at bits in the datagram
                hdr = mbuf->header;                                                     // convenience
                if( hdr->flags & HFL_CALL_MSG ) {                       // call generated message; ignore call-id etc and queue
@@ -124,18 +114,17 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
        int                             need;                   // bytes needed for something
        int                             i;
 
        int                             need;                   // bytes needed for something
        int                             i;
 
-       // for speed these checks should be enabled only in debug mode and assume we always get a good context
-       if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
-               return SI_RET_OK;
-       }
-
-       if( fd >= ctx->nrivers || fd < 0 ) {
-               if( DEBUG ) fprintf( stderr, "[DBUG] callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
-               return SI_RET_OK;
+       if( PARINOID_CHECKS ) {                                                                 // PARINOID mode is slower; off by default
+               if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+                       return SI_RET_OK;
+               }
+       
+               if( fd >= ctx->nrivers || fd < 0 ) {
+                       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
+                       return SI_RET_OK;
+               }
        }
 
        }
 
-       // -------- end debug checks -----------------
-
        if( buflen <= 0 ) {
                return SI_RET_OK;
        }
        if( buflen <= 0 ) {
                return SI_RET_OK;
        }
@@ -159,24 +148,24 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
 /*
 fprintf( stderr, "\n>>>>> data callback for %d bytes from %d\n", buflen, fd );
 for( i = 0; i < 40; i++ ) {
 /*
 fprintf( stderr, "\n>>>>> data callback for %d bytes from %d\n", buflen, fd );
 for( i = 0; i < 40; i++ ) {
-fprintf( stderr, "%02x ", (unsigned char) *(buf+i) );
+       fprintf( stderr, "%02x ", (unsigned char) *(buf+i) );
 }
 fprintf( stderr, "\n" );
 */
 
        remain = buflen;
        while( remain > 0 ) {                                                           // until we've done something with all bytes passed in
 }
 fprintf( stderr, "\n" );
 */
 
        remain = buflen;
        while( remain > 0 ) {                                                           // until we've done something with all bytes passed in
-               if( DEBUG )  fprintf( stderr, "[DBUG] ====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain );
+               if( DEBUG )  rmr_vlog( RMR_VL_DEBUG, "====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain );
 
                // FIX ME: size in the message  needs to be network byte order  
                if( river->msg_size <= 0 ) {                            // don't have a size yet
                                                                                                        // FIX ME: we need a frame indicator to ensure alignment
                        need = sizeof( int ) - river->ipt;                                                      // what we need from transport buffer
                        if( need > remain ) {                                                                           // the whole size isn't there
 
                // FIX ME: size in the message  needs to be network byte order  
                if( river->msg_size <= 0 ) {                            // don't have a size yet
                                                                                                        // FIX ME: we need a frame indicator to ensure alignment
                        need = sizeof( int ) - river->ipt;                                                      // what we need from transport buffer
                        if( need > remain ) {                                                                           // the whole size isn't there
-                               if( DEBUG > 1 ) fprintf( stderr, "[DBUG] need more for size than we have: need=%d rmain=%d ipt=%d\n", need, remain, river->ipt );
+                               if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "need more for size than we have: need=%d rmain=%d ipt=%d\n", need, remain, river->ipt );
                                memcpy( &river->accum[river->ipt], buf+bidx, remain );                  // grab what we can and depart
                                river->ipt += remain;
                                memcpy( &river->accum[river->ipt], buf+bidx, remain );                  // grab what we can and depart
                                river->ipt += remain;
-                               if( DEBUG > 1 ) fprintf( stderr, "[DBUG] data callback not enough bytes to compute size; need=%d have=%d\n", need, remain );
+                               if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "data callback not enough bytes to compute size; need=%d have=%d\n", need, remain );
                                return SI_RET_OK;
                        }
 
                                return SI_RET_OK;
                        }
 
@@ -187,7 +176,7 @@ fprintf( stderr, "\n" );
                                remain -= need;
                                river->msg_size = *((int *) river->accum);                              
                                if( DEBUG > 1 ) {
                                remain -= need;
                                river->msg_size = *((int *) river->accum);                              
                                if( DEBUG > 1 ) {
-                                       fprintf( stderr, "[DBUG] size from accumulator =%d\n", river->msg_size );
+                                       rmr_vlog( RMR_VL_DEBUG, "size from accumulator =%d\n", river->msg_size );
                                        if( river->msg_size > 500 ) {
                                                dump_40( river->accum, "msg size way too large accum:"  );
                                        }
                                        if( river->msg_size > 500 ) {
                                                dump_40( river->accum, "msg size way too large accum:"  );
                                        }
@@ -195,21 +184,34 @@ fprintf( stderr, "\n" );
                        } else {
                                river->msg_size = *((int *) &buf[bidx]);                                        // snarf directly and copy with rest later
                        }
                        } else {
                                river->msg_size = *((int *) &buf[bidx]);                                        // snarf directly and copy with rest later
                        }
-                       if( DEBUG ) fprintf( stderr, "[DBUG] data callback setting msg size: %d\n", river->msg_size );
+                       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback setting msg size: %d\n", river->msg_size );
+
+                       if( river->msg_size > river->nbytes ) {                         // message is too big, we will drop it
+                               river->flags |= RF_DROP;
+                       }
                }
 
                if( river->msg_size > (river->ipt + remain) ) {                                 // need more than is left in buffer
                }
 
                if( river->msg_size > (river->ipt + remain) ) {                                 // need more than is left in buffer
-                       if( DEBUG > 1 ) fprintf( stderr, "[DBUG] data callback not enough in the buffer size=%d remain=%d\n", river->msg_size, remain );
-                       memcpy( &river->accum[river->ipt], buf+bidx, remain );          // buffer and go wait for more
+                       if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "data callback not enough in the buffer size=%d remain=%d\n", river->msg_size, remain );
+                       if( (river->flags & RF_DROP) == 0  ) {
+                               memcpy( &river->accum[river->ipt], buf+bidx, remain );          // buffer and go wait for more
+                       }
                        river->ipt += remain;
                        remain = 0;
                } else {
                        need = river->msg_size - river->ipt;                                            // bytes from transport we need to have complete message
                        river->ipt += remain;
                        remain = 0;
                } else {
                        need = river->msg_size - river->ipt;                                            // bytes from transport we need to have complete message
-                       if( DEBUG ) fprintf( stderr, "[DBUG] data callback enough in the buffer size=%d need=%d remain=%d\n", river->msg_size, need, remain );
-                       memcpy( &river->accum[river->ipt], buf+bidx, need );            // grab just what is needed (might be more)
-                       buf2mbuf( ctx, river->accum, river->msg_size, fd );                             // build an RMR mbuf and queue
+                       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback enough in the buffer size=%d need=%d remain=%d\n", river->msg_size, need, remain );
+                       if( (river->flags & RF_DROP) == 0  ) {
+                               memcpy( &river->accum[river->ipt], buf+bidx, need );                            // grab just what is needed (might be more)
+                               buf2mbuf( ctx, river->accum, river->msg_size, fd );                                     // build an RMR mbuf and queue
+                               river->accum = (char *) malloc( sizeof( char ) *  river->nbytes );      // fresh accumulator
+                       } else {
+                               if( !(river->flags & RF_NOTIFIED) ) {   
+                                       rmr_vlog( RMR_VL_WARN, "message larger than max (%d) have arrived on fd %d\n", river->nbytes, fd );
+                                       river->flags |= RF_NOTIFIED;
+                               }
+                       }
 
 
-                       river->accum = (char *) malloc( sizeof( char ) *  river->nbytes );      // fresh accumulator
                        river->msg_size = -1;
                        river->ipt = 0;
                        bidx += need;
                        river->msg_size = -1;
                        river->ipt = 0;
                        bidx += need;
@@ -217,7 +219,32 @@ fprintf( stderr, "\n" );
                }
        }
 
                }
        }
 
-       if( DEBUG >2 ) fprintf( stderr, "[DBUG] ##### data callback finished\n" );
+       if( DEBUG >2 ) rmr_vlog( RMR_VL_DEBUG, "##### data callback finished\n" );
+       return SI_RET_OK;
+}
+
+/*
+       Callback driven on a disconnect notification. We will attempt to find the related 
+       endpoint via the fd2ep hash maintained in the context. If we find it, then we 
+       remove it from the hash, and mark the endpoint as closed so that the next attempt
+       to send forces a reconnect attempt.
+
+       Future: put the ep on a queue to automatically attempt to reconnect.
+*/
+static int mt_disc_cb( void* vctx, int fd ) {
+       uta_ctx_t*      ctx;
+       endpoint_t*     ep;
+
+       if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+               return SI_RET_OK;
+       }
+
+       ep = fd2ep_del( ctx, fd );              // find ep and remote the fd from the hash
+       if( ep ) {
+               ep->open = FALSE;
+               ep->nn_sock = -1;
+       }
+
        return SI_RET_OK;
 }
 
        return SI_RET_OK;
 }
 
@@ -226,7 +253,7 @@ fprintf( stderr, "\n" );
        This is expected to execute in a separate thread. It is responsible for
        _all_ receives and queues them on the appropriate ring, or chute.
        It does this by registering the callback function above with the SI world
        This is expected to execute in a separate thread. It is responsible for
        _all_ receives and queues them on the appropriate ring, or chute.
        It does this by registering the callback function above with the SI world
-       and then caling SIwait() to drive the callback when data has arrived.
+       and then calling SIwait() to drive the callback when data has arrived.
 
 
        The "state" of the message is checked which determines where the message
 
 
        The "state" of the message is checked which determines where the message
@@ -246,12 +273,15 @@ static void* mt_receive( void* vctx ) {
        uta_ctx_t*      ctx;
 
        if( (ctx = (uta_ctx_t*) vctx) == NULL ) {
        uta_ctx_t*      ctx;
 
        if( (ctx = (uta_ctx_t*) vctx) == NULL ) {
-               fprintf( stderr, "[CRI], unable to start mt-receive: ctx was nil\n" );
+               rmr_vlog( RMR_VL_CRIT, "unable to start mt-receive: ctx was nil\n" );
                return NULL;
        }
 
                return NULL;
        }
 
-       if( DEBUG ) fprintf( stderr, "[DBUG] mt_receive: registering SI95 data callback and waiting\n" );
+       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mt_receive: registering SI95 data callback and waiting\n" );
+
        SIcbreg( ctx->si_ctx, SI_CB_CDATA, mt_data_cb, vctx );                  // our callback called only for "cooked" (tcp) data
        SIcbreg( ctx->si_ctx, SI_CB_CDATA, mt_data_cb, vctx );                  // our callback called only for "cooked" (tcp) data
+       SIcbreg( ctx->si_ctx, SI_CB_DISC, mt_disc_cb, vctx );                   // our callback for handling disconnects
+
        SIwait( ctx->si_ctx );
 
        return NULL;            // keep the compiler happy though never can be reached as SI wait doesn't return
        SIwait( ctx->si_ctx );
 
        return NULL;            // keep the compiler happy though never can be reached as SI wait doesn't return