X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Frmr%2Fsi%2Fsrc%2Fmt_call_si_static.c;h=2d422bb2a6892787eeae1ae64fea9523f5be0f2e;hb=c113b0836f3ebd58911c30de1636a707174efe55;hp=d18a8e02288377f4ef8003bcc2c82f110f604581;hpb=fc5c77b3d78988aa407118235d7f5978642df753;p=ric-plt%2Flib%2Frmr.git diff --git a/src/rmr/si/src/mt_call_si_static.c b/src/rmr/si/src/mt_call_si_static.c index d18a8e0..2d422bb 100644 --- a/src/rmr/si/src/mt_call_si_static.c +++ b/src/rmr/si/src/mt_call_si_static.c @@ -1,7 +1,7 @@ // : vi ts=4 sw=4 noet: /* ================================================================================== - Copyright (c) 2019 Nokia + Copyright (c) 2020 Nokia Copyright (c) 2018-2020 AT&T Intellectual Property. Licensed under the Apache License, Version 2.0 (the "License"); @@ -21,7 +21,10 @@ /* Mnemonic: mt_call_si static.c Abstract: Static funcitons related to the multi-threaded call feature - which are SI specific. + which are SI specific. The functions here also provide the + message construction functions which build a message that + might be split across multiple "datagrams" received from the + underlying transport. Author: E. Scott Daniels Date: 20 May 2019 @@ -62,7 +65,7 @@ static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd chute_t* chute; unsigned int call_id; // the id assigned to the call generated message - if( PARINOID_CHECKS ) { // PARINOID mode is slower; off by default + if( PARANOID_CHECKS ) { // PARANOID mode is slower; off by default if( raw_msg == NULL || msg_size <= 0 ) { return; } @@ -71,6 +74,9 @@ static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd if( (mbuf = alloc_mbuf( ctx, RMR_ERR_UNSET )) != NULL ) { mbuf->tp_buf = raw_msg; mbuf->rts_fd = sender_fd; + if( msg_size > ctx->max_ibm + 1024 ) { + mbuf->flags |= MFL_HUGE; // prevent caching of oversized buffers + } ref_tpbuf( mbuf, msg_size ); // point mbuf at bits in the datagram hdr = mbuf->header; // convenience @@ -93,6 +99,34 @@ static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd } } +/* + Given a buffer, extract the size. We assume the buffer contains one of: + + + + where is the size in native storage order (v1) and + is the size in network order. If is present then we assume + that is present and we use that after translating from net + byte order. If is not present, we use . This allows + old versions of RMR to continue to work with new versions that now + do the right thing with byte ordering. +*/ +static inline uint32_t extract_mlen( unsigned char* buf ) { + uint32_t size; // adjusted (if needed) size for return + uint32_t* blen; // length in the buffer to extract + + blen = (uint32_t *) buf; + if( *(buf + sizeof( int ) * 2 ) == TP_SZ_MARKER ) { + size = ntohl( *(blen+1) ); // pick up the second integer + if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "extract msg len converted from net order to: %d\n", size ); + } else { + size = *blen; // old sender didn't encode size + if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "extract msg len no conversion: %d\n", size ); + } + + return size; +} + /* This is the callback invoked when tcp data is received. It adds the data to the buffer for the connection and if a complete message is received @@ -100,21 +134,18 @@ static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd Return value indicates only that we handled the buffer and SI should continue or that SI should terminate, so on error it's NOT wrong to return "ok". - - - FUTURE: to do this better, SI needs to support a 'ready to read' callback - which allows us to to the actual receive directly into our buffer. */ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) { uta_ctx_t* ctx; river_t* river; // river associated with the fd passed in + unsigned char* old_accum; // old accumulator reference should we need to realloc int bidx = 0; // transport buffer index int remain; // bytes in transport buf that need to be moved int* mlen; // pointer to spot in buffer for conversion to int int need; // bytes needed for something int i; - if( PARINOID_CHECKS ) { // PARINOID mode is slower; off by default + if( PARANOID_CHECKS ) { // PARANOID mode is slower; off by default if( (ctx = (uta_ctx_t *) vctx) == NULL ) { return SI_RET_OK; } @@ -123,6 +154,8 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) { if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers ); return SI_RET_OK; } + } else { + ctx = (uta_ctx_t *) vctx; } if( buflen <= 0 ) { @@ -133,7 +166,6 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) { if( river->state != RS_GOOD ) { // all states which aren't good require reset first if( river->state == RS_NEW ) { memset( river, 0, sizeof( *river ) ); - //river->nbytes = sizeof( char ) * (8 * 1024); river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024); // max inbound message size river->accum = (char *) malloc( river->nbytes ); river->ipt = 0; @@ -148,10 +180,9 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) { while( remain > 0 ) { // until we've done something with all bytes passed in if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain ); - // FIX ME: size in the message needs to be network byte order if( river->msg_size <= 0 ) { // don't have a size yet // FIX ME: we need a frame indicator to ensure alignment - need = sizeof( int ) - river->ipt; // what we need from transport buffer + need = TP_SZFIELD_LEN - river->ipt; // what we need to compute length if( need > remain ) { // the whole size isn't there if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "need more for size than we have: need=%d rmain=%d ipt=%d\n", need, remain, river->ipt ); memcpy( &river->accum[river->ipt], buf+bidx, remain ); // grab what we can and depart @@ -165,7 +196,7 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) { river->ipt += need; bidx += need; remain -= need; - river->msg_size = *((int *) river->accum); + river->msg_size = extract_mlen( river->accum ); if( DEBUG ) { rmr_vlog( RMR_VL_DEBUG, "size from accumulator =%d\n", river->msg_size ); if( DEBUG > 1 ) { @@ -176,12 +207,21 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) { } } } else { - river->msg_size = *((int *) &buf[bidx]); // snarf directly and copy with rest later + river->msg_size = extract_mlen( &buf[bidx] ); // pull from buf as it's all there; it will copy later } if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback setting msg size: %d\n", river->msg_size ); - if( river->msg_size > river->nbytes ) { // message is too big, we will drop it - river->flags |= RF_DROP; + if( river->msg_size > river->nbytes ) { // message bigger than app max size; grab huge buffer + //river->flags |= RF_DROP; + if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "received message is huge (%d) reallocating buffer\n", river->msg_size ); + old_accum = river->accum; // need to copy any bytes we snarfed getting the size, so hold + river->nbytes = river->msg_size + 128; // buffer large enough with a bit of fudge room + river->accum = (char *) malloc( river->nbytes ); + if( river->ipt > 0 ) { + memcpy( river->accum, old_accum, river->ipt + 1 ); // copy anything snarfed in getting the sie + } + + free( old_accum ); } } @@ -198,6 +238,7 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) { if( (river->flags & RF_DROP) == 0 ) { memcpy( &river->accum[river->ipt], buf+bidx, need ); // grab just what is needed (might be more) buf2mbuf( ctx, river->accum, river->nbytes, fd ); // build an RMR mbuf and queue + river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024); // prevent huge size from persisting river->accum = (char *) malloc( sizeof( char ) * river->nbytes ); // fresh accumulator } else { if( !(river->flags & RF_NOTIFIED) ) {