if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
return SI_RET_OK;
}
-
+
if( fd >= ctx->nrivers || fd < 0 ) {
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
return SI_RET_OK;
if( river->state == RS_NEW ) {
memset( river, 0, sizeof( *river ) );
//river->nbytes = sizeof( char ) * (8 * 1024);
- river->nbytes = sizeof( char ) * ctx->max_ibm; // max inbound message size
+ river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024); // max inbound message size
river->accum = (char *) malloc( river->nbytes );
river->ipt = 0;
} else {
while( remain > 0 ) { // until we've done something with all bytes passed in
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain );
- // FIX ME: size in the message needs to be network byte order
+ // FIX ME: size in the message needs to be network byte order
if( river->msg_size <= 0 ) { // don't have a size yet
// FIX ME: we need a frame indicator to ensure alignment
need = sizeof( int ) - river->ipt; // what we need from transport buffer
river->ipt += need;
bidx += need;
remain -= need;
- river->msg_size = *((int *) river->accum);
- if( DEBUG > 1 ) {
+ river->msg_size = *((int *) river->accum);
+ if( DEBUG ) {
rmr_vlog( RMR_VL_DEBUG, "size from accumulator =%d\n", river->msg_size );
- if( river->msg_size > 500 ) {
- dump_40( river->accum, "msg size way too large accum:" );
+ if( DEBUG > 1 ) {
+ dump_40( river->accum, "from accumulator:" );
+ if( river->msg_size > 100 ) {
+ dump_40( river->accum + 50, "from rmr buf:" );
+ }
}
}
} else {
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback enough in the buffer size=%d need=%d remain=%d\n", river->msg_size, need, remain );
if( (river->flags & RF_DROP) == 0 ) {
memcpy( &river->accum[river->ipt], buf+bidx, need ); // grab just what is needed (might be more)
- buf2mbuf( ctx, river->accum, river->msg_size, fd ); // build an RMR mbuf and queue
+ buf2mbuf( ctx, river->accum, river->nbytes, fd ); // build an RMR mbuf and queue
river->accum = (char *) malloc( sizeof( char ) * river->nbytes ); // fresh accumulator
} else {
- if( !(river->flags & RF_NOTIFIED) ) {
+ if( !(river->flags & RF_NOTIFIED) ) {
rmr_vlog( RMR_VL_WARN, "message larger than max (%d) have arrived on fd %d\n", river->nbytes, fd );
river->flags |= RF_NOTIFIED;
}
river->msg_size = -1;
river->ipt = 0;
bidx += need;
- remain -= need;
+ remain -= need;
}
}
}
/*
- Callback driven on a disconnect notification. We will attempt to find the related
- endpoint via the fd2ep hash maintained in the context. If we find it, then we
+ Callback driven on a disconnect notification. We will attempt to find the related
+ endpoint via the fd2ep hash maintained in the context. If we find it, then we
remove it from the hash, and mark the endpoint as closed so that the next attempt
to send forces a reconnect attempt.
return SI_RET_OK;
}
- ep = fd2ep_del( ctx, fd ); // find ep and remote the fd from the hash
- if( ep ) {
+ ep = fd2ep_del( ctx, fd ); // find ep and remove the fd from the hash
+ if( ep != NULL ) {
+ pthread_mutex_lock( &ep->gate ); // wise to lock this
ep->open = FALSE;
ep->nn_sock = -1;
+ pthread_mutex_unlock( &ep->gate );
}
return SI_RET_OK;