river->msg_size = *((int *) &buf[bidx]); // snarf directly and copy with rest later
}
if( DEBUG ) fprintf( stderr, "[DBUG] data callback setting msg size: %d\n", river->msg_size );
+
+ if( river->msg_size > river->nbytes ) { // message is too big, we will drop it
+ river->flags |= RF_DROP;
+ }
}
if( river->msg_size > (river->ipt + remain) ) { // need more than is left in buffer
if( DEBUG > 1 ) fprintf( stderr, "[DBUG] data callback not enough in the buffer size=%d remain=%d\n", river->msg_size, remain );
- memcpy( &river->accum[river->ipt], buf+bidx, remain ); // buffer and go wait for more
+ if( (river->flags & RF_DROP) == 0 ) {
+ memcpy( &river->accum[river->ipt], buf+bidx, remain ); // buffer and go wait for more
+ }
river->ipt += remain;
remain = 0;
} else {
need = river->msg_size - river->ipt; // bytes from transport we need to have complete message
if( DEBUG ) fprintf( stderr, "[DBUG] data callback enough in the buffer size=%d need=%d remain=%d\n", river->msg_size, need, remain );
- memcpy( &river->accum[river->ipt], buf+bidx, need ); // grab just what is needed (might be more)
- buf2mbuf( ctx, river->accum, river->msg_size, fd ); // build an RMR mbuf and queue
+ if( (river->flags & RF_DROP) == 0 ) {
+ memcpy( &river->accum[river->ipt], buf+bidx, need ); // grab just what is needed (might be more)
+ buf2mbuf( ctx, river->accum, river->msg_size, fd ); // build an RMR mbuf and queue
+ river->accum = (char *) malloc( sizeof( char ) * river->nbytes ); // fresh accumulator
+ } else {
+ if( !(river->flags & RF_NOTIFIED) ) {
+ fprintf( stderr, "[WRN] message larger than max (%d) have arrived on fd %d\n", river->nbytes, fd );
+ river->flags |= RF_NOTIFIED;
+ }
+ }
- river->accum = (char *) malloc( sizeof( char ) * river->nbytes ); // fresh accumulator
river->msg_size = -1;
river->ipt = 0;
bidx += need;