X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Frmr%2Fsi%2Fsrc%2Fmt_call_si_static.c;h=d18a8e02288377f4ef8003bcc2c82f110f604581;hb=refs%2Ftags%2F3.3.0;hp=6e2e8aa5efb4b236aefdbb906801c377b4d1649a;hpb=ec88d3c0563eeb6ae5f73427edb0b3c4d7acf299;p=ric-plt%2Flib%2Frmr.git diff --git a/src/rmr/si/src/mt_call_si_static.c b/src/rmr/si/src/mt_call_si_static.c index 6e2e8aa..d18a8e0 100644 --- a/src/rmr/si/src/mt_call_si_static.c +++ b/src/rmr/si/src/mt_call_si_static.c @@ -38,7 +38,7 @@ static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) { if( ! uta_ring_insert( ctx->mring, mbuf ) ) { rmr_free_msg( mbuf ); // drop if ring is full if( !warned ) { - fprintf( stderr, "[WARN] rmr_mt_receive: application is not receiving fast enough; messages dropping\n" ); + rmr_vlog( RMR_VL_ERR, "rmr_mt_receive: application is not receiving fast enough; messages dropping\n" ); warned++; } @@ -54,8 +54,6 @@ static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) { call ref to point to all of the various bits and set real len etc, then we queue it. Raw_msg is expected to include the transport goo placed in front of the RMR header and payload. - - done -- FIX ME?? can we eliminate the buffer copy here? */ static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd ) { rmr_mbuf_t* mbuf; @@ -70,18 +68,10 @@ static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd } } -/* - if( (mbuf = (rmr_mbuf_t *) malloc( sizeof( *mbuf ))) != NULL ) { // alloc mbuf and point at various bits of payload - memset( mbuf, 0, sizeof( *mbuf ) ); - mbuf->tp_buf = raw_msg; - mbuf->ring = ctx->zcb_mring; -*/ if( (mbuf = alloc_mbuf( ctx, RMR_ERR_UNSET )) != NULL ) { mbuf->tp_buf = raw_msg; mbuf->rts_fd = sender_fd; - // eliminated :) memcpy( mbuf->tp_buf, river->accum + offset, river->msg_size ); - ref_tpbuf( mbuf, msg_size ); // point mbuf at bits in the datagram hdr = mbuf->header; // convenience if( hdr->flags & HFL_CALL_MSG ) { // call generated message; ignore call-id etc and queue @@ -124,18 +114,17 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) { int need; // bytes needed for something int i; - // for speed these checks should be enabled only in debug mode and assume we always get a good context - if( (ctx = (uta_ctx_t *) vctx) == NULL ) { - return SI_RET_OK; - } + if( PARINOID_CHECKS ) { // PARINOID mode is slower; off by default + if( (ctx = (uta_ctx_t *) vctx) == NULL ) { + return SI_RET_OK; + } - if( fd >= ctx->nrivers || fd < 0 ) { - if( DEBUG ) fprintf( stderr, "[DBUG] callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers ); - return SI_RET_OK; + if( fd >= ctx->nrivers || fd < 0 ) { + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers ); + return SI_RET_OK; + } } - // -------- end debug checks ----------------- - if( buflen <= 0 ) { return SI_RET_OK; } @@ -145,7 +134,7 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) { if( river->state == RS_NEW ) { memset( river, 0, sizeof( *river ) ); //river->nbytes = sizeof( char ) * (8 * 1024); - river->nbytes = sizeof( char ) * ctx->max_ibm; // max inbound message size + river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024); // max inbound message size river->accum = (char *) malloc( river->nbytes ); river->ipt = 0; } else { @@ -155,28 +144,19 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) { } river->state = RS_GOOD; - -/* -fprintf( stderr, "\n>>>>> data callback for %d bytes from %d\n", buflen, fd ); -for( i = 0; i < 40; i++ ) { -fprintf( stderr, "%02x ", (unsigned char) *(buf+i) ); -} -fprintf( stderr, "\n" ); -*/ - remain = buflen; while( remain > 0 ) { // until we've done something with all bytes passed in - if( DEBUG ) fprintf( stderr, "[DBUG] ====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain ); - // FIX ME: size in the message needs to be network byte order + // FIX ME: size in the message needs to be network byte order if( river->msg_size <= 0 ) { // don't have a size yet // FIX ME: we need a frame indicator to ensure alignment need = sizeof( int ) - river->ipt; // what we need from transport buffer if( need > remain ) { // the whole size isn't there - if( DEBUG > 1 ) fprintf( stderr, "[DBUG] need more for size than we have: need=%d rmain=%d ipt=%d\n", need, remain, river->ipt ); + if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "need more for size than we have: need=%d rmain=%d ipt=%d\n", need, remain, river->ipt ); memcpy( &river->accum[river->ipt], buf+bidx, remain ); // grab what we can and depart river->ipt += remain; - if( DEBUG > 1 ) fprintf( stderr, "[DBUG] data callback not enough bytes to compute size; need=%d have=%d\n", need, remain ); + if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "data callback not enough bytes to compute size; need=%d have=%d\n", need, remain ); return SI_RET_OK; } @@ -185,39 +165,82 @@ fprintf( stderr, "\n" ); river->ipt += need; bidx += need; remain -= need; - river->msg_size = *((int *) river->accum); - if( DEBUG > 1 ) { - fprintf( stderr, "[DBUG] size from accumulator =%d\n", river->msg_size ); - if( river->msg_size > 500 ) { - dump_40( river->accum, "msg size way too large accum:" ); + river->msg_size = *((int *) river->accum); + if( DEBUG ) { + rmr_vlog( RMR_VL_DEBUG, "size from accumulator =%d\n", river->msg_size ); + if( DEBUG > 1 ) { + dump_40( river->accum, "from accumulator:" ); + if( river->msg_size > 100 ) { + dump_40( river->accum + 50, "from rmr buf:" ); + } } } } else { river->msg_size = *((int *) &buf[bidx]); // snarf directly and copy with rest later } - if( DEBUG ) fprintf( stderr, "[DBUG] data callback setting msg size: %d\n", river->msg_size ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback setting msg size: %d\n", river->msg_size ); + + if( river->msg_size > river->nbytes ) { // message is too big, we will drop it + river->flags |= RF_DROP; + } } if( river->msg_size > (river->ipt + remain) ) { // need more than is left in buffer - if( DEBUG > 1 ) fprintf( stderr, "[DBUG] data callback not enough in the buffer size=%d remain=%d\n", river->msg_size, remain ); - memcpy( &river->accum[river->ipt], buf+bidx, remain ); // buffer and go wait for more + if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "data callback not enough in the buffer size=%d remain=%d\n", river->msg_size, remain ); + if( (river->flags & RF_DROP) == 0 ) { + memcpy( &river->accum[river->ipt], buf+bidx, remain ); // buffer and go wait for more + } river->ipt += remain; remain = 0; } else { need = river->msg_size - river->ipt; // bytes from transport we need to have complete message - if( DEBUG ) fprintf( stderr, "[DBUG] data callback enough in the buffer size=%d need=%d remain=%d\n", river->msg_size, need, remain ); - memcpy( &river->accum[river->ipt], buf+bidx, need ); // grab just what is needed (might be more) - buf2mbuf( ctx, river->accum, river->msg_size, fd ); // build an RMR mbuf and queue + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback enough in the buffer size=%d need=%d remain=%d\n", river->msg_size, need, remain ); + if( (river->flags & RF_DROP) == 0 ) { + memcpy( &river->accum[river->ipt], buf+bidx, need ); // grab just what is needed (might be more) + buf2mbuf( ctx, river->accum, river->nbytes, fd ); // build an RMR mbuf and queue + river->accum = (char *) malloc( sizeof( char ) * river->nbytes ); // fresh accumulator + } else { + if( !(river->flags & RF_NOTIFIED) ) { + rmr_vlog( RMR_VL_WARN, "message larger than max (%d) have arrived on fd %d\n", river->nbytes, fd ); + river->flags |= RF_NOTIFIED; + } + } - river->accum = (char *) malloc( sizeof( char ) * river->nbytes ); // fresh accumulator river->msg_size = -1; river->ipt = 0; bidx += need; - remain -= need; + remain -= need; } } - if( DEBUG >2 ) fprintf( stderr, "[DBUG] ##### data callback finished\n" ); + if( DEBUG >2 ) rmr_vlog( RMR_VL_DEBUG, "##### data callback finished\n" ); + return SI_RET_OK; +} + +/* + Callback driven on a disconnect notification. We will attempt to find the related + endpoint via the fd2ep hash maintained in the context. If we find it, then we + remove it from the hash, and mark the endpoint as closed so that the next attempt + to send forces a reconnect attempt. + + Future: put the ep on a queue to automatically attempt to reconnect. +*/ +static int mt_disc_cb( void* vctx, int fd ) { + uta_ctx_t* ctx; + endpoint_t* ep; + + if( (ctx = (uta_ctx_t *) vctx) == NULL ) { + return SI_RET_OK; + } + + ep = fd2ep_del( ctx, fd ); // find ep and remove the fd from the hash + if( ep != NULL ) { + pthread_mutex_lock( &ep->gate ); // wise to lock this + ep->open = FALSE; + ep->nn_sock = -1; + pthread_mutex_unlock( &ep->gate ); + } + return SI_RET_OK; } @@ -226,7 +249,7 @@ fprintf( stderr, "\n" ); This is expected to execute in a separate thread. It is responsible for _all_ receives and queues them on the appropriate ring, or chute. It does this by registering the callback function above with the SI world - and then caling SIwait() to drive the callback when data has arrived. + and then calling SIwait() to drive the callback when data has arrived. The "state" of the message is checked which determines where the message @@ -246,12 +269,15 @@ static void* mt_receive( void* vctx ) { uta_ctx_t* ctx; if( (ctx = (uta_ctx_t*) vctx) == NULL ) { - fprintf( stderr, "[CRI], unable to start mt-receive: ctx was nil\n" ); + rmr_vlog( RMR_VL_CRIT, "unable to start mt-receive: ctx was nil\n" ); return NULL; } - if( DEBUG ) fprintf( stderr, "[DBUG] mt_receive: registering SI95 data callback and waiting\n" ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mt_receive: registering SI95 data callback and waiting\n" ); + SIcbreg( ctx->si_ctx, SI_CB_CDATA, mt_data_cb, vctx ); // our callback called only for "cooked" (tcp) data + SIcbreg( ctx->si_ctx, SI_CB_DISC, mt_disc_cb, vctx ); // our callback for handling disconnects + SIwait( ctx->si_ctx ); return NULL; // keep the compiler happy though never can be reached as SI wait doesn't return