X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;ds=sidebyside;f=src%2Frmr%2Fsi%2Fsrc%2Fmt_call_si_static.c;h=d18a8e02288377f4ef8003bcc2c82f110f604581;hb=refs%2Ftags%2F3.3.0;hp=7a8b666841926165d9e2d8edfe8d0388f4546427;hpb=c1658934f329e02a704dd5ec94b38dff293b09ee;p=ric-plt%2Flib%2Frmr.git diff --git a/src/rmr/si/src/mt_call_si_static.c b/src/rmr/si/src/mt_call_si_static.c index 7a8b666..d18a8e0 100644 --- a/src/rmr/si/src/mt_call_si_static.c +++ b/src/rmr/si/src/mt_call_si_static.c @@ -118,7 +118,7 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) { if( (ctx = (uta_ctx_t *) vctx) == NULL ) { return SI_RET_OK; } - + if( fd >= ctx->nrivers || fd < 0 ) { if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers ); return SI_RET_OK; @@ -134,7 +134,7 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) { if( river->state == RS_NEW ) { memset( river, 0, sizeof( *river ) ); //river->nbytes = sizeof( char ) * (8 * 1024); - river->nbytes = sizeof( char ) * ctx->max_ibm; // max inbound message size + river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024); // max inbound message size river->accum = (char *) malloc( river->nbytes ); river->ipt = 0; } else { @@ -144,20 +144,11 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) { } river->state = RS_GOOD; - -/* -fprintf( stderr, "\n>>>>> data callback for %d bytes from %d\n", buflen, fd ); -for( i = 0; i < 40; i++ ) { - fprintf( stderr, "%02x ", (unsigned char) *(buf+i) ); -} -fprintf( stderr, "\n" ); -*/ - remain = buflen; while( remain > 0 ) { // until we've done something with all bytes passed in if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain ); - // FIX ME: size in the message needs to be network byte order + // FIX ME: size in the message needs to be network byte order if( river->msg_size <= 0 ) { // don't have a size yet // FIX ME: we need a frame indicator to ensure alignment need = sizeof( int ) - river->ipt; // what we need from transport buffer @@ -174,11 +165,14 @@ fprintf( stderr, "\n" ); river->ipt += need; bidx += need; remain -= need; - river->msg_size = *((int *) river->accum); - if( DEBUG > 1 ) { + river->msg_size = *((int *) river->accum); + if( DEBUG ) { rmr_vlog( RMR_VL_DEBUG, "size from accumulator =%d\n", river->msg_size ); - if( river->msg_size > 500 ) { - dump_40( river->accum, "msg size way too large accum:" ); + if( DEBUG > 1 ) { + dump_40( river->accum, "from accumulator:" ); + if( river->msg_size > 100 ) { + dump_40( river->accum + 50, "from rmr buf:" ); + } } } } else { @@ -203,10 +197,10 @@ fprintf( stderr, "\n" ); if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback enough in the buffer size=%d need=%d remain=%d\n", river->msg_size, need, remain ); if( (river->flags & RF_DROP) == 0 ) { memcpy( &river->accum[river->ipt], buf+bidx, need ); // grab just what is needed (might be more) - buf2mbuf( ctx, river->accum, river->msg_size, fd ); // build an RMR mbuf and queue + buf2mbuf( ctx, river->accum, river->nbytes, fd ); // build an RMR mbuf and queue river->accum = (char *) malloc( sizeof( char ) * river->nbytes ); // fresh accumulator } else { - if( !(river->flags & RF_NOTIFIED) ) { + if( !(river->flags & RF_NOTIFIED) ) { rmr_vlog( RMR_VL_WARN, "message larger than max (%d) have arrived on fd %d\n", river->nbytes, fd ); river->flags |= RF_NOTIFIED; } @@ -215,7 +209,7 @@ fprintf( stderr, "\n" ); river->msg_size = -1; river->ipt = 0; bidx += need; - remain -= need; + remain -= need; } } @@ -224,8 +218,8 @@ fprintf( stderr, "\n" ); } /* - Callback driven on a disconnect notification. We will attempt to find the related - endpoint via the fd2ep hash maintained in the context. If we find it, then we + Callback driven on a disconnect notification. We will attempt to find the related + endpoint via the fd2ep hash maintained in the context. If we find it, then we remove it from the hash, and mark the endpoint as closed so that the next attempt to send forces a reconnect attempt. @@ -239,10 +233,12 @@ static int mt_disc_cb( void* vctx, int fd ) { return SI_RET_OK; } - ep = fd2ep_del( ctx, fd ); // find ep and remote the fd from the hash - if( ep ) { + ep = fd2ep_del( ctx, fd ); // find ep and remove the fd from the hash + if( ep != NULL ) { + pthread_mutex_lock( &ep->gate ); // wise to lock this ep->open = FALSE; ep->nn_sock = -1; + pthread_mutex_unlock( &ep->gate ); } return SI_RET_OK;