X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Frmr%2Fsi%2Fsrc%2Fmt_call_si_static.c;h=c3483d816f5b533c5cb571325122737421dec979;hb=HEAD;hp=ec86a820973955f880c7c9e802d6985cda765d00;hpb=fd4477a9698a46ce5755d614b663c18ceadf43c4;p=ric-plt%2Flib%2Frmr.git diff --git a/src/rmr/si/src/mt_call_si_static.c b/src/rmr/si/src/mt_call_si_static.c index ec86a82..c3483d8 100644 --- a/src/rmr/si/src/mt_call_si_static.c +++ b/src/rmr/si/src/mt_call_si_static.c @@ -1,8 +1,8 @@ - // : vi ts=4 sw=4 noet2 +// : vi ts=4 sw=4 noet: /* ================================================================================== - Copyright (c) 2020 Nokia - Copyright (c) 2018-2020 AT&T Intellectual Property. + Copyright (c) 2020-2021 Nokia + Copyright (c) 2018-2021 AT&T Intellectual Property. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -36,22 +36,24 @@ static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) { static time_t last_warning = 0; - static long dcount = 0; + //static long dcount = 0; chute_t* chute; if( ! uta_ring_insert( ctx->mring, mbuf ) ) { rmr_free_msg( mbuf ); // drop if ring is full - dcount++; + //dcount++; + ctx->dcount++; + ctx->acc_dcount++; if( time( NULL ) > last_warning + 60 ) { // issue warning no more frequently than every 60 sec - rmr_vlog( RMR_VL_ERR, "rmr_mt_receive: application is not receiving fast enough; %ld msgs dropped since last warning\n", dcount ); + rmr_vlog( RMR_VL_ERR, "rmr_mt_receive: application is not receiving fast enough; %d msgs dropped since last warning\n", ctx->dcount ); last_warning = time( NULL ); - dcount = 0; + ctx->dcount = 0; } return; } - + ctx->acc_ecount++; chute = &ctx->chutes[0]; sem_post( &chute->barrier ); // tickle the ring monitor } @@ -75,6 +77,17 @@ static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd } } + // cross-check that header length indicators are not longer than actual message + uta_mhdr_t* hdr_check = (uta_mhdr_t*)(((char *) raw_msg) + TP_HDR_LEN); + uint32_t header_len=(uint32_t)RMR_HDR_LEN(hdr_check); + uint32_t payload_len=(uint32_t)ntohl(hdr_check->plen); + if (header_len+TP_HDR_LEN+payload_len> msg_size) { + rmr_vlog( RMR_VL_ERR, "Message dropped because %u + %u + %u > %u\n", header_len, payload_len, TP_HDR_LEN, msg_size); + free (raw_msg); + return; + } + + if( (mbuf = alloc_mbuf( ctx, RMR_ERR_UNSET )) != NULL ) { mbuf->tp_buf = raw_msg; mbuf->rts_fd = sender_fd; @@ -197,8 +210,13 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) { river->accum = (char *) malloc( river->nbytes ); river->ipt = 0; } else { - // future -- sync to next marker - river->ipt = 0; // insert point + if( river->state == RS_RESET ) { + // future -- reset not implemented + return SI_RET_OK; + } else { + // future -- sync to next marker + river->ipt = 0; // insert point + } } } @@ -236,6 +254,12 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) { } else { river->msg_size = extract_mlen( &buf[bidx] ); // pull from buf as it's all there; it will copy later } + + if( river->msg_size < 0) { // addressing RIC-989 + river->state=RS_RESET; + return SI_RET_OK; + } + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback setting msg size: %d\n", river->msg_size ); if( river->msg_size > river->nbytes ) { // message bigger than app max size; grab huge buffer