X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Frmr%2Fsi%2Fsrc%2Fmt_call_si_static.c;h=d18a8e02288377f4ef8003bcc2c82f110f604581;hb=116d0f5b6c5b1a396fed835c714a5d568c264cc5;hp=862b55441c4ba6a410125cd82a5e3a5437448345;hpb=6d112571b27574ae857da7cb8dc8758ffee4ff60;p=ric-plt%2Flib%2Frmr.git diff --git a/src/rmr/si/src/mt_call_si_static.c b/src/rmr/si/src/mt_call_si_static.c index 862b554..d18a8e0 100644 --- a/src/rmr/si/src/mt_call_si_static.c +++ b/src/rmr/si/src/mt_call_si_static.c @@ -118,7 +118,7 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) { if( (ctx = (uta_ctx_t *) vctx) == NULL ) { return SI_RET_OK; } - + if( fd >= ctx->nrivers || fd < 0 ) { if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers ); return SI_RET_OK; @@ -148,7 +148,7 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) { while( remain > 0 ) { // until we've done something with all bytes passed in if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain ); - // FIX ME: size in the message needs to be network byte order + // FIX ME: size in the message needs to be network byte order if( river->msg_size <= 0 ) { // don't have a size yet // FIX ME: we need a frame indicator to ensure alignment need = sizeof( int ) - river->ipt; // what we need from transport buffer @@ -165,11 +165,14 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) { river->ipt += need; bidx += need; remain -= need; - river->msg_size = *((int *) river->accum); - if( DEBUG > 1 ) { + river->msg_size = *((int *) river->accum); + if( DEBUG ) { rmr_vlog( RMR_VL_DEBUG, "size from accumulator =%d\n", river->msg_size ); - if( river->msg_size > 500 ) { - dump_40( river->accum, "msg size way too large accum:" ); + if( DEBUG > 1 ) { + dump_40( river->accum, "from accumulator:" ); + if( river->msg_size > 100 ) { + dump_40( river->accum + 50, "from rmr buf:" ); + } } } } else { @@ -197,7 +200,7 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) { buf2mbuf( ctx, river->accum, river->nbytes, fd ); // build an RMR mbuf and queue river->accum = (char *) malloc( sizeof( char ) * river->nbytes ); // fresh accumulator } else { - if( !(river->flags & RF_NOTIFIED) ) { + if( !(river->flags & RF_NOTIFIED) ) { rmr_vlog( RMR_VL_WARN, "message larger than max (%d) have arrived on fd %d\n", river->nbytes, fd ); river->flags |= RF_NOTIFIED; } @@ -206,7 +209,7 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) { river->msg_size = -1; river->ipt = 0; bidx += need; - remain -= need; + remain -= need; } } @@ -215,8 +218,8 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) { } /* - Callback driven on a disconnect notification. We will attempt to find the related - endpoint via the fd2ep hash maintained in the context. If we find it, then we + Callback driven on a disconnect notification. We will attempt to find the related + endpoint via the fd2ep hash maintained in the context. If we find it, then we remove it from the hash, and mark the endpoint as closed so that the next attempt to send forces a reconnect attempt. @@ -230,10 +233,12 @@ static int mt_disc_cb( void* vctx, int fd ) { return SI_RET_OK; } - ep = fd2ep_del( ctx, fd ); // find ep and remote the fd from the hash - if( ep ) { + ep = fd2ep_del( ctx, fd ); // find ep and remove the fd from the hash + if( ep != NULL ) { + pthread_mutex_lock( &ep->gate ); // wise to lock this ep->open = FALSE; ep->nn_sock = -1; + pthread_mutex_unlock( &ep->gate ); } return SI_RET_OK;