Eliminate the SI receive buffer length requirement
[ric-plt/lib/rmr.git] / src / rmr / si / src / mt_call_si_static.c
index d18a8e0..2d422bb 100644 (file)
@@ -1,7 +1,7 @@
 // : vi ts=4 sw=4 noet:
 /*
 ==================================================================================
-       Copyright (c) 2019 Nokia
+       Copyright (c) 2020 Nokia
        Copyright (c) 2018-2020 AT&T Intellectual Property.
 
    Licensed under the Apache License, Version 2.0 (the "License");
 /*
        Mnemonic:       mt_call_si static.c
        Abstract:       Static funcitons related to the multi-threaded call feature
-                               which are SI specific.
+                               which are SI specific. The functions here also provide the
+                               message construction functions which build a message that
+                               might be split across multiple "datagrams" received from the
+                               underlying transport.
 
        Author:         E. Scott Daniels
        Date:           20 May 2019
@@ -62,7 +65,7 @@ static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd
        chute_t*                chute;
        unsigned int    call_id;        // the id assigned to the call generated message
 
-       if( PARINOID_CHECKS ) {                                                                 // PARINOID mode is slower; off by default
+       if( PARANOID_CHECKS ) {                                                                 // PARANOID mode is slower; off by default
                if( raw_msg == NULL || msg_size <= 0 ) {
                        return;
                }
@@ -71,6 +74,9 @@ static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd
        if( (mbuf = alloc_mbuf( ctx, RMR_ERR_UNSET )) != NULL ) {
                mbuf->tp_buf = raw_msg;
                mbuf->rts_fd = sender_fd;
+               if( msg_size > ctx->max_ibm + 1024 ) {
+                       mbuf->flags |= MFL_HUGE;                                // prevent caching of oversized buffers
+               }
 
                ref_tpbuf( mbuf, msg_size );                            // point mbuf at bits in the datagram
                hdr = mbuf->header;                                                     // convenience
@@ -93,6 +99,34 @@ static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd
        }
 }
 
+/*
+       Given a buffer, extract the size. We assume the buffer contains one of:
+               <int1><int2><mark>
+               <int1>
+
+       where <int1> is the size in native storage order (v1) and <int2>
+       is the size in network order. If <mark> is present then we assume
+       that <int2> is present and we use that after translating from net
+       byte order. If <mark> is not present, we use <int1>. This allows
+       old versions of RMR to continue to work with new versions that now
+       do the right thing with byte ordering.
+*/
+static inline uint32_t extract_mlen( unsigned char* buf ) {
+       uint32_t        size;           // adjusted (if needed) size for return
+       uint32_t*       blen;           // length in the buffer to extract
+
+       blen = (uint32_t *) buf;
+       if( *(buf + sizeof( int ) * 2 ) == TP_SZ_MARKER ) {
+               size = ntohl( *(blen+1) );                              // pick up the second integer
+               if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "extract msg len converted from net order to: %d\n", size );
+       } else {
+               size = *blen;                                                   // old sender didn't encode size
+               if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "extract msg len no conversion: %d\n", size );
+       }
+
+       return size;
+}
+
 /*
        This is the callback invoked when tcp data is received. It adds the data
        to the buffer for the connection and if a complete message is received
@@ -100,21 +134,18 @@ static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd
 
        Return value indicates only that we handled the buffer and SI should continue
        or that SI should terminate, so on error it's NOT wrong to return "ok".
-
-
-       FUTURE: to do this better, SI needs to support a 'ready to read' callback
-       which allows us to to the actual receive directly into our buffer.
 */
 static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
        uta_ctx_t*              ctx;
        river_t*                river;                  // river associated with the fd passed in
+       unsigned char*  old_accum;              // old accumulator reference should we need to realloc
        int                             bidx = 0;               // transport buffer index
        int                             remain;                 // bytes in transport buf that need to be moved
        int*                    mlen;                   // pointer to spot in buffer for conversion to int
        int                             need;                   // bytes needed for something
        int                             i;
 
