if( ! uta_ring_insert( ctx->mring, mbuf ) ) {
rmr_free_msg( mbuf ); // drop if ring is full
if( !warned ) {
- rmr_vlog( RMR_VL_WARN, "rmr_mt_receive: application is not receiving fast enough; messages dropping\n" );
+ rmr_vlog( RMR_VL_ERR, "rmr_mt_receive: application is not receiving fast enough; messages dropping\n" );
warned++;
}
if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
return SI_RET_OK;
}
-
+
if( fd >= ctx->nrivers || fd < 0 ) {
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
return SI_RET_OK;
if( river->state == RS_NEW ) {
memset( river, 0, sizeof( *river ) );
//river->nbytes = sizeof( char ) * (8 * 1024);
- river->nbytes = sizeof( char ) * ctx->max_ibm; // max inbound message size
+ river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024); // max inbound message size
river->accum = (char *) malloc( river->nbytes );
river->ipt = 0;
} else {
}
river->state = RS_GOOD;
-
-/*
-fprintf( stderr, "\n>>>>> data callback for %d bytes from %d\n", buflen, fd );
-for( i = 0; i < 40; i++ ) {
- fprintf( stderr, "%02x ", (unsigned char) *(buf+i) );
-}
-fprintf( stderr, "\n" );
-*/
-
remain = buflen;
while( remain > 0 ) { // until we've done something with all bytes passed in
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain );
- // FIX ME: size in the message needs to be network byte order
+ // FIX ME: size in the message needs to be network byte order
if( river->msg_size <= 0 ) { // don't have a size yet
// FIX ME: we need a frame indicator to ensure alignment
need = sizeof( int ) - river->ipt; // what we need from transport buffer
river->ipt += need;
bidx += need;
remain -= need;
- river->msg_size = *((int *) river->accum);
- if( DEBUG > 1 ) {
+ river->msg_size = *((int *) river->accum);
+ if( DEBUG ) {
rmr_vlog( RMR_VL_DEBUG, "size from accumulator =%d\n", river->msg_size );
- if( river->msg_size > 500 ) {
- dump_40( river->accum, "msg size way too large accum:" );
+ if( DEBUG > 1 ) {
+ dump_40( river->accum, "from accumulator:" );
+ if( river->msg_size > 100 ) {
+ dump_40( river->accum + 50, "from rmr buf:" );
+ }
}
}
} else {
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback enough in the buffer size=%d need=%d remain=%d\n", river->msg_size, need, remain );
if( (river->flags & RF_DROP) == 0 ) {
memcpy( &river->accum[river->ipt], buf+bidx, need ); // grab just what is needed (might be more)
- buf2mbuf( ctx, river->accum, river->msg_size, fd ); // build an RMR mbuf and queue
+ buf2mbuf( ctx, river->accum, river->nbytes, fd ); // build an RMR mbuf and queue
river->accum = (char *) malloc( sizeof( char ) * river->nbytes ); // fresh accumulator
} else {
- if( !(river->flags & RF_NOTIFIED) ) {
+ if( !(river->flags & RF_NOTIFIED) ) {
rmr_vlog( RMR_VL_WARN, "message larger than max (%d) have arrived on fd %d\n", river->nbytes, fd );
river->flags |= RF_NOTIFIED;
}
river->msg_size = -1;
river->ipt = 0;
bidx += need;
- remain -= need;
+ remain -= need;
}
}
return SI_RET_OK;
}
+/*
+ Callback driven on a disconnect notification. We will attempt to find the related
+ endpoint via the fd2ep hash maintained in the context. If we find it, then we
+ remove it from the hash, and mark the endpoint as closed so that the next attempt
+ to send forces a reconnect attempt.
+
+ Future: put the ep on a queue to automatically attempt to reconnect.
+*/
+static int mt_disc_cb( void* vctx, int fd ) {
+ uta_ctx_t* ctx; // user context that was registered with the callback
+ endpoint_t* ep; // endpoint mapped to the disconnected fd, if any
+
+ if( (ctx = (uta_ctx_t *) vctx) == NULL ) { // defensive: without a context there is nothing to clean up
+ return SI_RET_OK;
+ }
+
+ ep = fd2ep_del( ctx, fd ); // find ep and remove the fd from the hash
+ if( ep != NULL ) { // NULL is possible if the fd was never mapped to an endpoint; not an error
+ pthread_mutex_lock( &ep->gate ); // wise to lock this
+ ep->open = FALSE; // closed state forces a reconnect attempt on the next send
+ ep->nn_sock = -1; // -1 invalidates the stale socket descriptor
+ pthread_mutex_unlock( &ep->gate );
+ }
+
+ return SI_RET_OK;
+}
+
/*
This is expected to execute in a separate thread. It is responsible for
}
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mt_receive: registering SI95 data callback and waiting\n" );
+
SIcbreg( ctx->si_ctx, SI_CB_CDATA, mt_data_cb, vctx ); // our callback called only for "cooked" (tcp) data
+ SIcbreg( ctx->si_ctx, SI_CB_DISC, mt_disc_cb, vctx ); // our callback for handling disconnects
+
SIwait( ctx->si_ctx );
return NULL; // keep the compiler happy though never can be reached as SI wait doesn't return