// : vi ts=4 sw=4 noet:
/*
==================================================================================
- Copyright (c) 2019 Nokia
- Copyright (c) 2018-2020 AT&T Intellectual Property.
+ Copyright (c) 2020-2021 Nokia
+ Copyright (c) 2018-2021 AT&T Intellectual Property.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
/*
Mnemonic: mt_call_si_static.c
Abstract: Static functions related to the multi-threaded call feature
- which are SI specific.
+ which are SI specific. The functions here also provide the
+ message construction functions which build a message that
+ might be split across multiple "datagrams" received from the
+ underlying transport.
Author: E. Scott Daniels
Date: 20 May 2019
#include <semaphore.h>
/*
	Queue a message buffer onto the normal (non-call) receive ring for the
	application to pick up with rmr_mt_receive().

	If the ring is full the message is dropped (freed) and the per-context
	drop counters are bumped; an error is logged at most once every 60
	seconds reporting the number of drops since the last warning.  On a
	successful insert the accepted counter is bumped and chute 0's semaphore
	is posted to tickle the ring monitor.

	NOTE(review): last_warning is function-static and updated without a
	lock; a race between concurrent callers costs at most an extra log line,
	which matches the original best-effort intent.
*/
static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) {
	static time_t last_warning = 0;			// time of last drop complaint; rate limits the log

	chute_t* chute;

	if( ! uta_ring_insert( ctx->mring, mbuf ) ) {
		rmr_free_msg( mbuf );							// drop if ring is full
		ctx->dcount++;									// drops since the last warning
		ctx->acc_dcount++;								// accumulated drops over the process life
		if( time( NULL ) > last_warning + 60 ) {		// issue warning no more frequently than every 60 sec
			rmr_vlog( RMR_VL_ERR, "rmr_mt_receive: application is not receiving fast enough; %d msgs dropped since last warning\n", ctx->dcount );
			last_warning = time( NULL );
			ctx->dcount = 0;
		}

		return;
	}

	ctx->acc_ecount++;						// accumulated count of successfully enqueued messages
	chute = &ctx->chutes[0];
	sem_post( &chute->barrier );			// tickle the ring monitor
}
/*
	NOTE(review): this span is a fragment of an unapplied diff (lines still
	carry +/- change markers) and the enclosing function's signature is not
	visible in this view; from the names (raw_msg, msg_size, sender_fd,
	alloc_mbuf, ref_tpbuf) it is presumably the helper that wraps a raw
	datagram into an rmr_mbuf_t -- confirm against the full file.  Code is
	left byte-identical; only review comments are added.
*/
chute_t* chute;
unsigned int call_id; // the id assigned to the call generated message
/* change below corrects the misspelled PARINOID_CHECKS macro name */
- if( PARINOID_CHECKS ) { // PARINOID mode is slower; off by default
+ if( PARANOID_CHECKS ) { // PARANOID mode is slower; off by default
if( raw_msg == NULL || msg_size <= 0 ) {
return;
}
if( (mbuf = alloc_mbuf( ctx, RMR_ERR_UNSET )) != NULL ) {
mbuf->tp_buf = raw_msg;
mbuf->rts_fd = sender_fd;
/* oversized payloads are flagged so the transport buffer is not cached for reuse */
+ if( msg_size > ctx->max_ibm + 1024 ) {
+ mbuf->flags |= MFL_HUGE; // prevent caching of oversized buffers
+ }
ref_tpbuf( mbuf, msg_size ); // point mbuf at bits in the datagram
hdr = mbuf->header; // convenience
}
}
}
/* added else branch: presumably frees the raw datagram when mbuf allocation fails (avoids a leak) -- confirm in full file */
+ } else {
+ free( raw_msg );
}
}
+/*
+ Given a buffer, extract the size. We assume the buffer contains one of:
+ <int1><int2><mark>
+ <int1>
+
+ where <int1> is the size in native storage order (v1) and <int2>
+ is the size in network order. If <mark> is present then we assume
+ that <int2> is present and we use that after translating from net
+ byte order. If <mark> is not present, we use <int1>. This allows
+ old versions of RMR to continue to work with new versions that now
+ do the right thing with byte ordering.
+
+ If the receiver of a message is a backlevel RMR, and it uses RTS to
+ return a message, it will only update the old size, but when the
+ message is received back at a new RMR application it will appear that
+ the message came from a new instance. Therefore, we must compare
+ the old and new sizes and if they are different we must use the old
+ size assuming that this is the case.
+*/
+static inline uint32_t extract_mlen( unsigned char* buf ) {
+ uint32_t size; // adjusted (if needed) size for return
+ uint32_t osize; // old size
+ uint32_t* blen; // length in the buffer to extract
+
+ blen = (uint32_t *) buf;
+ if( *(buf + sizeof( int ) * 2 ) == TP_SZ_MARKER ) {
+ osize = *blen; // old size
+ size = ntohl( *(blen+1) ); // pick up the second integer
+ if( osize != size ) { // assume back level return to sender
+ size = osize; // MUST use old size
+ }
+ if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "extract msg len converted from net order to: %d\n", size );
+ } else {
+ size = *blen; // old sender didn't encode size
+ if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "extract msg len no conversion: %d\n", size );
+ }
+
+ return size;
+}
+
/*
This is the callback invoked when tcp data is received. It adds the data
to the buffer for the connection and if a complete message is received
Return value indicates only that we handled the buffer and SI should continue
or that SI should terminate, so on error it's NOT wrong to return "ok".
-
-
- FUTURE: to do this better, SI needs to support a 'ready to read' callback
- which allows us to to the actual receive directly into our buffer.
*/
/*
	NOTE(review): the lines below are an unapplied diff fragment -- they still
	carry leading +/- change markers, and whole context runs appear to be
	elided at (stripped) hunk boundaries: e.g. the "if( river->ipt > 0 )"
	path that must precede "river->ipt += need;" and the brace closing the
	river-reset block are not visible.  The code is therefore left
	byte-identical; only review comments are added.  Reconstructing the final
	function requires the full upstream file.

	What the post-change (+) side visibly does:
	- maps the fd to a per-connection "river" (message accumulator), using
	  the rivers[] table for fd < nrivers and the river_hash for larger fds;
	- on a new/reset river, allocates an accumulator of max_ibm + 1024 bytes;
	- computes the message length with extract_mlen() (byte-order aware)
	  instead of the old raw *((int *)...) read;
	- reallocates a larger accumulator for "huge" messages, or skips copying
	  when RF_DROP is set (warning once per stream via RF_NOTIFIED);
	- passes each complete message to buf2mbuf() and resets for the next one.
*/
static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
uta_ctx_t* ctx;
river_t* river; // river associated with the fd passed in
+ unsigned char* old_accum; // old accumulator reference should we need to realloc
int bidx = 0; // transport buffer index
int remain; // bytes in transport buf that need to be moved
- int* mlen; // pointer to spot in buffer for conversion to int
+ int* mlen; // pointer to spot in buffer for conversion to int
int need; // bytes needed for something
int i;
- if( PARINOID_CHECKS ) { // PARINOID mode is slower; off by default
+ if( PARANOID_CHECKS ) { // PARANOID mode is slower; off by default
if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
return SI_RET_OK;
}
-
- if( fd >= ctx->nrivers || fd < 0 ) {
- if( DEBUG ) fprintf( stderr, "[DBUG] callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
- return SI_RET_OK;
- }
+ } else {
+ ctx = (uta_ctx_t *) vctx;
}
- if( buflen <= 0 ) {
+ if( buflen <= 0 || fd < 0 ) { // no buffer or invalid fd
return SI_RET_OK;
}
/* high fds no longer rejected: they get a hash-backed river instead */
- river = &ctx->rivers[fd];
+ if( fd >= ctx->nrivers ) {
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
+ if( (river = (river_t *) rmr_sym_pull( ctx->river_hash, (uint64_t) fd )) == NULL ) {
+ river = (river_t *) malloc( sizeof( *river ) );
+ memset( river, 0, sizeof( *river ) );
+ rmr_sym_map( ctx->river_hash, (uint64_t) fd, river );
+ river->state = RS_NEW;
+ }
+ } else {
+ river = &ctx->rivers[fd]; // quick index for fd values < MAX_FD
+ }
+
if( river->state != RS_GOOD ) { // all states which aren't good require reset first
if( river->state == RS_NEW ) {
+ if( river->accum != NULL ) {
+ free( river->accum );
+ }
memset( river, 0, sizeof( *river ) );
- //river->nbytes = sizeof( char ) * (8 * 1024);
- river->nbytes = sizeof( char ) * ctx->max_ibm; // max inbound message size
+ river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024); // start with what user said would be the "normal" max inbound msg size
river->accum = (char *) malloc( river->nbytes );
river->ipt = 0;
} else {
/* NOTE(review): body of this else (and possibly its closing context) elided by the diff */
}
river->state = RS_GOOD;
-
-/*
-fprintf( stderr, "\n>>>>> data callback for %d bytes from %d\n", buflen, fd );
-for( i = 0; i < 40; i++ ) {
- fprintf( stderr, "%02x ", (unsigned char) *(buf+i) );
-}
-fprintf( stderr, "\n" );
-*/
-
remain = buflen;
while( remain > 0 ) { // until we've done something with all bytes passed in
- if( DEBUG ) fprintf( stderr, "[DBUG] ====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain );
- // FIX ME: size in the message needs to be network byte order
- if( river->msg_size <= 0 ) { // don't have a size yet
+ if( river->msg_size <= 0 ) { // don't have a message length yet
// FIX ME: we need a frame indicator to ensure alignment
- need = sizeof( int ) - river->ipt; // what we need from transport buffer
- if( need > remain ) { // the whole size isn't there
- if( DEBUG > 1 ) fprintf( stderr, "[DBUG] need more for size than we have: need=%d rmain=%d ipt=%d\n", need, remain, river->ipt );
+ need = TP_SZFIELD_LEN - river->ipt; // what we need to compute the total message length
+ if( need > remain ) { // the whole message len information isn't in this transport buf
+ if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "need more for size than we have: need=%d rmain=%d ipt=%d\n", need, remain, river->ipt );
memcpy( &river->accum[river->ipt], buf+bidx, remain ); // grab what we can and depart
river->ipt += remain;
- if( DEBUG > 1 ) fprintf( stderr, "[DBUG] data callback not enough bytes to compute size; need=%d have=%d\n", need, remain );
+ if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "data callback not enough bytes to compute size; need=%d have=%d\n", need, remain );
return SI_RET_OK;
}
/* NOTE(review): the "if( river->ipt > 0 ) { memcpy(...)" context lines belonging here were elided by the diff */
river->ipt += need;
bidx += need;
remain -= need;
- river->msg_size = *((int *) river->accum);
- if( DEBUG > 1 ) {
- fprintf( stderr, "[DBUG] size from accumulator =%d\n", river->msg_size );
- if( river->msg_size > 500 ) {
- dump_40( river->accum, "msg size way too large accum:" );
+ river->msg_size = extract_mlen( river->accum );
+ if( DEBUG ) {
+ rmr_vlog( RMR_VL_DEBUG, "size from accumulator =%d\n", river->msg_size );
+ if( DEBUG > 1 ) {
+ dump_40( river->accum, "from accumulator:" );
+ if( river->msg_size > 100 ) {
+ dump_40( river->accum + 50, "from rmr buf:" );
+ }
}
}
} else {
- river->msg_size = *((int *) &buf[bidx]); // snarf directly and copy with rest later
+ river->msg_size = extract_mlen( &buf[bidx] ); // pull from buf as it's all there; it will copy later
+ }
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback setting msg size: %d\n", river->msg_size );
+
/* new: grow the accumulator when the announced size exceeds the current allocation */
+ if( river->msg_size > river->nbytes ) { // message bigger than app max size; grab huge buffer
+ //river->flags |= RF_DROP; // uncomment to drop large messages
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "received message is huge (%d) reallocating buffer\n", river->msg_size );
+ old_accum = river->accum; // need to copy any bytes we snarfed getting the size, so hold
+ river->nbytes = river->msg_size + 128; // buffer large enough with a bit of fudge room
+ river->accum = (char *) malloc( river->nbytes );
+ if( river->ipt > 0 ) {
+ memcpy( river->accum, old_accum, river->ipt + 1 ); // copy anything snarfed in getting the sie
+ }
+
+ free( old_accum );
+ }
- if( DEBUG ) fprintf( stderr, "[DBUG] data callback setting msg size: %d\n", river->msg_size );
}
- if( river->msg_size > (river->ipt + remain) ) { // need more than is left in buffer
- if( DEBUG > 1 ) fprintf( stderr, "[DBUG] data callback not enough in the buffer size=%d remain=%d\n", river->msg_size, remain );
- memcpy( &river->accum[river->ipt], buf+bidx, remain ); // buffer and go wait for more
+ if( river->msg_size > (river->ipt + remain) ) { // need more than is left in receive buffer
+ if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "data callback not enough in the buffer size=%d remain=%d\n", river->msg_size, remain );
+ if( (river->flags & RF_DROP) == 0 ) { // ok to keep this message; copy bytes
+ memcpy( &river->accum[river->ipt], buf+bidx, remain ); // grab what is in the rcv buffer and go wait for more
+ }
river->ipt += remain;
remain = 0;
} else {
need = river->msg_size - river->ipt; // bytes from transport we need to have complete message
- if( DEBUG ) fprintf( stderr, "[DBUG] data callback enough in the buffer size=%d need=%d remain=%d\n", river->msg_size, need, remain );
- memcpy( &river->accum[river->ipt], buf+bidx, need ); // grab just what is needed (might be more)
- buf2mbuf( ctx, river->accum, river->msg_size, fd ); // build an RMR mbuf and queue
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback enough in the buffer size=%d need=%d remain=%d flgs=%02x\n", river->msg_size, need, remain, river->flags );
+ if( (river->flags & RF_DROP) == 0 ) { // keeping this message, copy and pass it on
+ memcpy( &river->accum[river->ipt], buf+bidx, need ); // grab just what is needed (might be more)
+ buf2mbuf( ctx, river->accum, river->nbytes, fd ); // build an RMR mbuf and queue
+ river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024); // prevent huge size from persisting
+ river->accum = (char *) malloc( sizeof( char ) * river->nbytes ); // fresh accumulator
+ } else {
+ if( !(river->flags & RF_NOTIFIED) ) { // not keeping huge messages; notify once per stream
+ rmr_vlog( RMR_VL_WARN, "message larger than allocated buffer (%d) arrived on fd %d\n", river->nbytes, fd );
+ river->flags |= RF_NOTIFIED;
+ }
+ }
- river->accum = (char *) malloc( sizeof( char ) * river->nbytes ); // fresh accumulator
river->msg_size = -1;
river->ipt = 0;
bidx += need;
- remain -= need;
+ remain -= need;
+ }
+ }
+
+ if( DEBUG >2 ) rmr_vlog( RMR_VL_DEBUG, "##### data callback finished\n" );
+ return SI_RET_OK;
+}
+
+/*
+ Callback driven on a disconnect notification. We will attempt to find the related
+ endpoint via the fd2ep hash maintained in the context. If we find it, then we
+ remove it from the hash, and mark the endpoint as closed so that the next attempt
+ to send forces a reconnect attempt.
+
+ Future: put the ep on a queue to automatically attempt to reconnect.
+*/
+static int mt_disc_cb( void* vctx, int fd ) {
+ uta_ctx_t* ctx;
+ endpoint_t* ep;
+ river_t* river = NULL;
+
+ if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+ return SI_RET_OK;
+ }
+
+ if( fd < ctx->nrivers && fd >= 0 ) {
+ river = &ctx->rivers[fd];
+ } else {
+ if( fd > 0 ) {
+ river = rmr_sym_pull( ctx->river_hash, (uint64_t) fd );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "river reset on disconnect: fd=%d\n", fd );
}
}
- if( DEBUG >2 ) fprintf( stderr, "[DBUG] ##### data callback finished\n" );
+ if( river != NULL ) {
+ river->state = RS_NEW; // if one connects here later; ensure it's new
+ if( river->accum != NULL ) {
+ free( river->accum );
+ river->accum = NULL;
+ river->state = RS_NEW; // force realloc if the fd is used again
+ }
+ }
+
+ ep = fd2ep_del( ctx, fd ); // find ep and remove the fd from the hash
+ if( ep != NULL ) {
+ pthread_mutex_lock( &ep->gate ); // wise to lock this
+ ep->open = FALSE;
+ ep->nn_sock = -1;
+ pthread_mutex_unlock( &ep->gate );
+ }
+
return SI_RET_OK;
}
/*
	NOTE(review): diff fragment; the enclosing function's signature sits above
	this view.  Given the void* context, the NULL returns, and the SIwait()
	park, this is presumably the pthread start routine for the mt receive
	listener -- confirm in the full file.  The post-change (+) side registers
	both the SI95 cooked-data callback and the new disconnect callback before
	waiting.  Code left byte-identical; only review comments added.
*/
uta_ctx_t* ctx;
if( (ctx = (uta_ctx_t*) vctx) == NULL ) {
- fprintf( stderr, "[CRI], unable to start mt-receive: ctx was nil\n" );
+ rmr_vlog( RMR_VL_CRIT, "unable to start mt-receive: ctx was nil\n" );
return NULL;
}
- if( DEBUG ) fprintf( stderr, "[DBUG] mt_receive: registering SI95 data callback and waiting\n" );
+ rmr_vlog( RMR_VL_INFO, "mt_receive: pid=%lld registering SI95 data callback and waiting\n", (long long) pthread_self() );
+
+ SIcbreg( ctx->si_ctx, SI_CB_CDATA, mt_data_cb, vctx ); // our callback called only for "cooked" (tcp) data
+ SIcbreg( ctx->si_ctx, SI_CB_DISC, mt_disc_cb, vctx ); // our callback for handling disconnects
+
SIwait( ctx->si_ctx );
return NULL; // keep the compiler happy though never can be reached as SI wait doesn't return