-       if( PARINOID_CHECKS ) {                                                                 // PARINOID mode is slower; off by default
+       if( PARANOID_CHECKS ) {                                                                 // PARANOID mode is slower; off by default
                if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
                        return SI_RET_OK;
                }
@@ -123,6 +154,8 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
                        if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
                        return SI_RET_OK;
                }
+       } else {
+               ctx = (uta_ctx_t *) vctx;
        }
 
        if( buflen <= 0 ) {
@@ -133,7 +166,6 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
        if( river->state != RS_GOOD ) {                         // all states which aren't good require reset first
                if( river->state == RS_NEW ) {
                        memset( river, 0, sizeof( *river ) );
-                       //river->nbytes = sizeof( char ) * (8 * 1024);
                        river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024);         // max inbound message size
                        river->accum = (char *) malloc( river->nbytes );
                        river->ipt = 0;
@@ -148,10 +180,9 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
        while( remain > 0 ) {                                                           // until we've done something with all bytes passed in
                if( DEBUG )  rmr_vlog( RMR_VL_DEBUG, "====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain );
 
-               // FIX ME: size in the message  needs to be network byte order
                if( river->msg_size <= 0 ) {                            // don't have a size yet
                                                                                                        // FIX ME: we need a frame indicator to ensure alignment
-                       need = sizeof( int ) - river->ipt;                                                      // what we need from transport buffer
+                       need = TP_SZFIELD_LEN - river->ipt;             // what we need to compute length
                        if( need > remain ) {                                                                           // the whole size isn't there
                                if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "need more for size than we have: need=%d rmain=%d ipt=%d\n", need, remain, river->ipt );
                                memcpy( &river->accum[river->ipt], buf+bidx, remain );                  // grab what we can and depart
@@ -165,7 +196,7 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
                                river->ipt += need;
                                bidx += need;
                                remain -= need;
-                               river->msg_size = *((int *) river->accum);              
+                               river->msg_size = extract_mlen( river->accum );
                                if( DEBUG ) {
                                        rmr_vlog( RMR_VL_DEBUG, "size from accumulator =%d\n", river->msg_size );
                                        if( DEBUG > 1 ) {
@@ -176,12 +207,21 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
                                        }
                                }
                        } else {
-                               river->msg_size = *((int *) &buf[bidx]);                                        // snarf directly and copy with rest later
+                               river->msg_size = extract_mlen( &buf[bidx] );                   // pull from buf as it's all there; it will copy later
                        }
                        if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback setting msg size: %d\n", river->msg_size );
 
-                       if( river->msg_size > river->nbytes ) {                         // message is too big, we will drop it
-                               river->flags |= RF_DROP;
+                       if( river->msg_size > river->nbytes ) {                                         // message bigger than app max size; grab huge buffer
+                               //river->flags |= RF_DROP;
+                               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "received message is huge (%d) reallocating buffer\n", river->msg_size );
+                               old_accum = river->accum;                                       // need to copy any bytes we snarfed getting the size, so hold
+                               river->nbytes = river->msg_size + 128;                                  // buffer large enough with a bit of fudge room
+                               river->accum = (char *) malloc( river->nbytes );
+                               if( river->ipt > 0 ) {
+                                       memcpy( river->accum, old_accum, river->ipt + 1 );              // copy anything snarfed in getting the sie
+                               }
+
+                               free( old_accum );
                        }
                }
 
@@ -198,6 +238,7 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
                        if( (river->flags & RF_DROP) == 0  ) {
                                memcpy( &river->accum[river->ipt], buf+bidx, need );                            // grab just what is needed (might be more)
                                buf2mbuf( ctx, river->accum, river->nbytes, fd );                                       // build an RMR mbuf and queue
+                               river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024);                         // prevent huge size from persisting
                                river->accum = (char *) malloc( sizeof( char ) *  river->nbytes );      // fresh accumulator
                        } else {
                                if( !(river->flags & RF_NOTIFIED) ) {