X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Frmr%2Fsi%2Fsrc%2Fmt_call_si_static.c;h=c3483d816f5b533c5cb571325122737421dec979;hb=HEAD;hp=73d776aedc1d795fb047f960741da6ee82059b4b;hpb=11838bcf76f3614384459cb56e2ce80dea788cef;p=ric-plt%2Flib%2Frmr.git diff --git a/src/rmr/si/src/mt_call_si_static.c b/src/rmr/si/src/mt_call_si_static.c index 73d776a..c3483d8 100644 --- a/src/rmr/si/src/mt_call_si_static.c +++ b/src/rmr/si/src/mt_call_si_static.c @@ -44,6 +44,7 @@ static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) { rmr_free_msg( mbuf ); // drop if ring is full //dcount++; ctx->dcount++; + ctx->acc_dcount++; if( time( NULL ) > last_warning + 60 ) { // issue warning no more frequently than every 60 sec rmr_vlog( RMR_VL_ERR, "rmr_mt_receive: application is not receiving fast enough; %d msgs dropped since last warning\n", ctx->dcount ); last_warning = time( NULL ); @@ -52,7 +53,7 @@ static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) { return; } - + ctx->acc_ecount++; chute = &ctx->chutes[0]; sem_post( &chute->barrier ); // tickle the ring monitor } @@ -76,6 +77,17 @@ static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd } } + // cross-check that header length indicators are not longer than actual message + uta_mhdr_t* hdr_check = (uta_mhdr_t*)(((char *) raw_msg) + TP_HDR_LEN); + uint32_t header_len=(uint32_t)RMR_HDR_LEN(hdr_check); + uint32_t payload_len=(uint32_t)ntohl(hdr_check->plen); + if (header_len+TP_HDR_LEN+payload_len> msg_size) { + rmr_vlog( RMR_VL_ERR, "Message dropped because %u + %u + %u > %u\n", header_len, payload_len, TP_HDR_LEN, msg_size); + free (raw_msg); + return; + } + + if( (mbuf = alloc_mbuf( ctx, RMR_ERR_UNSET )) != NULL ) { mbuf->tp_buf = raw_msg; mbuf->rts_fd = sender_fd; @@ -198,8 +210,13 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) { river->accum = (char *) malloc( river->nbytes ); river->ipt = 0; } else { - // future -- sync to next marker - river->ipt = 0; // insert point + if( river->state == RS_RESET ) { + // future -- reset not implemented + return SI_RET_OK; + } else { + // future -- sync to next marker + river->ipt = 0; // insert point + } } } @@ -237,6 +254,12 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) { } else { river->msg_size = extract_mlen( &buf[bidx] ); // pull from buf as it's all there; it will copy later } + + if( river->msg_size < 0) { // addressing RIC-989 + river->state=RS_RESET; + return SI_RET_OK; + } + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback setting msg size: %d\n", river->msg_size ); if( river->msg_size > river->nbytes ) { // message bigger than app max size; grab huge buffer