X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Frmr%2Fsi%2Fsrc%2Fmt_call_si_static.c;h=c3483d816f5b533c5cb571325122737421dec979;hb=HEAD;hp=2d422bb2a6892787eeae1ae64fea9523f5be0f2e;hpb=c113b0836f3ebd58911c30de1636a707174efe55;p=ric-plt%2Flib%2Frmr.git diff --git a/src/rmr/si/src/mt_call_si_static.c b/src/rmr/si/src/mt_call_si_static.c index 2d422bb..c3483d8 100644 --- a/src/rmr/si/src/mt_call_si_static.c +++ b/src/rmr/si/src/mt_call_si_static.c @@ -1,8 +1,8 @@ // : vi ts=4 sw=4 noet: /* ================================================================================== - Copyright (c) 2020 Nokia - Copyright (c) 2018-2020 AT&T Intellectual Property. + Copyright (c) 2020-2021 Nokia + Copyright (c) 2018-2021 AT&T Intellectual Property. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -35,19 +35,25 @@ #include static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) { - static int warned = 0; + static time_t last_warning = 0; + //static long dcount = 0; + chute_t* chute; if( ! uta_ring_insert( ctx->mring, mbuf ) ) { rmr_free_msg( mbuf ); // drop if ring is full - if( !warned ) { - rmr_vlog( RMR_VL_ERR, "rmr_mt_receive: application is not receiving fast enough; messages dropping\n" ); - warned++; + //dcount++; + ctx->dcount++; + ctx->acc_dcount++; + if( time( NULL ) > last_warning + 60 ) { // issue warning no more frequently than every 60 sec + rmr_vlog( RMR_VL_ERR, "rmr_mt_receive: application is not receiving fast enough; %d msgs dropped since last warning\n", ctx->dcount ); + last_warning = time( NULL ); + ctx->dcount = 0; } return; } - + ctx->acc_ecount++; chute = &ctx->chutes[0]; sem_post( &chute->barrier ); // tickle the ring monitor } @@ -71,6 +77,17 @@ static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd } } + // cross-check that header length indicators are not longer than actual message + uta_mhdr_t* hdr_check = (uta_mhdr_t*)(((char *) raw_msg) + TP_HDR_LEN); + uint32_t header_len=(uint32_t)RMR_HDR_LEN(hdr_check); + uint32_t payload_len=(uint32_t)ntohl(hdr_check->plen); + if (header_len+TP_HDR_LEN+payload_len> msg_size) { + rmr_vlog( RMR_VL_ERR, "Message dropped because %u + %u + %u > %u\n", header_len, payload_len, TP_HDR_LEN, msg_size); + free (raw_msg); + return; + } + + if( (mbuf = alloc_mbuf( ctx, RMR_ERR_UNSET )) != NULL ) { mbuf->tp_buf = raw_msg; mbuf->rts_fd = sender_fd; @@ -96,6 +113,8 @@ static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd } } } + } else { + free( raw_msg ); } } @@ -110,14 +129,26 @@ static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd byte order. If is not present, we use . This allows old versions of RMR to continue to work with new versions that now do the right thing with byte ordering. + + If the receiver of a message is a backlevel RMR, and it uses RTS to + return a message, it will only update the old size, but when the + message is received back at a new RMR application it will appear that + the message came from a new instance. Therefore, we must compare + the old and new sizes and if they are different we must use the old + size assuming that this is the case. */ static inline uint32_t extract_mlen( unsigned char* buf ) { uint32_t size; // adjusted (if needed) size for return + uint32_t osize; // old size uint32_t* blen; // length in the buffer to extract blen = (uint32_t *) buf; if( *(buf + sizeof( int ) * 2 ) == TP_SZ_MARKER ) { + osize = *blen; // old size size = ntohl( *(blen+1) ); // pick up the second integer + if( osize != size ) { // assume back level return to sender + size = osize; // MUST use old size + } if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "extract msg len converted from net order to: %d\n", size ); } else { size = *blen; // old sender didn't encode size @@ -141,7 +172,7 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) { unsigned char* old_accum; // old accumulator reference should we need to realloc int bidx = 0; // transport buffer index int remain; // bytes in transport buf that need to be moved - int* mlen; // pointer to spot in buffer for conversion to int + int* mlen; // pointer to spot in buffer for conversion to int int need; // bytes needed for something int i; @@ -149,29 +180,43 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) { if( (ctx = (uta_ctx_t *) vctx) == NULL ) { return SI_RET_OK; } - - if( fd >= ctx->nrivers || fd < 0 ) { - if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers ); - return SI_RET_OK; - } } else { ctx = (uta_ctx_t *) vctx; } - if( buflen <= 0 ) { + if( buflen <= 0 || fd < 0 ) { // no buffer or invalid fd return SI_RET_OK; } - river = &ctx->rivers[fd]; + if( fd >= ctx->nrivers ) { + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers ); + if( (river = (river_t *) rmr_sym_pull( ctx->river_hash, (uint64_t) fd )) == NULL ) { + river = (river_t *) malloc( sizeof( *river ) ); + memset( river, 0, sizeof( *river ) ); + rmr_sym_map( ctx->river_hash, (uint64_t) fd, river ); + river->state = RS_NEW; + } + } else { + river = &ctx->rivers[fd]; // quick index for fd values < MAX_FD + } + if( river->state != RS_GOOD ) { // all states which aren't good require reset first if( river->state == RS_NEW ) { + if( river->accum != NULL ) { + free( river->accum ); + } memset( river, 0, sizeof( *river ) ); - river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024); // max inbound message size + river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024); // start with what user said would be the "normal" max inbound msg size river->accum = (char *) malloc( river->nbytes ); river->ipt = 0; } else { - // future -- sync to next marker - river->ipt = 0; // insert point + if( river->state == RS_RESET ) { + // future -- reset not implemented + return SI_RET_OK; + } else { + // future -- sync to next marker + river->ipt = 0; // insert point + } } } @@ -180,10 +225,10 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) { while( remain > 0 ) { // until we've done something with all bytes passed in if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain ); - if( river->msg_size <= 0 ) { // don't have a size yet + if( river->msg_size <= 0 ) { // don't have a message length yet // FIX ME: we need a frame indicator to ensure alignment - need = TP_SZFIELD_LEN - river->ipt; // what we need to compute length - if( need > remain ) { // the whole size isn't there + need = TP_SZFIELD_LEN - river->ipt; // what we need to compute the total message length + if( need > remain ) { // the whole message len information isn't in this transport buf if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "need more for size than we have: need=%d rmain=%d ipt=%d\n", need, remain, river->ipt ); memcpy( &river->accum[river->ipt], buf+bidx, remain ); // grab what we can and depart river->ipt += remain; @@ -209,10 +254,16 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) { } else { river->msg_size = extract_mlen( &buf[bidx] ); // pull from buf as it's all there; it will copy later } + + if( river->msg_size < 0) { // addressing RIC-989 + river->state=RS_RESET; + return SI_RET_OK; + } + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback setting msg size: %d\n", river->msg_size ); if( river->msg_size > river->nbytes ) { // message bigger than app max size; grab huge buffer - //river->flags |= RF_DROP; + //river->flags |= RF_DROP; // uncomment to drop large messages if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "received message is huge (%d) reallocating buffer\n", river->msg_size ); old_accum = river->accum; // need to copy any bytes we snarfed getting the size, so hold river->nbytes = river->msg_size + 128; // buffer large enough with a bit of fudge room @@ -225,24 +276,24 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) { } } - if( river->msg_size > (river->ipt + remain) ) { // need more than is left in buffer + if( river->msg_size > (river->ipt + remain) ) { // need more than is left in receive buffer if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "data callback not enough in the buffer size=%d remain=%d\n", river->msg_size, remain ); - if( (river->flags & RF_DROP) == 0 ) { - memcpy( &river->accum[river->ipt], buf+bidx, remain ); // buffer and go wait for more + if( (river->flags & RF_DROP) == 0 ) { // ok to keep this message; copy bytes + memcpy( &river->accum[river->ipt], buf+bidx, remain ); // grab what is in the rcv buffer and go wait for more } river->ipt += remain; remain = 0; } else { need = river->msg_size - river->ipt; // bytes from transport we need to have complete message - if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback enough in the buffer size=%d need=%d remain=%d\n", river->msg_size, need, remain ); - if( (river->flags & RF_DROP) == 0 ) { + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback enough in the buffer size=%d need=%d remain=%d flgs=%02x\n", river->msg_size, need, remain, river->flags ); + if( (river->flags & RF_DROP) == 0 ) { // keeping this message, copy and pass it on memcpy( &river->accum[river->ipt], buf+bidx, need ); // grab just what is needed (might be more) buf2mbuf( ctx, river->accum, river->nbytes, fd ); // build an RMR mbuf and queue river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024); // prevent huge size from persisting river->accum = (char *) malloc( sizeof( char ) * river->nbytes ); // fresh accumulator } else { - if( !(river->flags & RF_NOTIFIED) ) { - rmr_vlog( RMR_VL_WARN, "message larger than max (%d) have arrived on fd %d\n", river->nbytes, fd ); + if( !(river->flags & RF_NOTIFIED) ) { // not keeping huge messages; notify once per stream + rmr_vlog( RMR_VL_WARN, "message larger than allocated buffer (%d) arrived on fd %d\n", river->nbytes, fd ); river->flags |= RF_NOTIFIED; } } @@ -269,17 +320,36 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) { static int mt_disc_cb( void* vctx, int fd ) { uta_ctx_t* ctx; endpoint_t* ep; + river_t* river = NULL; if( (ctx = (uta_ctx_t *) vctx) == NULL ) { return SI_RET_OK; } + if( fd < ctx->nrivers && fd >= 0 ) { + river = &ctx->rivers[fd]; + } else { + if( fd > 0 ) { + river = rmr_sym_pull( ctx->river_hash, (uint64_t) fd ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "river reset on disconnect: fd=%d\n", fd ); + } + } + + if( river != NULL ) { + river->state = RS_NEW; // if one connects here later; ensure it's new + if( river->accum != NULL ) { + free( river->accum ); + river->accum = NULL; + river->state = RS_NEW; // force realloc if the fd is used again + } + } + ep = fd2ep_del( ctx, fd ); // find ep and remove the fd from the hash if( ep != NULL ) { - pthread_mutex_lock( &ep->gate ); // wise to lock this + pthread_mutex_lock( &ep->gate ); // wise to lock this ep->open = FALSE; ep->nn_sock = -1; - pthread_mutex_unlock( &ep->gate ); + pthread_mutex_unlock( &ep->gate ); } return SI_RET_OK; @@ -314,7 +384,7 @@ static void* mt_receive( void* vctx ) { return NULL; } - if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mt_receive: registering SI95 data callback and waiting\n" ); + rmr_vlog( RMR_VL_INFO, "mt_receive: pid=%lld registering SI95 data callback and waiting\n", (long long) pthread_self() ); SIcbreg( ctx->si_ctx, SI_CB_CDATA, mt_data_cb, vctx ); // our callback called only for "cooked" (tcp) data SIcbreg( ctx->si_ctx, SI_CB_DISC, mt_disc_cb, vctx ); // our callback for handling disconnects