rmr_free_msg( mbuf ); // drop if ring is full
//dcount++;
ctx->dcount++;
+ ctx->acc_dcount++;
if( time( NULL ) > last_warning + 60 ) { // issue warning no more frequently than every 60 sec
rmr_vlog( RMR_VL_ERR, "rmr_mt_receive: application is not receiving fast enough; %d msgs dropped since last warning\n", ctx->dcount );
last_warning = time( NULL );
return;
}
-
+ ctx->acc_ecount++;
chute = &ctx->chutes[0];
sem_post( &chute->barrier ); // tickle the ring monitor
}
}
}
+ // cross-check that header length indicators are not longer than actual message
+ uta_mhdr_t* hdr_check = (uta_mhdr_t*)(((char *) raw_msg) + TP_HDR_LEN);
+ uint32_t header_len=(uint32_t)RMR_HDR_LEN(hdr_check);
+ uint32_t payload_len=(uint32_t)ntohl(hdr_check->plen);
+ if (header_len+TP_HDR_LEN+payload_len> msg_size) {
+ rmr_vlog( RMR_VL_ERR, "Message dropped because %u + %u + %u > %u\n", header_len, payload_len, TP_HDR_LEN, msg_size);
+ free (raw_msg);
+ return;
+ }
+
+
if( (mbuf = alloc_mbuf( ctx, RMR_ERR_UNSET )) != NULL ) {
mbuf->tp_buf = raw_msg;
mbuf->rts_fd = sender_fd;
river->accum = (char *) malloc( river->nbytes );
river->ipt = 0;
} else {
- // future -- sync to next marker
- river->ipt = 0; // insert point
+ if( river->state == RS_RESET ) {
+ // future -- reset not implemented
+ return SI_RET_OK;
+ } else {
+ // future -- sync to next marker
+ river->ipt = 0; // insert point
+ }
}
}
} else {
river->msg_size = extract_mlen( &buf[bidx] ); // pull from buf as it's all there; it will copy later
}
+
+ if( river->msg_size < 0) { // addressing RIC-989
+ river->state=RS_RESET;
+ return SI_RET_OK;
+ }
+
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback setting msg size: %d\n", river->msg_size );
if( river->msg_size > river->nbytes ) { // message bigger than app max size; grab huge buffer