X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Frmr%2Fsi%2Fsrc%2Fmt_call_si_static.c;h=c3483d816f5b533c5cb571325122737421dec979;hb=c8c5946b142c5cc04449edc49a599f282c6573e4;hp=55636bb56201a6ed4330799d6bb5f00aa7679e84;hpb=a1575dacc478b945ea63f5d0cc3db3d66dcb5983;p=ric-plt%2Flib%2Frmr.git diff --git a/src/rmr/si/src/mt_call_si_static.c b/src/rmr/si/src/mt_call_si_static.c index 55636bb..c3483d8 100644 --- a/src/rmr/si/src/mt_call_si_static.c +++ b/src/rmr/si/src/mt_call_si_static.c @@ -1,8 +1,8 @@ // : vi ts=4 sw=4 noet: /* ================================================================================== - Copyright (c) 2019 Nokia - Copyright (c) 2018-2020 AT&T Intellectual Property. + Copyright (c) 2020-2021 Nokia + Copyright (c) 2018-2021 AT&T Intellectual Property. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -21,7 +21,10 @@ /* Mnemonic: mt_call_si static.c Abstract: Static funcitons related to the multi-threaded call feature - which are SI specific. + which are SI specific. The functions here also provide the + message construction functions which build a message that + might be split across multiple "datagrams" received from the + underlying transport. Author: E. Scott Daniels Date: 20 May 2019 @@ -32,19 +35,25 @@ #include static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) { - static int warned = 0; + static time_t last_warning = 0; + //static long dcount = 0; + chute_t* chute; if( ! uta_ring_insert( ctx->mring, mbuf ) ) { rmr_free_msg( mbuf ); // drop if ring is full - if( !warned ) { - fprintf( stderr, "[WARN] rmr_mt_receive: application is not receiving fast enough; messages dropping\n" ); - warned++; + //dcount++; + ctx->dcount++; + ctx->acc_dcount++; + if( time( NULL ) > last_warning + 60 ) { // issue warning no more frequently than every 60 sec + rmr_vlog( RMR_VL_ERR, "rmr_mt_receive: application is not receiving fast enough; %d msgs dropped since last warning\n", ctx->dcount ); + last_warning = time( NULL ); + ctx->dcount = 0; } return; } - + ctx->acc_ecount++; chute = &ctx->chutes[0]; sem_post( &chute->barrier ); // tickle the ring monitor } @@ -62,15 +71,29 @@ static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd chute_t* chute; unsigned int call_id; // the id assigned to the call generated message - if( PARINOID_CHECKS ) { // PARINOID mode is slower; off by default + if( PARANOID_CHECKS ) { // PARANOID mode is slower; off by default if( raw_msg == NULL || msg_size <= 0 ) { return; } } + // cross-check that header length indicators are not longer than actual message + uta_mhdr_t* hdr_check = (uta_mhdr_t*)(((char *) raw_msg) + TP_HDR_LEN); + uint32_t header_len=(uint32_t)RMR_HDR_LEN(hdr_check); + uint32_t payload_len=(uint32_t)ntohl(hdr_check->plen); + if (header_len+TP_HDR_LEN+payload_len> msg_size) { + rmr_vlog( RMR_VL_ERR, "Message dropped because %u + %u + %u > %u\n", header_len, payload_len, TP_HDR_LEN, msg_size); + free (raw_msg); + return; + } + + if( (mbuf = alloc_mbuf( ctx, RMR_ERR_UNSET )) != NULL ) { mbuf->tp_buf = raw_msg; mbuf->rts_fd = sender_fd; + if( msg_size > ctx->max_ibm + 1024 ) { + mbuf->flags |= MFL_HUGE; // prevent caching of oversized buffers + } ref_tpbuf( mbuf, msg_size ); // point mbuf at bits in the datagram hdr = mbuf->header; // convenience @@ -90,9 +113,51 @@ static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd } } } + } else { + free( raw_msg ); } } +/* + Given a buffer, extract the size. We assume the buffer contains one of: + + + + where is the size in native storage order (v1) and + is the size in network order. If is present then we assume + that is present and we use that after translating from net + byte order. If is not present, we use . This allows + old versions of RMR to continue to work with new versions that now + do the right thing with byte ordering. + + If the receiver of a message is a backlevel RMR, and it uses RTS to + return a message, it will only update the old size, but when the + message is received back at a new RMR application it will appear that + the message came from a new instance. Therefore, we must compare + the old and new sizes and if they are different we must use the old + size assuming that this is the case. +*/ +static inline uint32_t extract_mlen( unsigned char* buf ) { + uint32_t size; // adjusted (if needed) size for return + uint32_t osize; // old size + uint32_t* blen; // length in the buffer to extract + + blen = (uint32_t *) buf; + if( *(buf + sizeof( int ) * 2 ) == TP_SZ_MARKER ) { + osize = *blen; // old size + size = ntohl( *(blen+1) ); // pick up the second integer + if( osize != size ) { // assume back level return to sender + size = osize; // MUST use old size + } + if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "extract msg len converted from net order to: %d\n", size ); + } else { + size = *blen; // old sender didn't encode size + if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "extract msg len no conversion: %d\n", size ); + } + + return size; +} + /* This is the callback invoked when tcp data is received. It adds the data to the buffer for the connection and if a complete message is received @@ -100,72 +165,74 @@ static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd Return value indicates only that we handled the buffer and SI should continue or that SI should terminate, so on error it's NOT wrong to return "ok". - - - FUTURE: to do this better, SI needs to support a 'ready to read' callback - which allows us to to the actual receive directly into our buffer. */ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) { uta_ctx_t* ctx; river_t* river; // river associated with the fd passed in + unsigned char* old_accum; // old accumulator reference should we need to realloc int bidx = 0; // transport buffer index int remain; // bytes in transport buf that need to be moved - int* mlen; // pointer to spot in buffer for conversion to int + int* mlen; // pointer to spot in buffer for conversion to int int need; // bytes needed for something int i; - if( PARINOID_CHECKS ) { // PARINOID mode is slower; off by default + if( PARANOID_CHECKS ) { // PARANOID mode is slower; off by default if( (ctx = (uta_ctx_t *) vctx) == NULL ) { return SI_RET_OK; } - - if( fd >= ctx->nrivers || fd < 0 ) { - if( DEBUG ) fprintf( stderr, "[DBUG] callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers ); - return SI_RET_OK; - } + } else { + ctx = (uta_ctx_t *) vctx; } - if( buflen <= 0 ) { + if( buflen <= 0 || fd < 0 ) { // no buffer or invalid fd return SI_RET_OK; } - river = &ctx->rivers[fd]; + if( fd >= ctx->nrivers ) { + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers ); + if( (river = (river_t *) rmr_sym_pull( ctx->river_hash, (uint64_t) fd )) == NULL ) { + river = (river_t *) malloc( sizeof( *river ) ); + memset( river, 0, sizeof( *river ) ); + rmr_sym_map( ctx->river_hash, (uint64_t) fd, river ); + river->state = RS_NEW; + } + } else { + river = &ctx->rivers[fd]; // quick index for fd values < MAX_FD + } + if( river->state != RS_GOOD ) { // all states which aren't good require reset first if( river->state == RS_NEW ) { + if( river->accum != NULL ) { + free( river->accum ); + } memset( river, 0, sizeof( *river ) ); - //river->nbytes = sizeof( char ) * (8 * 1024); - river->nbytes = sizeof( char ) * ctx->max_ibm; // max inbound message size + river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024); // start with what user said would be the "normal" max inbound msg size river->accum = (char *) malloc( river->nbytes ); river->ipt = 0; } else { - // future -- sync to next marker - river->ipt = 0; // insert point + if( river->state == RS_RESET ) { + // future -- reset not implemented + return SI_RET_OK; + } else { + // future -- sync to next marker + river->ipt = 0; // insert point + } } } river->state = RS_GOOD; - -/* -fprintf( stderr, "\n>>>>> data callback for %d bytes from %d\n", buflen, fd ); -for( i = 0; i < 40; i++ ) { - fprintf( stderr, "%02x ", (unsigned char) *(buf+i) ); -} -fprintf( stderr, "\n" ); -*/ - remain = buflen; while( remain > 0 ) { // until we've done something with all bytes passed in - if( DEBUG ) fprintf( stderr, "[DBUG] ====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain ); - // FIX ME: size in the message needs to be network byte order - if( river->msg_size <= 0 ) { // don't have a size yet + if( river->msg_size <= 0 ) { // don't have a message length yet // FIX ME: we need a frame indicator to ensure alignment - need = sizeof( int ) - river->ipt; // what we need from transport buffer - if( need > remain ) { // the whole size isn't there - if( DEBUG > 1 ) fprintf( stderr, "[DBUG] need more for size than we have: need=%d rmain=%d ipt=%d\n", need, remain, river->ipt ); + need = TP_SZFIELD_LEN - river->ipt; // what we need to compute the total message length + if( need > remain ) { // the whole message len information isn't in this transport buf + if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "need more for size than we have: need=%d rmain=%d ipt=%d\n", need, remain, river->ipt ); memcpy( &river->accum[river->ipt], buf+bidx, remain ); // grab what we can and depart river->ipt += remain; - if( DEBUG > 1 ) fprintf( stderr, "[DBUG] data callback not enough bytes to compute size; need=%d have=%d\n", need, remain ); + if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "data callback not enough bytes to compute size; need=%d have=%d\n", need, remain ); return SI_RET_OK; } @@ -174,40 +241,59 @@ fprintf( stderr, "\n" ); river->ipt += need; bidx += need; remain -= need; - river->msg_size = *((int *) river->accum); - if( DEBUG > 1 ) { - fprintf( stderr, "[DBUG] size from accumulator =%d\n", river->msg_size ); - if( river->msg_size > 500 ) { - dump_40( river->accum, "msg size way too large accum:" ); + river->msg_size = extract_mlen( river->accum ); + if( DEBUG ) { + rmr_vlog( RMR_VL_DEBUG, "size from accumulator =%d\n", river->msg_size ); + if( DEBUG > 1 ) { + dump_40( river->accum, "from accumulator:" ); + if( river->msg_size > 100 ) { + dump_40( river->accum + 50, "from rmr buf:" ); + } } } } else { - river->msg_size = *((int *) &buf[bidx]); // snarf directly and copy with rest later + river->msg_size = extract_mlen( &buf[bidx] ); // pull from buf as it's all there; it will copy later } - if( DEBUG ) fprintf( stderr, "[DBUG] data callback setting msg size: %d\n", river->msg_size ); - if( river->msg_size > river->nbytes ) { // message is too big, we will drop it - river->flags |= RF_DROP; + if( river->msg_size < 0) { // addressing RIC-989 + river->state=RS_RESET; + return SI_RET_OK; + } + + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback setting msg size: %d\n", river->msg_size ); + + if( river->msg_size > river->nbytes ) { // message bigger than app max size; grab huge buffer + //river->flags |= RF_DROP; // uncomment to drop large messages + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "received message is huge (%d) reallocating buffer\n", river->msg_size ); + old_accum = river->accum; // need to copy any bytes we snarfed getting the size, so hold + river->nbytes = river->msg_size + 128; // buffer large enough with a bit of fudge room + river->accum = (char *) malloc( river->nbytes ); + if( river->ipt > 0 ) { + memcpy( river->accum, old_accum, river->ipt + 1 ); // copy anything snarfed in getting the sie + } + + free( old_accum ); } } - if( river->msg_size > (river->ipt + remain) ) { // need more than is left in buffer - if( DEBUG > 1 ) fprintf( stderr, "[DBUG] data callback not enough in the buffer size=%d remain=%d\n", river->msg_size, remain ); - if( (river->flags & RF_DROP) == 0 ) { - memcpy( &river->accum[river->ipt], buf+bidx, remain ); // buffer and go wait for more + if( river->msg_size > (river->ipt + remain) ) { // need more than is left in receive buffer + if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "data callback not enough in the buffer size=%d remain=%d\n", river->msg_size, remain ); + if( (river->flags & RF_DROP) == 0 ) { // ok to keep this message; copy bytes + memcpy( &river->accum[river->ipt], buf+bidx, remain ); // grab what is in the rcv buffer and go wait for more } river->ipt += remain; remain = 0; } else { need = river->msg_size - river->ipt; // bytes from transport we need to have complete message - if( DEBUG ) fprintf( stderr, "[DBUG] data callback enough in the buffer size=%d need=%d remain=%d\n", river->msg_size, need, remain ); - if( (river->flags & RF_DROP) == 0 ) { + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback enough in the buffer size=%d need=%d remain=%d flgs=%02x\n", river->msg_size, need, remain, river->flags ); + if( (river->flags & RF_DROP) == 0 ) { // keeping this message, copy and pass it on memcpy( &river->accum[river->ipt], buf+bidx, need ); // grab just what is needed (might be more) - buf2mbuf( ctx, river->accum, river->msg_size, fd ); // build an RMR mbuf and queue + buf2mbuf( ctx, river->accum, river->nbytes, fd ); // build an RMR mbuf and queue + river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024); // prevent huge size from persisting river->accum = (char *) malloc( sizeof( char ) * river->nbytes ); // fresh accumulator } else { - if( !(river->flags & RF_NOTIFIED) ) { - fprintf( stderr, "[WRN] message larger than max (%d) have arrived on fd %d\n", river->nbytes, fd ); + if( !(river->flags & RF_NOTIFIED) ) { // not keeping huge messages; notify once per stream + rmr_vlog( RMR_VL_WARN, "message larger than allocated buffer (%d) arrived on fd %d\n", river->nbytes, fd ); river->flags |= RF_NOTIFIED; } } @@ -215,11 +301,57 @@ fprintf( stderr, "\n" ); river->msg_size = -1; river->ipt = 0; bidx += need; - remain -= need; + remain -= need; + } + } + + if( DEBUG >2 ) rmr_vlog( RMR_VL_DEBUG, "##### data callback finished\n" ); + return SI_RET_OK; +} + +/* + Callback driven on a disconnect notification. We will attempt to find the related + endpoint via the fd2ep hash maintained in the context. If we find it, then we + remove it from the hash, and mark the endpoint as closed so that the next attempt + to send forces a reconnect attempt. + + Future: put the ep on a queue to automatically attempt to reconnect. +*/ +static int mt_disc_cb( void* vctx, int fd ) { + uta_ctx_t* ctx; + endpoint_t* ep; + river_t* river = NULL; + + if( (ctx = (uta_ctx_t *) vctx) == NULL ) { + return SI_RET_OK; + } + + if( fd < ctx->nrivers && fd >= 0 ) { + river = &ctx->rivers[fd]; + } else { + if( fd > 0 ) { + river = rmr_sym_pull( ctx->river_hash, (uint64_t) fd ); + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "river reset on disconnect: fd=%d\n", fd ); } } - if( DEBUG >2 ) fprintf( stderr, "[DBUG] ##### data callback finished\n" ); + if( river != NULL ) { + river->state = RS_NEW; // if one connects here later; ensure it's new + if( river->accum != NULL ) { + free( river->accum ); + river->accum = NULL; + river->state = RS_NEW; // force realloc if the fd is used again + } + } + + ep = fd2ep_del( ctx, fd ); // find ep and remove the fd from the hash + if( ep != NULL ) { + pthread_mutex_lock( &ep->gate ); // wise to lock this + ep->open = FALSE; + ep->nn_sock = -1; + pthread_mutex_unlock( &ep->gate ); + } + return SI_RET_OK; } @@ -248,12 +380,15 @@ static void* mt_receive( void* vctx ) { uta_ctx_t* ctx; if( (ctx = (uta_ctx_t*) vctx) == NULL ) { - fprintf( stderr, "[CRI], unable to start mt-receive: ctx was nil\n" ); + rmr_vlog( RMR_VL_CRIT, "unable to start mt-receive: ctx was nil\n" ); return NULL; } - if( DEBUG ) fprintf( stderr, "[DBUG] mt_receive: registering SI95 data callback and waiting\n" ); + rmr_vlog( RMR_VL_INFO, "mt_receive: pid=%lld registering SI95 data callback and waiting\n", (long long) pthread_self() ); + SIcbreg( ctx->si_ctx, SI_CB_CDATA, mt_data_cb, vctx ); // our callback called only for "cooked" (tcp) data + SIcbreg( ctx->si_ctx, SI_CB_DISC, mt_disc_cb, vctx ); // our callback for handling disconnects + SIwait( ctx->si_ctx ); return NULL; // keep the compiler happy though never can be reached as SI wait doesn't return