if( river->state == RS_NEW ) {
memset( river, 0, sizeof( *river ) );
//river->nbytes = sizeof( char ) * (8 * 1024);
- river->nbytes = sizeof( char ) * ctx->max_ibm; // max inbound message size
+ river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024); // max inbound message size
river->accum = (char *) malloc( river->nbytes );
river->ipt = 0;
} else {
}
river->state = RS_GOOD;
-
-/*
-fprintf( stderr, "\n>>>>> data callback for %d bytes from %d\n", buflen, fd );
-for( i = 0; i < 40; i++ ) {
- fprintf( stderr, "%02x ", (unsigned char) *(buf+i) );
-}
-fprintf( stderr, "\n" );
-*/
-
remain = buflen;
while( remain > 0 ) { // until we've done something with all bytes passed in
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain );
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback enough in the buffer size=%d need=%d remain=%d\n", river->msg_size, need, remain );
if( (river->flags & RF_DROP) == 0 ) {
memcpy( &river->accum[river->ipt], buf+bidx, need ); // grab just what is needed (might be more)
- buf2mbuf( ctx, river->accum, river->msg_size, fd ); // build an RMR mbuf and queue
+ buf2mbuf( ctx, river->accum, river->nbytes, fd ); // build an RMR mbuf and queue
river->accum = (char *) malloc( sizeof( char ) * river->nbytes ); // fresh accumulator
} else {
if( !(river->flags & RF_NOTIFIED) ) {