if( ! uta_ring_insert( ctx->mring, mbuf ) ) {
rmr_free_msg( mbuf ); // drop if ring is full
if( !warned ) {
- fprintf( stderr, "[WARN] rmr_mt_receive: application is not receiving fast enough; messages dropping\n" );
+ rmr_vlog( RMR_VL_ERR, "rmr_mt_receive: application is not receiving fast enough; messages dropping\n" );
warned++;
}
call ref to point to all of the various bits and set real len etc,
then we queue it. Raw_msg is expected to include the transport goo
placed in front of the RMR header and payload.
-
- done -- FIX ME?? can we eliminate the buffer copy here?
*/
static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd ) {
rmr_mbuf_t* mbuf;
}
}
-/*
- if( (mbuf = (rmr_mbuf_t *) malloc( sizeof( *mbuf ))) != NULL ) { // alloc mbuf and point at various bits of payload
- memset( mbuf, 0, sizeof( *mbuf ) );
- mbuf->tp_buf = raw_msg;
- mbuf->ring = ctx->zcb_mring;
-*/
if( (mbuf = alloc_mbuf( ctx, RMR_ERR_UNSET )) != NULL ) {
mbuf->tp_buf = raw_msg;
mbuf->rts_fd = sender_fd;
- // eliminated :) memcpy( mbuf->tp_buf, river->accum + offset, river->msg_size );
-
ref_tpbuf( mbuf, msg_size ); // point mbuf at bits in the datagram
hdr = mbuf->header; // convenience
if( hdr->flags & HFL_CALL_MSG ) { // call generated message; ignore call-id etc and queue
int need; // bytes needed for something
int i;
- // for speed these checks should be enabled only in debug mode and assume we always get a good context
- if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
- return SI_RET_OK;
- }
-
- if( fd >= ctx->nrivers || fd < 0 ) {
- if( DEBUG ) fprintf( stderr, "[DBUG] callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
- return SI_RET_OK;
+ if( PARINOID_CHECKS ) { // PARINOID mode is slower; off by default
+ if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+ return SI_RET_OK;
+ }
+
+ if( fd >= ctx->nrivers || fd < 0 ) {
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
+ return SI_RET_OK;
+ }
}
- // -------- end debug checks -----------------
-
if( buflen <= 0 ) {
return SI_RET_OK;
}
/*
fprintf( stderr, "\n>>>>> data callback for %d bytes from %d\n", buflen, fd );
for( i = 0; i < 40; i++ ) {
-fprintf( stderr, "%02x ", (unsigned char) *(buf+i) );
+ fprintf( stderr, "%02x ", (unsigned char) *(buf+i) );
}
fprintf( stderr, "\n" );
*/
remain = buflen;
while( remain > 0 ) { // until we've done something with all bytes passed in
- if( DEBUG ) fprintf( stderr, "[DBUG] ====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain );
// FIX ME: size in the message needs to be network byte order
if( river->msg_size <= 0 ) { // don't have a size yet
// FIX ME: we need a frame indicator to ensure alignment
need = sizeof( int ) - river->ipt; // what we need from transport buffer
if( need > remain ) { // the whole size isn't there
- if( DEBUG > 1 ) fprintf( stderr, "[DBUG] need more for size than we have: need=%d rmain=%d ipt=%d\n", need, remain, river->ipt );
+ if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "need more for size than we have: need=%d rmain=%d ipt=%d\n", need, remain, river->ipt );
memcpy( &river->accum[river->ipt], buf+bidx, remain ); // grab what we can and depart
river->ipt += remain;
- if( DEBUG > 1 ) fprintf( stderr, "[DBUG] data callback not enough bytes to compute size; need=%d have=%d\n", need, remain );
+ if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "data callback not enough bytes to compute size; need=%d have=%d\n", need, remain );
return SI_RET_OK;
}
remain -= need;
river->msg_size = *((int *) river->accum);
if( DEBUG > 1 ) {
- fprintf( stderr, "[DBUG] size from accumulator =%d\n", river->msg_size );
+ rmr_vlog( RMR_VL_DEBUG, "size from accumulator =%d\n", river->msg_size );
if( river->msg_size > 500 ) {
dump_40( river->accum, "msg size way too large accum:" );
}
} else {
river->msg_size = *((int *) &buf[bidx]); // snarf directly and copy with rest later
}
- if( DEBUG ) fprintf( stderr, "[DBUG] data callback setting msg size: %d\n", river->msg_size );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback setting msg size: %d\n", river->msg_size );
+
+ if( river->msg_size > river->nbytes ) { // message is too big, we will drop it
+ river->flags |= RF_DROP;
+ }
}
if( river->msg_size > (river->ipt + remain) ) { // need more than is left in buffer
- if( DEBUG > 1 ) fprintf( stderr, "[DBUG] data callback not enough in the buffer size=%d remain=%d\n", river->msg_size, remain );
- memcpy( &river->accum[river->ipt], buf+bidx, remain ); // buffer and go wait for more
+ if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "data callback not enough in the buffer size=%d remain=%d\n", river->msg_size, remain );
+ if( (river->flags & RF_DROP) == 0 ) {
+ memcpy( &river->accum[river->ipt], buf+bidx, remain ); // buffer and go wait for more
+ }
river->ipt += remain;
remain = 0;
} else {
need = river->msg_size - river->ipt; // bytes from transport we need to have complete message
- if( DEBUG ) fprintf( stderr, "[DBUG] data callback enough in the buffer size=%d need=%d remain=%d\n", river->msg_size, need, remain );
- memcpy( &river->accum[river->ipt], buf+bidx, need ); // grab just what is needed (might be more)
- buf2mbuf( ctx, river->accum, river->msg_size, fd ); // build an RMR mbuf and queue
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback enough in the buffer size=%d need=%d remain=%d\n", river->msg_size, need, remain );
+ if( (river->flags & RF_DROP) == 0 ) {
+ memcpy( &river->accum[river->ipt], buf+bidx, need ); // grab just what is needed (might be more)
+ buf2mbuf( ctx, river->accum, river->msg_size, fd ); // build an RMR mbuf and queue
+ river->accum = (char *) malloc( sizeof( char ) * river->nbytes ); // fresh accumulator
+ } else {
+ if( !(river->flags & RF_NOTIFIED) ) {
+ rmr_vlog( RMR_VL_WARN, "message larger than max (%d) have arrived on fd %d\n", river->nbytes, fd );
+ river->flags |= RF_NOTIFIED;
+ }
+ }
- river->accum = (char *) malloc( sizeof( char ) * river->nbytes ); // fresh accumulator
river->msg_size = -1;
river->ipt = 0;
bidx += need;
}
}
- if( DEBUG >2 ) fprintf( stderr, "[DBUG] ##### data callback finished\n" );
+ if( DEBUG >2 ) rmr_vlog( RMR_VL_DEBUG, "##### data callback finished\n" );
+ return SI_RET_OK;
+}
+
+/*
+ Callback driven on a disconnect notification. We will attempt to find the related
+ endpoint via the fd2ep hash maintained in the context. If we find it, then we
+ remove it from the hash, and mark the endpoint as closed so that the next attempt
+ to send forces a reconnect attempt.
+
+ Future: put the ep on a queue to automatically attempt to reconnect.
+*/
+static int mt_disc_cb( void* vctx, int fd ) {
+ uta_ctx_t* ctx;
+ endpoint_t* ep;
+
+ if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+ return SI_RET_OK;
+ }
+
+ ep = fd2ep_del( ctx, fd ); // find ep and remote the fd from the hash
+ if( ep ) {
+ ep->open = FALSE;
+ ep->nn_sock = -1;
+ }
+
return SI_RET_OK;
}
This is expected to execute in a separate thread. It is responsible for
_all_ receives and queues them on the appropriate ring, or chute.
It does this by registering the callback function above with the SI world
- and then caling SIwait() to drive the callback when data has arrived.
+ and then calling SIwait() to drive the callback when data has arrived.
The "state" of the message is checked which determines where the message
uta_ctx_t* ctx;
if( (ctx = (uta_ctx_t*) vctx) == NULL ) {
- fprintf( stderr, "[CRI], unable to start mt-receive: ctx was nil\n" );
+ rmr_vlog( RMR_VL_CRIT, "unable to start mt-receive: ctx was nil\n" );
return NULL;
}
- if( DEBUG ) fprintf( stderr, "[DBUG] mt_receive: registering SI95 data callback and waiting\n" );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mt_receive: registering SI95 data callback and waiting\n" );
+
SIcbreg( ctx->si_ctx, SI_CB_CDATA, mt_data_cb, vctx ); // our callback called only for "cooked" (tcp) data
+ SIcbreg( ctx->si_ctx, SI_CB_DISC, mt_disc_cb, vctx ); // our callback for handling disconnects
+
SIwait( ctx->si_ctx );
return NULL; // keep the compiler happy though never can be reached as SI wait doesn't return