X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Frmr%2Fsi%2Fsrc%2Fmt_call_si_static.c;h=1b31bf41df2af017ccb0403de661fe3d0b652430;hb=refs%2Fchanges%2F21%2F5521%2F1;hp=2d422bb2a6892787eeae1ae64fea9523f5be0f2e;hpb=c113b0836f3ebd58911c30de1636a707174efe55;p=ric-plt%2Flib%2Frmr.git diff --git a/src/rmr/si/src/mt_call_si_static.c b/src/rmr/si/src/mt_call_si_static.c index 2d422bb..1b31bf4 100644 --- a/src/rmr/si/src/mt_call_si_static.c +++ b/src/rmr/si/src/mt_call_si_static.c @@ -110,14 +110,26 @@ static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd byte order. If is not present, we use . This allows old versions of RMR to continue to work with new versions that now do the right thing with byte ordering. + + If the receiver of a message is a backlevel RMR, and it uses RTS to + return a message, it will only update the old size, but when the + message is received back at a new RMR application it will appear that + the message came from a new instance. Therefore, we must compare + the old and new sizes and if they are different we must use the old + size assuming that this is the case. */ static inline uint32_t extract_mlen( unsigned char* buf ) { uint32_t size; // adjusted (if needed) size for return + uint32_t osize; // old size uint32_t* blen; // length in the buffer to extract blen = (uint32_t *) buf; if( *(buf + sizeof( int ) * 2 ) == TP_SZ_MARKER ) { + osize = *blen; // old size size = ntohl( *(blen+1) ); // pick up the second integer + if( osize != size ) { // assume back level return to sender + size = osize; // MUST use old size + } if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "extract msg len converted from net order to: %d\n", size ); } else { size = *blen; // old sender didn't encode size @@ -166,7 +178,7 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) { if( river->state != RS_GOOD ) { // all states which aren't good require reset first if( river->state == RS_NEW ) { memset( river, 0, sizeof( *river ) ); - river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024); // max inbound message size + river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024); // start with what user said would be the "normal" max inbound msg size river->accum = (char *) malloc( river->nbytes ); river->ipt = 0; } else { @@ -180,10 +192,10 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) { while( remain > 0 ) { // until we've done something with all bytes passed in if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain ); - if( river->msg_size <= 0 ) { // don't have a size yet + if( river->msg_size <= 0 ) { // don't have a message length yet // FIX ME: we need a frame indicator to ensure alignment - need = TP_SZFIELD_LEN - river->ipt; // what we need to compute length - if( need > remain ) { // the whole size isn't there + need = TP_SZFIELD_LEN - river->ipt; // what we need to compute the total message length + if( need > remain ) { // the whole message len information isn't in this transport buf if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "need more for size than we have: need=%d rmain=%d ipt=%d\n", need, remain, river->ipt ); memcpy( &river->accum[river->ipt], buf+bidx, remain ); // grab what we can and depart river->ipt += remain; @@ -212,7 +224,7 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) { if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback setting msg size: %d\n", river->msg_size ); if( river->msg_size > river->nbytes ) { // message bigger than app max size; grab huge buffer - //river->flags |= RF_DROP; + //river->flags |= RF_DROP; // uncomment to drop large messages if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "received message is huge (%d) reallocating buffer\n", river->msg_size ); old_accum = river->accum; // need to copy any bytes we snarfed getting the size, so hold river->nbytes = river->msg_size + 128; // buffer large enough with a bit of fudge room @@ -225,24 +237,24 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) { } } - if( river->msg_size > (river->ipt + remain) ) { // need more than is left in buffer + if( river->msg_size > (river->ipt + remain) ) { // need more than is left in receive buffer if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "data callback not enough in the buffer size=%d remain=%d\n", river->msg_size, remain ); - if( (river->flags & RF_DROP) == 0 ) { - memcpy( &river->accum[river->ipt], buf+bidx, remain ); // buffer and go wait for more + if( (river->flags & RF_DROP) == 0 ) { // ok to keep this message; copy bytes + memcpy( &river->accum[river->ipt], buf+bidx, remain ); // grab what is in the rcv buffer and go wait for more } river->ipt += remain; remain = 0; } else { need = river->msg_size - river->ipt; // bytes from transport we need to have complete message - if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback enough in the buffer size=%d need=%d remain=%d\n", river->msg_size, need, remain ); - if( (river->flags & RF_DROP) == 0 ) { + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback enough in the buffer size=%d need=%d remain=%d flgs=%02x\n", river->msg_size, need, remain, river->flags ); + if( (river->flags & RF_DROP) == 0 ) { // keeping this message, copy and pass it on memcpy( &river->accum[river->ipt], buf+bidx, need ); // grab just what is needed (might be more) buf2mbuf( ctx, river->accum, river->nbytes, fd ); // build an RMR mbuf and queue river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024); // prevent huge size from persisting river->accum = (char *) malloc( sizeof( char ) * river->nbytes ); // fresh accumulator } else { - if( !(river->flags & RF_NOTIFIED) ) { - rmr_vlog( RMR_VL_WARN, "message larger than max (%d) have arrived on fd %d\n", river->nbytes, fd ); + if( !(river->flags & RF_NOTIFIED) ) { // not keeping huge messages; notify once per stream + rmr_vlog( RMR_VL_WARN, "message larger than allocated buffer (%d) arrived on fd %d\n", river->nbytes, fd ); river->flags |= RF_NOTIFIED; } }