X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Frmr%2Fsi%2Fsrc%2Fmt_call_si_static.c;h=1b31bf41df2af017ccb0403de661fe3d0b652430;hb=refs%2Fchanges%2F21%2F5521%2F1;hp=76b56b21a0f04437ded7c7d157870f9704c05238;hpb=d762b36427051dc6d1a7e64b44df4334f1ad9dbd;p=ric-plt%2Flib%2Frmr.git diff --git a/src/rmr/si/src/mt_call_si_static.c b/src/rmr/si/src/mt_call_si_static.c index 76b56b2..1b31bf4 100644 --- a/src/rmr/si/src/mt_call_si_static.c +++ b/src/rmr/si/src/mt_call_si_static.c @@ -178,7 +178,7 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) { if( river->state != RS_GOOD ) { // all states which aren't good require reset first if( river->state == RS_NEW ) { memset( river, 0, sizeof( *river ) ); - river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024); // max inbound message size + river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024); // start with what user said would be the "normal" max inbound msg size river->accum = (char *) malloc( river->nbytes ); river->ipt = 0; } else { @@ -192,10 +192,10 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) { while( remain > 0 ) { // until we've done something with all bytes passed in if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain ); - if( river->msg_size <= 0 ) { // don't have a size yet + if( river->msg_size <= 0 ) { // don't have a message length yet // FIX ME: we need a frame indicator to ensure alignment - need = TP_SZFIELD_LEN - river->ipt; // what we need to compute length - if( need > remain ) { // the whole size isn't there + need = TP_SZFIELD_LEN - river->ipt; // what we need to compute the total message length + if( need > remain ) { // the whole message len information isn't in this transport buf if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "need more for size than we have: need=%d rmain=%d ipt=%d\n", need, remain, river->ipt ); memcpy( &river->accum[river->ipt], buf+bidx, remain ); // grab what we can and depart river->ipt += remain; @@ -224,7 +224,7 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) { if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback setting msg size: %d\n", river->msg_size ); if( river->msg_size > river->nbytes ) { // message bigger than app max size; grab huge buffer - //river->flags |= RF_DROP; + //river->flags |= RF_DROP; // uncomment to drop large messages if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "received message is huge (%d) reallocating buffer\n", river->msg_size ); old_accum = river->accum; // need to copy any bytes we snarfed getting the size, so hold river->nbytes = river->msg_size + 128; // buffer large enough with a bit of fudge room @@ -237,23 +237,23 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) { } } - if( river->msg_size > (river->ipt + remain) ) { // need more than is left in buffer + if( river->msg_size > (river->ipt + remain) ) { // need more than is left in receive buffer if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "data callback not enough in the buffer size=%d remain=%d\n", river->msg_size, remain ); - if( (river->flags & RF_DROP) == 0 ) { - memcpy( &river->accum[river->ipt], buf+bidx, remain ); // buffer and go wait for more + if( (river->flags & RF_DROP) == 0 ) { // ok to keep this message; copy bytes + memcpy( &river->accum[river->ipt], buf+bidx, remain ); // grab what is in the rcv buffer and go wait for more } river->ipt += remain; remain = 0; } else { need = river->msg_size - river->ipt; // bytes from transport we need to have complete message if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback enough in the buffer size=%d need=%d remain=%d flgs=%02x\n", river->msg_size, need, remain, river->flags ); - if( (river->flags & RF_DROP) == 0 ) { + if( (river->flags & RF_DROP) == 0 ) { // keeping this message, copy and pass it on memcpy( &river->accum[river->ipt], buf+bidx, need ); // grab just what is needed (might be more) buf2mbuf( ctx, river->accum, river->nbytes, fd ); // build an RMR mbuf and queue river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024); // prevent huge size from persisting river->accum = (char *) malloc( sizeof( char ) * river->nbytes ); // fresh accumulator } else { - if( !(river->flags & RF_NOTIFIED) ) { + if( !(river->flags & RF_NOTIFIED) ) { // not keeping huge messages; notify once per stream rmr_vlog( RMR_VL_WARN, "message larger than allocated buffer (%d) arrived on fd %d\n", river->nbytes, fd ); river->flags |= RF_NOTIFIED; }