// : vi ts=4 sw=4 noet:
/*
==================================================================================
- Copyright (c) 2020 Nokia
- Copyright (c) 2018-2020 AT&T Intellectual Property.
+ Copyright (c) 2020-2021 Nokia
+ Copyright (c) 2018-2021 AT&T Intellectual Property.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
#include <semaphore.h>
static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) {
- static int warned = 0;
+ static time_t last_warning = 0;
+ //static long dcount = 0;
+
chute_t* chute;
if( ! uta_ring_insert( ctx->mring, mbuf ) ) {
rmr_free_msg( mbuf ); // drop if ring is full
- if( !warned ) {
- rmr_vlog( RMR_VL_ERR, "rmr_mt_receive: application is not receiving fast enough; messages dropping\n" );
- warned++;
+ //dcount++;
+ ctx->dcount++;
+ ctx->acc_dcount++;
+ if( time( NULL ) > last_warning + 60 ) { // issue warning no more frequently than every 60 sec
+ rmr_vlog( RMR_VL_ERR, "rmr_mt_receive: application is not receiving fast enough; %d msgs dropped since last warning\n", ctx->dcount );
+ last_warning = time( NULL );
+ ctx->dcount = 0;
}
return;
}
-
+ ctx->acc_ecount++;
chute = &ctx->chutes[0];
sem_post( &chute->barrier ); // tickle the ring monitor
}
}
}
+ // cross-check that header length indicators are not longer than actual message
+ uta_mhdr_t* hdr_check = (uta_mhdr_t*)(((char *) raw_msg) + TP_HDR_LEN);
+ uint32_t header_len=(uint32_t)RMR_HDR_LEN(hdr_check);
+ uint32_t payload_len=(uint32_t)ntohl(hdr_check->plen);
+ if (header_len+TP_HDR_LEN+payload_len> msg_size) {
+ rmr_vlog( RMR_VL_ERR, "Message dropped because %u + %u + %u > %u\n", header_len, payload_len, TP_HDR_LEN, msg_size);
+ free (raw_msg);
+ return;
+ }
+
+
if( (mbuf = alloc_mbuf( ctx, RMR_ERR_UNSET )) != NULL ) {
mbuf->tp_buf = raw_msg;
mbuf->rts_fd = sender_fd;
}
}
}
+ } else {
+ free( raw_msg );
}
}
unsigned char* old_accum; // old accumulator reference should we need to realloc
int bidx = 0; // transport buffer index
int remain; // bytes in transport buf that need to be moved
- int* mlen; // pointer to spot in buffer for conversion to int
+ int* mlen; // pointer to spot in buffer for conversion to int
int need; // bytes needed for something
int i;
if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
return SI_RET_OK;
}
-
- if( fd >= ctx->nrivers || fd < 0 ) {
- if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
- return SI_RET_OK;
- }
} else {
ctx = (uta_ctx_t *) vctx;
}
- if( buflen <= 0 ) {
+ if( buflen <= 0 || fd < 0 ) { // no buffer or invalid fd
return SI_RET_OK;
}
- river = &ctx->rivers[fd];
+ if( fd >= ctx->nrivers ) {
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
+ if( (river = (river_t *) rmr_sym_pull( ctx->river_hash, (uint64_t) fd )) == NULL ) {
+ river = (river_t *) malloc( sizeof( *river ) );
+ memset( river, 0, sizeof( *river ) );
+ rmr_sym_map( ctx->river_hash, (uint64_t) fd, river );
+ river->state = RS_NEW;
+ }
+ } else {
+ river = &ctx->rivers[fd]; // quick index for fd values < MAX_FD
+ }
+
if( river->state != RS_GOOD ) { // all states which aren't good require reset first
if( river->state == RS_NEW ) {
+ if( river->accum != NULL ) {
+ free( river->accum );
+ }
memset( river, 0, sizeof( *river ) );
river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024); // start with what user said would be the "normal" max inbound msg size
river->accum = (char *) malloc( river->nbytes );
river->ipt = 0;
} else {
- // future -- sync to next marker
- river->ipt = 0; // insert point
+ if( river->state == RS_RESET ) {
+ // future -- reset not implemented
+ return SI_RET_OK;
+ } else {
+ // future -- sync to next marker
+ river->ipt = 0; // insert point
+ }
}
}
} else {
river->msg_size = extract_mlen( &buf[bidx] ); // pull from buf as it's all there; it will copy later
}
+
+ if( river->msg_size < 0) { // addressing RIC-989
+ river->state=RS_RESET;
+ return SI_RET_OK;
+ }
+
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback setting msg size: %d\n", river->msg_size );
if( river->msg_size > river->nbytes ) { // message bigger than app max size; grab huge buffer
static int mt_disc_cb( void* vctx, int fd ) {
uta_ctx_t* ctx;
endpoint_t* ep;
+ river_t* river = NULL;
if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
return SI_RET_OK;
}
+ if( fd < ctx->nrivers && fd >= 0 ) {
+ river = &ctx->rivers[fd];
+ } else {
+ if( fd > 0 ) {
+ river = rmr_sym_pull( ctx->river_hash, (uint64_t) fd );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "river reset on disconnect: fd=%d\n", fd );
+ }
+ }
+
+ if( river != NULL ) {
+ river->state = RS_NEW; // if one connects here later; ensure it's new
+ if( river->accum != NULL ) {
+ free( river->accum );
+ river->accum = NULL;
+ river->state = RS_NEW; // force realloc if the fd is used again
+ }
+ }
+
ep = fd2ep_del( ctx, fd ); // find ep and remove the fd from the hash
if( ep != NULL ) {
- pthread_mutex_lock( &ep->gate ); // wise to lock this
+ pthread_mutex_lock( &ep->gate ); // wise to lock this
ep->open = FALSE;
ep->nn_sock = -1;
- pthread_mutex_unlock( &ep->gate );
+ pthread_mutex_unlock( &ep->gate );
}
return SI_RET_OK;
return NULL;
}
- if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mt_receive: registering SI95 data callback and waiting\n" );
+ rmr_vlog( RMR_VL_INFO, "mt_receive: pid=%lld registering SI95 data callback and waiting\n", (long long) pthread_self() );
SIcbreg( ctx->si_ctx, SI_CB_CDATA, mt_data_cb, vctx ); // our callback called only for "cooked" (tcp) data
SIcbreg( ctx->si_ctx, SI_CB_DISC, mt_disc_cb, vctx ); // our callback for handling disconnects