if( river->state != RS_GOOD ) { // all states which aren't good require reset first
if( river->state == RS_NEW ) {
memset( river, 0, sizeof( *river ) );
- river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024); // max inbound message size
+ river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024); // start with what user said would be the "normal" max inbound msg size
river->accum = (char *) malloc( river->nbytes );
river->ipt = 0;
} else {
while( remain > 0 ) { // until we've done something with all bytes passed in
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain );
- if( river->msg_size <= 0 ) { // don't have a size yet
+ if( river->msg_size <= 0 ) { // don't have a message length yet
// FIX ME: we need a frame indicator to ensure alignment
- need = TP_SZFIELD_LEN - river->ipt; // what we need to compute length
- if( need > remain ) { // the whole size isn't there
+ need = TP_SZFIELD_LEN - river->ipt; // what we need to compute the total message length
+ if( need > remain ) { // the whole message len information isn't in this transport buf
if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "need more for size than we have: need=%d rmain=%d ipt=%d\n", need, remain, river->ipt );
memcpy( &river->accum[river->ipt], buf+bidx, remain ); // grab what we can and depart
river->ipt += remain;
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback setting msg size: %d\n", river->msg_size );
if( river->msg_size > river->nbytes ) { // message bigger than app max size; grab huge buffer
- //river->flags |= RF_DROP;
+ //river->flags |= RF_DROP; // uncomment to drop large messages
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "received message is huge (%d) reallocating buffer\n", river->msg_size );
old_accum = river->accum; // need to copy any bytes we snarfed getting the size, so hold
river->nbytes = river->msg_size + 128; // buffer large enough with a bit of fudge room
}
}
- if( river->msg_size > (river->ipt + remain) ) { // need more than is left in buffer
+ if( river->msg_size > (river->ipt + remain) ) { // need more than is left in receive buffer
if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "data callback not enough in the buffer size=%d remain=%d\n", river->msg_size, remain );
- if( (river->flags & RF_DROP) == 0 ) {
- memcpy( &river->accum[river->ipt], buf+bidx, remain ); // buffer and go wait for more
+ if( (river->flags & RF_DROP) == 0 ) { // ok to keep this message; copy bytes
+ memcpy( &river->accum[river->ipt], buf+bidx, remain ); // grab what is in the rcv buffer and go wait for more
}
river->ipt += remain;
remain = 0;
} else {
need = river->msg_size - river->ipt; // bytes from transport we need to have complete message
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback enough in the buffer size=%d need=%d remain=%d flgs=%02x\n", river->msg_size, need, remain, river->flags );
- if( (river->flags & RF_DROP) == 0 ) {
+ if( (river->flags & RF_DROP) == 0 ) { // keeping this message, copy and pass it on
memcpy( &river->accum[river->ipt], buf+bidx, need ); // grab just what is needed (might be more)
buf2mbuf( ctx, river->accum, river->nbytes, fd ); // build an RMR mbuf and queue
river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024); // prevent huge size from persisting
river->accum = (char *) malloc( sizeof( char ) * river->nbytes ); // fresh accumulator
} else {
- if( !(river->flags & RF_NOTIFIED) ) {
+ if( !(river->flags & RF_NOTIFIED) ) { // not keeping huge messages; notify once per stream
rmr_vlog( RMR_VL_WARN, "message larger than allocated buffer (%d) arrived on fd %d\n", river->nbytes, fd );
river->flags |= RF_NOTIFIED;
}