if( ! uta_ring_insert( ctx->mring, mbuf ) ) {
rmr_free_msg( mbuf ); // drop if ring is full
if( !warned ) {
- fprintf( stderr, "[WARN] rmr_mt_receive: application is not receiving fast enough; messages dropping\n" );
+ rmr_vlog( RMR_VL_ERR, "rmr_mt_receive: application is not receiving fast enough; messages dropping\n" );
warned++;
}
if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
return SI_RET_OK;
}
-
+
if( fd >= ctx->nrivers || fd < 0 ) {
- if( DEBUG ) fprintf( stderr, "[DBUG] callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
return SI_RET_OK;
}
}
if( river->state == RS_NEW ) {
memset( river, 0, sizeof( *river ) );
//river->nbytes = sizeof( char ) * (8 * 1024);
- river->nbytes = sizeof( char ) * ctx->max_ibm; // max inbound message size
+ river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024); // max inbound message size
river->accum = (char *) malloc( river->nbytes );
river->ipt = 0;
} else {
}
river->state = RS_GOOD;
-
-/*
-fprintf( stderr, "\n>>>>> data callback for %d bytes from %d\n", buflen, fd );
-for( i = 0; i < 40; i++ ) {
- fprintf( stderr, "%02x ", (unsigned char) *(buf+i) );
-}
-fprintf( stderr, "\n" );
-*/
-
remain = buflen;
while( remain > 0 ) { // until we've done something with all bytes passed in
- if( DEBUG ) fprintf( stderr, "[DBUG] ====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain );
- // FIX ME: size in the message needs to be network byte order
+ // FIX ME: size in the message needs to be network byte order
if( river->msg_size <= 0 ) { // don't have a size yet
// FIX ME: we need a frame indicator to ensure alignment
need = sizeof( int ) - river->ipt; // what we need from transport buffer
if( need > remain ) { // the whole size isn't there
- if( DEBUG > 1 ) fprintf( stderr, "[DBUG] need more for size than we have: need=%d rmain=%d ipt=%d\n", need, remain, river->ipt );
+ if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "need more for size than we have: need=%d rmain=%d ipt=%d\n", need, remain, river->ipt );
memcpy( &river->accum[river->ipt], buf+bidx, remain ); // grab what we can and depart
river->ipt += remain;
- if( DEBUG > 1 ) fprintf( stderr, "[DBUG] data callback not enough bytes to compute size; need=%d have=%d\n", need, remain );
+ if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "data callback not enough bytes to compute size; need=%d have=%d\n", need, remain );
return SI_RET_OK;
}
river->ipt += need;
bidx += need;
remain -= need;
- river->msg_size = *((int *) river->accum);
- if( DEBUG > 1 ) {
- fprintf( stderr, "[DBUG] size from accumulator =%d\n", river->msg_size );
- if( river->msg_size > 500 ) {
- dump_40( river->accum, "msg size way too large accum:" );
+ river->msg_size = *((int *) river->accum);
+ if( DEBUG ) {
+ rmr_vlog( RMR_VL_DEBUG, "size from accumulator =%d\n", river->msg_size );
+ if( DEBUG > 1 ) {
+ dump_40( river->accum, "from accumulator:" );
+ if( river->msg_size > 100 ) {
+ dump_40( river->accum + 50, "from rmr buf:" );
+ }
}
}
} else {
river->msg_size = *((int *) &buf[bidx]); // snarf directly and copy with rest later
}
- if( DEBUG ) fprintf( stderr, "[DBUG] data callback setting msg size: %d\n", river->msg_size );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback setting msg size: %d\n", river->msg_size );
if( river->msg_size > river->nbytes ) { // message is too big, we will drop it
river->flags |= RF_DROP;
}
if( river->msg_size > (river->ipt + remain) ) { // need more than is left in buffer
- if( DEBUG > 1 ) fprintf( stderr, "[DBUG] data callback not enough in the buffer size=%d remain=%d\n", river->msg_size, remain );
+ if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "data callback not enough in the buffer size=%d remain=%d\n", river->msg_size, remain );
if( (river->flags & RF_DROP) == 0 ) {
memcpy( &river->accum[river->ipt], buf+bidx, remain ); // buffer and go wait for more
}
remain = 0;
} else {
need = river->msg_size - river->ipt; // bytes from transport we need to have complete message
- if( DEBUG ) fprintf( stderr, "[DBUG] data callback enough in the buffer size=%d need=%d remain=%d\n", river->msg_size, need, remain );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback enough in the buffer size=%d need=%d remain=%d\n", river->msg_size, need, remain );
if( (river->flags & RF_DROP) == 0 ) {
memcpy( &river->accum[river->ipt], buf+bidx, need ); // grab just what is needed (might be more)
- buf2mbuf( ctx, river->accum, river->msg_size, fd ); // build an RMR mbuf and queue
+ buf2mbuf( ctx, river->accum, river->nbytes, fd ); // build an RMR mbuf and queue
river->accum = (char *) malloc( sizeof( char ) * river->nbytes ); // fresh accumulator
} else {
- if( !(river->flags & RF_NOTIFIED) ) {
- fprintf( stderr, "[WRN] message larger than max (%d) have arrived on fd %d\n", river->nbytes, fd );
+ if( !(river->flags & RF_NOTIFIED) ) {
+ rmr_vlog( RMR_VL_WARN, "message larger than max (%d) have arrived on fd %d\n", river->nbytes, fd );
river->flags |= RF_NOTIFIED;
}
}
river->msg_size = -1;
river->ipt = 0;
bidx += need;
- remain -= need;
+ remain -= need;
}
}
- if( DEBUG >2 ) fprintf( stderr, "[DBUG] ##### data callback finished\n" );
+ if( DEBUG >2 ) rmr_vlog( RMR_VL_DEBUG, "##### data callback finished\n" );
+ return SI_RET_OK;
+}
+
+/*
+ Callback driven on a disconnect notification. We will attempt to find the related
+ endpoint via the fd2ep hash maintained in the context. If we find it, then we
+ remove it from the hash, and mark the endpoint as closed so that the next attempt
+ to send forces a reconnect attempt.
+
+ Future: put the ep on a queue to automatically attempt to reconnect.
+*/
+static int mt_disc_cb( void* vctx, int fd ) {
+ uta_ctx_t* ctx;
+ endpoint_t* ep;
+
+ if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+ return SI_RET_OK;
+ }
+
+ ep = fd2ep_del( ctx, fd ); // find ep and remove the fd from the hash
+ if( ep != NULL ) {
+ pthread_mutex_lock( &ep->gate ); // wise to lock this
+ ep->open = FALSE;
+ ep->nn_sock = -1;
+ pthread_mutex_unlock( &ep->gate );
+ }
+
return SI_RET_OK;
}
uta_ctx_t* ctx;
if( (ctx = (uta_ctx_t*) vctx) == NULL ) {
- fprintf( stderr, "[CRI], unable to start mt-receive: ctx was nil\n" );
+ rmr_vlog( RMR_VL_CRIT, "unable to start mt-receive: ctx was nil\n" );
return NULL;
}
- if( DEBUG ) fprintf( stderr, "[DBUG] mt_receive: registering SI95 data callback and waiting\n" );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mt_receive: registering SI95 data callback and waiting\n" );
+
SIcbreg( ctx->si_ctx, SI_CB_CDATA, mt_data_cb, vctx ); // our callback called only for "cooked" (tcp) data
+ SIcbreg( ctx->si_ctx, SI_CB_DISC, mt_disc_cb, vctx ); // our callback for handling disconnects
+
SIwait( ctx->si_ctx );
return NULL; // keep the compiler happy though never can be reached as SI wait doesn't return