// : vi ts=4 sw=4 noet:
/*
==================================================================================
- Copyright (c) 2019 Nokia
- Copyright (c) 2018-2020 AT&T Intellectual Property.
+ Copyright (c) 2020-2021 Nokia
+ Copyright (c) 2018-2021 AT&T Intellectual Property.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
/*
Mnemonic: mt_call_si static.c
	Abstract:	Static functions related to the multi-threaded call feature
- which are SI specific.
+ which are SI specific. The functions here also provide the
+ message construction functions which build a message that
+ might be split across multiple "datagrams" received from the
+ underlying transport.
Author: E. Scott Daniels
Date: 20 May 2019
#include <semaphore.h>
static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) {
- static int warned = 0;
+ static time_t last_warning = 0;
+ //static long dcount = 0;
+
chute_t* chute;
if( ! uta_ring_insert( ctx->mring, mbuf ) ) {
rmr_free_msg( mbuf ); // drop if ring is full
- if( !warned ) {
- fprintf( stderr, "[WARN] rmr_mt_receive: application is not receiving fast enough; messages dropping\n" );
- warned++;
+ //dcount++;
+ ctx->dcount++;
+ ctx->acc_dcount++;
+ if( time( NULL ) > last_warning + 60 ) { // issue warning no more frequently than every 60 sec
+ rmr_vlog( RMR_VL_ERR, "rmr_mt_receive: application is not receiving fast enough; %d msgs dropped since last warning\n", ctx->dcount );
+ last_warning = time( NULL );
+ ctx->dcount = 0;
}
return;
}
-
+ ctx->acc_ecount++;
chute = &ctx->chutes[0];
sem_post( &chute->barrier ); // tickle the ring monitor
}
chute_t* chute;
unsigned int call_id; // the id assigned to the call generated message
- if( PARINOID_CHECKS ) { // PARINOID mode is slower; off by default
+ if( PARANOID_CHECKS ) { // PARANOID mode is slower; off by default
if( raw_msg == NULL || msg_size <= 0 ) {
return;
}
if( (mbuf = alloc_mbuf( ctx, RMR_ERR_UNSET )) != NULL ) {
mbuf->tp_buf = raw_msg;
mbuf->rts_fd = sender_fd;
+ if( msg_size > ctx->max_ibm + 1024 ) {
+ mbuf->flags |= MFL_HUGE; // prevent caching of oversized buffers
+ }
ref_tpbuf( mbuf, msg_size ); // point mbuf at bits in the datagram
hdr = mbuf->header; // convenience
}
}
}
+ } else {
+ free( raw_msg );
+ }
+}
+
+/*
+ Given a buffer, extract the size. We assume the buffer contains one of:
+ <int1><int2><mark>
+ <int1>
+
+ where <int1> is the size in native storage order (v1) and <int2>
+ is the size in network order. If <mark> is present then we assume
+ that <int2> is present and we use that after translating from net
+ byte order. If <mark> is not present, we use <int1>. This allows
+ old versions of RMR to continue to work with new versions that now
+ do the right thing with byte ordering.
+
+ If the receiver of a message is a backlevel RMR, and it uses RTS to
+ return a message, it will only update the old size, but when the
+ message is received back at a new RMR application it will appear that
+ the message came from a new instance. Therefore, we must compare
+ the old and new sizes and if they are different we must use the old
+ size assuming that this is the case.
+*/
+static inline uint32_t extract_mlen( unsigned char* buf ) {
+ uint32_t size; // adjusted (if needed) size for return
+ uint32_t osize; // old size
+ uint32_t* blen; // length in the buffer to extract
+
+ blen = (uint32_t *) buf;
+ if( *(buf + sizeof( int ) * 2 ) == TP_SZ_MARKER ) {
+ osize = *blen; // old size
+ size = ntohl( *(blen+1) ); // pick up the second integer
+ if( osize != size ) { // assume back level return to sender
+ size = osize; // MUST use old size
+ }
+ if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "extract msg len converted from net order to: %d\n", size );
+ } else {
+ size = *blen; // old sender didn't encode size
+ if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "extract msg len no conversion: %d\n", size );
}
+
+ return size;
}
/*
Return value indicates only that we handled the buffer and SI should continue
or that SI should terminate, so on error it's NOT wrong to return "ok".
-
-
- FUTURE: to do this better, SI needs to support a 'ready to read' callback
- which allows us to to the actual receive directly into our buffer.
*/
static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
uta_ctx_t* ctx;
river_t* river; // river associated with the fd passed in
+ unsigned char* old_accum; // old accumulator reference should we need to realloc
int bidx = 0; // transport buffer index
int remain; // bytes in transport buf that need to be moved
- int* mlen; // pointer to spot in buffer for conversion to int
+ int* mlen; // pointer to spot in buffer for conversion to int
int need; // bytes needed for something
int i;
- if( PARINOID_CHECKS ) { // PARINOID mode is slower; off by default
+ if( PARANOID_CHECKS ) { // PARANOID mode is slower; off by default
if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
return SI_RET_OK;
}
-
- if( fd >= ctx->nrivers || fd < 0 ) {
- if( DEBUG ) fprintf( stderr, "[DBUG] callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
- return SI_RET_OK;
- }
+ } else {
+ ctx = (uta_ctx_t *) vctx;
}
- if( buflen <= 0 ) {
+ if( buflen <= 0 || fd < 0 ) { // no buffer or invalid fd
return SI_RET_OK;
}
- river = &ctx->rivers[fd];
+ if( fd >= ctx->nrivers ) {
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
+ if( (river = (river_t *) rmr_sym_pull( ctx->river_hash, (uint64_t) fd )) == NULL ) {
+ river = (river_t *) malloc( sizeof( *river ) );
+ memset( river, 0, sizeof( *river ) );
+ rmr_sym_map( ctx->river_hash, (uint64_t) fd, river );
+ river->state = RS_NEW;
+ }
+ } else {
+ river = &ctx->rivers[fd]; // quick index for fd values < MAX_FD
+ }
+
if( river->state != RS_GOOD ) { // all states which aren't good require reset first
if( river->state == RS_NEW ) {
+ if( river->accum != NULL ) {
+ free( river->accum );
+ }
memset( river, 0, sizeof( *river ) );
- //river->nbytes = sizeof( char ) * (8 * 1024);
- river->nbytes = sizeof( char ) * ctx->max_ibm; // max inbound message size
+ river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024); // start with what user said would be the "normal" max inbound msg size
river->accum = (char *) malloc( river->nbytes );
river->ipt = 0;
} else {
}
river->state = RS_GOOD;
-
-/*
-fprintf( stderr, "\n>>>>> data callback for %d bytes from %d\n", buflen, fd );
-for( i = 0; i < 40; i++ ) {
- fprintf( stderr, "%02x ", (unsigned char) *(buf+i) );
-}
-fprintf( stderr, "\n" );
-*/
-
remain = buflen;
while( remain > 0 ) { // until we've done something with all bytes passed in
- if( DEBUG ) fprintf( stderr, "[DBUG] ====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain );
- // FIX ME: size in the message needs to be network byte order
- if( river->msg_size <= 0 ) { // don't have a size yet
+ if( river->msg_size <= 0 ) { // don't have a message length yet
// FIX ME: we need a frame indicator to ensure alignment
- need = sizeof( int ) - river->ipt; // what we need from transport buffer
- if( need > remain ) { // the whole size isn't there
- if( DEBUG > 1 ) fprintf( stderr, "[DBUG] need more for size than we have: need=%d rmain=%d ipt=%d\n", need, remain, river->ipt );
+ need = TP_SZFIELD_LEN - river->ipt; // what we need to compute the total message length
+ if( need > remain ) { // the whole message len information isn't in this transport buf
+ if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "need more for size than we have: need=%d rmain=%d ipt=%d\n", need, remain, river->ipt );
memcpy( &river->accum[river->ipt], buf+bidx, remain ); // grab what we can and depart
river->ipt += remain;
- if( DEBUG > 1 ) fprintf( stderr, "[DBUG] data callback not enough bytes to compute size; need=%d have=%d\n", need, remain );
+ if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "data callback not enough bytes to compute size; need=%d have=%d\n", need, remain );
return SI_RET_OK;
}
river->ipt += need;
bidx += need;
remain -= need;
- river->msg_size = *((int *) river->accum);
- if( DEBUG > 1 ) {
- fprintf( stderr, "[DBUG] size from accumulator =%d\n", river->msg_size );
- if( river->msg_size > 500 ) {
- dump_40( river->accum, "msg size way too large accum:" );
+ river->msg_size = extract_mlen( river->accum );
+ if( DEBUG ) {
+ rmr_vlog( RMR_VL_DEBUG, "size from accumulator =%d\n", river->msg_size );
+ if( DEBUG > 1 ) {
+ dump_40( river->accum, "from accumulator:" );
+ if( river->msg_size > 100 ) {
+ dump_40( river->accum + 50, "from rmr buf:" );
+ }
}
}
} else {
- river->msg_size = *((int *) &buf[bidx]); // snarf directly and copy with rest later
+ river->msg_size = extract_mlen( &buf[bidx] ); // pull from buf as it's all there; it will copy later
}
- if( DEBUG ) fprintf( stderr, "[DBUG] data callback setting msg size: %d\n", river->msg_size );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback setting msg size: %d\n", river->msg_size );
+
+ if( river->msg_size > river->nbytes ) { // message bigger than app max size; grab huge buffer
+ //river->flags |= RF_DROP; // uncomment to drop large messages
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "received message is huge (%d) reallocating buffer\n", river->msg_size );
+ old_accum = river->accum; // need to copy any bytes we snarfed getting the size, so hold
+ river->nbytes = river->msg_size + 128; // buffer large enough with a bit of fudge room
+ river->accum = (char *) malloc( river->nbytes );
+ if( river->ipt > 0 ) {
+				memcpy( river->accum, old_accum, river->ipt + 1 );		// copy anything snarfed in getting the size
+ }
- if( river->msg_size > river->nbytes ) { // message is too big, we will drop it
- river->flags |= RF_DROP;
+ free( old_accum );
}
}
- if( river->msg_size > (river->ipt + remain) ) { // need more than is left in buffer
- if( DEBUG > 1 ) fprintf( stderr, "[DBUG] data callback not enough in the buffer size=%d remain=%d\n", river->msg_size, remain );
- if( (river->flags & RF_DROP) == 0 ) {
- memcpy( &river->accum[river->ipt], buf+bidx, remain ); // buffer and go wait for more
+ if( river->msg_size > (river->ipt + remain) ) { // need more than is left in receive buffer
+ if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "data callback not enough in the buffer size=%d remain=%d\n", river->msg_size, remain );
+ if( (river->flags & RF_DROP) == 0 ) { // ok to keep this message; copy bytes
+ memcpy( &river->accum[river->ipt], buf+bidx, remain ); // grab what is in the rcv buffer and go wait for more
}
river->ipt += remain;
remain = 0;
} else {
need = river->msg_size - river->ipt; // bytes from transport we need to have complete message
- if( DEBUG ) fprintf( stderr, "[DBUG] data callback enough in the buffer size=%d need=%d remain=%d\n", river->msg_size, need, remain );
- if( (river->flags & RF_DROP) == 0 ) {
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback enough in the buffer size=%d need=%d remain=%d flgs=%02x\n", river->msg_size, need, remain, river->flags );
+ if( (river->flags & RF_DROP) == 0 ) { // keeping this message, copy and pass it on
memcpy( &river->accum[river->ipt], buf+bidx, need ); // grab just what is needed (might be more)
- buf2mbuf( ctx, river->accum, river->msg_size, fd ); // build an RMR mbuf and queue
+ buf2mbuf( ctx, river->accum, river->nbytes, fd ); // build an RMR mbuf and queue
+ river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024); // prevent huge size from persisting
river->accum = (char *) malloc( sizeof( char ) * river->nbytes ); // fresh accumulator
} else {
- if( !(river->flags & RF_NOTIFIED) ) {
- fprintf( stderr, "[WRN] message larger than max (%d) have arrived on fd %d\n", river->nbytes, fd );
+ if( !(river->flags & RF_NOTIFIED) ) { // not keeping huge messages; notify once per stream
+ rmr_vlog( RMR_VL_WARN, "message larger than allocated buffer (%d) arrived on fd %d\n", river->nbytes, fd );
river->flags |= RF_NOTIFIED;
}
}
river->msg_size = -1;
river->ipt = 0;
bidx += need;
- remain -= need;
+ remain -= need;
}
}
- if( DEBUG >2 ) fprintf( stderr, "[DBUG] ##### data callback finished\n" );
+ if( DEBUG >2 ) rmr_vlog( RMR_VL_DEBUG, "##### data callback finished\n" );
+ return SI_RET_OK;
+}
+
+/*
+ Callback driven on a disconnect notification. We will attempt to find the related
+ endpoint via the fd2ep hash maintained in the context. If we find it, then we
+ remove it from the hash, and mark the endpoint as closed so that the next attempt
+ to send forces a reconnect attempt.
+
+ Future: put the ep on a queue to automatically attempt to reconnect.
+*/
+static int mt_disc_cb( void* vctx, int fd ) {
+ uta_ctx_t* ctx;
+ endpoint_t* ep;
+ river_t* river = NULL;
+
+ if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+ return SI_RET_OK;
+ }
+
+ if( fd < ctx->nrivers && fd >= 0 ) {
+ river = &ctx->rivers[fd];
+ } else {
+ if( fd > 0 ) {
+ river = rmr_sym_pull( ctx->river_hash, (uint64_t) fd );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "river reset on disconnect: fd=%d\n", fd );
+ }
+ }
+
+ if( river != NULL ) {
+ river->state = RS_NEW; // if one connects here later; ensure it's new
+ if( river->accum != NULL ) {
+ free( river->accum );
+ river->accum = NULL;
+ river->state = RS_NEW; // force realloc if the fd is used again
+ }
+ }
+
+ ep = fd2ep_del( ctx, fd ); // find ep and remove the fd from the hash
+ if( ep != NULL ) {
+ pthread_mutex_lock( &ep->gate ); // wise to lock this
+ ep->open = FALSE;
+ ep->nn_sock = -1;
+ pthread_mutex_unlock( &ep->gate );
+ }
+
return SI_RET_OK;
}
uta_ctx_t* ctx;
if( (ctx = (uta_ctx_t*) vctx) == NULL ) {
- fprintf( stderr, "[CRI], unable to start mt-receive: ctx was nil\n" );
+ rmr_vlog( RMR_VL_CRIT, "unable to start mt-receive: ctx was nil\n" );
return NULL;
}
- if( DEBUG ) fprintf( stderr, "[DBUG] mt_receive: registering SI95 data callback and waiting\n" );
+ rmr_vlog( RMR_VL_INFO, "mt_receive: pid=%lld registering SI95 data callback and waiting\n", (long long) pthread_self() );
+
SIcbreg( ctx->si_ctx, SI_CB_CDATA, mt_data_cb, vctx ); // our callback called only for "cooked" (tcp) data
+ SIcbreg( ctx->si_ctx, SI_CB_DISC, mt_disc_cb, vctx ); // our callback for handling disconnects
+
SIwait( ctx->si_ctx );
return NULL; // keep the compiler happy though never can be reached as SI wait doesn't return