Fixing handling of invalid header size
[ric-plt/lib/rmr.git] / src / rmr / si / src / mt_call_si_static.c
index 2894cdb..c3483d8 100644 (file)
@@ -1,8 +1,8 @@
 // : vi ts=4 sw=4 noet:
 /*
 ==================================================================================
-       Copyright (c) 2019 Nokia
-       Copyright (c) 2018-2020 AT&T Intellectual Property.
+       Copyright (c) 2020-2021 Nokia
+       Copyright (c) 2018-2021 AT&T Intellectual Property.
 
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
 /*
        Mnemonic:       mt_call_si static.c
        Abstract:       Static funcitons related to the multi-threaded call feature
-                               which are SI specific.
+                               which are SI specific. The functions here also provide the
+                               message construction functions which build a message that
+                               might be split across multiple "datagrams" received from the
+                               underlying transport.
 
        Author:         E. Scott Daniels
        Date:           20 May 2019
 #include <semaphore.h>
 
 static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) {
-       static  int warned = 0;
+       static  time_t last_warning = 0;
+       //static        long dcount = 0;
+
        chute_t*        chute;
 
        if( ! uta_ring_insert( ctx->mring, mbuf ) ) {
                rmr_free_msg( mbuf );                                                           // drop if ring is full
-               if( !warned ) {
-                       rmr_vlog( RMR_VL_ERR, "rmr_mt_receive: application is not receiving fast enough; messages dropping\n" );
-                       warned++;
+               //dcount++;
+               ctx->dcount++;
+               ctx->acc_dcount++;
+               if( time( NULL ) > last_warning + 60 ) {                        // issue warning no more frequently than every 60 sec
+                       rmr_vlog( RMR_VL_ERR, "rmr_mt_receive: application is not receiving fast enough; %d msgs dropped since last warning\n", ctx->dcount );
+                       last_warning = time( NULL );
+                       ctx->dcount = 0;
                }
 
                return;
        }
-
+       ctx->acc_ecount++;
        chute = &ctx->chutes[0];
        sem_post( &chute->barrier );                                                            // tickle the ring monitor
 }
@@ -62,15 +71,29 @@ static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd
        chute_t*                chute;
        unsigned int    call_id;        // the id assigned to the call generated message
 
-       if( PARINOID_CHECKS ) {                                                                 // PARINOID mode is slower; off by default
+       if( PARANOID_CHECKS ) {                                                                 // PARANOID mode is slower; off by default
                if( raw_msg == NULL || msg_size <= 0 ) {
                        return;
                }
        }
 
+       // cross-check that header length indicators are not longer than actual message
+       uta_mhdr_t* hdr_check = (uta_mhdr_t*)(((char *) raw_msg) + TP_HDR_LEN);
+        uint32_t header_len=(uint32_t)RMR_HDR_LEN(hdr_check);
+        uint32_t payload_len=(uint32_t)ntohl(hdr_check->plen);
+        if (header_len+TP_HDR_LEN+payload_len> msg_size) {
+                rmr_vlog( RMR_VL_ERR, "Message dropped because %u + %u + %u > %u\n", header_len, payload_len, TP_HDR_LEN, msg_size);
+                free (raw_msg);
+                return;
+        }
+
+
        if( (mbuf = alloc_mbuf( ctx, RMR_ERR_UNSET )) != NULL ) {
                mbuf->tp_buf = raw_msg;
                mbuf->rts_fd = sender_fd;
+               if( msg_size > ctx->max_ibm + 1024 ) {
+                       mbuf->flags |= MFL_HUGE;                                // prevent caching of oversized buffers
+               }
 
                ref_tpbuf( mbuf, msg_size );                            // point mbuf at bits in the datagram
                hdr = mbuf->header;                                                     // convenience
@@ -90,7 +113,49 @@ static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd
                                }
                        }
                }
+       } else {
+               free( raw_msg );
+       }
+}
+
+/*
+       Given a buffer, extract the size. We assume the buffer contains one of:
+               <int1><int2><mark>
+               <int1>
+
+       where <int1> is the size in native storage order (v1) and <int2>
+       is the size in network order. If <mark> is present then we assume
+       that <int2> is present and we use that after translating from net
+       byte order. If <mark> is not present, we use <int1>. This allows
+       old versions of RMR to continue to work with new versions that now
+       do the right thing with byte ordering.
+
+       If the receiver of a message is a backlevel RMR, and it uses RTS to
+       return a message, it will only update the old size, but when the
+       message is received back at a new RMR application it will appear that
+       the message came from a new instance.  Therefore, we must compare
+       the old and new sizes and if they are different we must use the old
+       size assuming that this is the case.
+*/
+static inline uint32_t extract_mlen( unsigned char* buf ) {
+       uint32_t        size;           // adjusted (if needed) size for return
+       uint32_t        osize;          // old size
+       uint32_t*       blen;           // length in the buffer to extract
+
+       blen = (uint32_t *) buf;
+       if( *(buf + sizeof( int ) * 2 ) == TP_SZ_MARKER ) {
+               osize = *blen;                                                  // old size
+               size = ntohl( *(blen+1) );                              // pick up the second integer
+               if( osize != size ) {                                   // assume back level return to sender
+                       size = osize;                                           // MUST use old size
+               }
+               if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "extract msg len converted from net order to: %d\n", size );
+       } else {
+               size = *blen;                                                   // old sender didn't encode size
+               if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "extract msg len no conversion: %d\n", size );
        }
+
+       return size;
 }
 
 /*
@@ -100,46 +165,58 @@ static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd
 
        Return value indicates only that we handled the buffer and SI should continue
        or that SI should terminate, so on error it's NOT wrong to return "ok".
-
-
-       FUTURE: to do this better, SI needs to support a 'ready to read' callback
-       which allows us to to the actual receive directly into our buffer.
 */
 static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
        uta_ctx_t*              ctx;
        river_t*                river;                  // river associated with the fd passed in
+       unsigned char*  old_accum;              // old accumulator reference should we need to realloc
        int                             bidx = 0;               // transport buffer index
        int                             remain;                 // bytes in transport buf that need to be moved
-       int*                    mlen;                   // pointer to spot in buffer for conversion to int
+       int*                    mlen;                   // pointer to spot in buffer for conversion to int
        int                             need;                   // bytes needed for something
        int                             i;
 
-       if( PARINOID_CHECKS ) {                                                                 // PARINOID mode is slower; off by default
+       if( PARANOID_CHECKS ) {                                                                 // PARANOID mode is slower; off by default
                if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
                        return SI_RET_OK;
                }
-
-               if( fd >= ctx->nrivers || fd < 0 ) {
-                       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
-                       return SI_RET_OK;
-               }
+       } else {
+               ctx = (uta_ctx_t *) vctx;
        }
 
-       if( buflen <= 0 ) {
+       if( buflen <= 0 || fd < 0 ) {                   // no buffer or invalid fd
                return SI_RET_OK;
        }
 
-       river = &ctx->rivers[fd];
+       if( fd >= ctx->nrivers ) {
+               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
+               if( (river = (river_t *) rmr_sym_pull( ctx->river_hash, (uint64_t) fd )) == NULL ) {
+                       river = (river_t *) malloc( sizeof( *river ) );
+                       memset( river, 0, sizeof( *river ) );
+                       rmr_sym_map( ctx->river_hash, (uint64_t) fd, river );
+                       river->state = RS_NEW;
+               }
+       } else {
+               river = &ctx->rivers[fd];                               // quick index for fd values < MAX_FD
+       }
+
        if( river->state != RS_GOOD ) {                         // all states which aren't good require reset first
                if( river->state == RS_NEW ) {
+                       if( river->accum != NULL ) {
+                               free( river->accum );
+                       }
                        memset( river, 0, sizeof( *river ) );
-                       //river->nbytes = sizeof( char ) * (8 * 1024);
-                       river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024);         // max inbound message size
+                       river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024);         // start with what user said would be the "normal" max inbound msg size
                        river->accum = (char *) malloc( river->nbytes );
                        river->ipt = 0;
                } else {
-                       // future -- sync to next marker
-                       river->ipt = 0;                                         // insert point
+                       if( river->state == RS_RESET ) {
+                               // future -- reset not implemented
+                               return SI_RET_OK;
+                       } else {
+                               // future -- sync to next marker
+                               river->ipt = 0;                                         // insert point
+                       }
                }
        }
 
@@ -148,11 +225,10 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
        while( remain > 0 ) {                                                           // until we've done something with all bytes passed in
                if( DEBUG )  rmr_vlog( RMR_VL_DEBUG, "====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain );
 
-               // FIX ME: size in the message  needs to be network byte order
-               if( river->msg_size <= 0 ) {                            // don't have a size yet
+               if( river->msg_size <= 0 ) {                            // don't have a message length  yet
                                                                                                        // FIX ME: we need a frame indicator to ensure alignment
-                       need = sizeof( int ) - river->ipt;                                                      // what we need from transport buffer
-                       if( need > remain ) {                                                                           // the whole size isn't there
+                       need = TP_SZFIELD_LEN - river->ipt;             // what we need to compute the total message length
+                       if( need > remain ) {                                                                           // the whole message len  information isn't in this transport buf
                                if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "need more for size than we have: need=%d rmain=%d ipt=%d\n", need, remain, river->ipt );
                                memcpy( &river->accum[river->ipt], buf+bidx, remain );                  // grab what we can and depart
                                river->ipt += remain;
@@ -165,40 +241,59 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
                                river->ipt += need;
                                bidx += need;
                                remain -= need;
-                               river->msg_size = *((int *) river->accum);              
-                               if( DEBUG > 1 ) {
+                               river->msg_size = extract_mlen( river->accum );
+                               if( DEBUG ) {
                                        rmr_vlog( RMR_VL_DEBUG, "size from accumulator =%d\n", river->msg_size );
-                                       if( river->msg_size > 500 ) {
-                                               dump_40( river->accum, "msg size way too large accum:"  );
+                                       if( DEBUG > 1 ) {
+                                               dump_40( river->accum, "from accumulator:"  );
+                                               if( river->msg_size > 100 ) {
+                                                       dump_40( river->accum + 50, "from rmr buf:"  );
+                                               }
                                        }
                                }
                        } else {
-                               river->msg_size = *((int *) &buf[bidx]);                                        // snarf directly and copy with rest later
+                               river->msg_size = extract_mlen( &buf[bidx] );                   // pull from buf as it's all there; it will copy later
                        }
+
+                        if( river->msg_size < 0) { // addressing RIC-989
+                                river->state=RS_RESET;
+                               return SI_RET_OK;
+                        }
+
                        if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback setting msg size: %d\n", river->msg_size );
 
-                       if( river->msg_size > river->nbytes ) {                         // message is too big, we will drop it
-                               river->flags |= RF_DROP;
+                       if( river->msg_size > river->nbytes ) {                                         // message bigger than app max size; grab huge buffer
+                               //river->flags |= RF_DROP;                                                              //  uncomment to drop large messages
+                               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "received message is huge (%d) reallocating buffer\n", river->msg_size );
+                               old_accum = river->accum;                                       // need to copy any bytes we snarfed getting the size, so hold
+                               river->nbytes = river->msg_size + 128;                                  // buffer large enough with a bit of fudge room
+                               river->accum = (char *) malloc( river->nbytes );
+                               if( river->ipt > 0 ) {
+                                       memcpy( river->accum, old_accum, river->ipt + 1 );              // copy anything snarfed in getting the sie
+                               }
+
+                               free( old_accum );
                        }
                }
 
-               if( river->msg_size > (river->ipt + remain) ) {                                 // need more than is left in buffer
+               if( river->msg_size > (river->ipt + remain) ) {                                 // need more than is left in receive buffer
                        if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "data callback not enough in the buffer size=%d remain=%d\n", river->msg_size, remain );
-                       if( (river->flags & RF_DROP) == 0  ) {
-                               memcpy( &river->accum[river->ipt], buf+bidx, remain );          // buffer and go wait for more
+                       if( (river->flags & RF_DROP) == 0  ) {                                                  // ok to keep this message; copy bytes
+                               memcpy( &river->accum[river->ipt], buf+bidx, remain );          // grab what is in the rcv buffer and go wait for more
                        }
                        river->ipt += remain;
                        remain = 0;
                } else {
                        need = river->msg_size - river->ipt;                                            // bytes from transport we need to have complete message
-                       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback enough in the buffer size=%d need=%d remain=%d\n", river->msg_size, need, remain );
-                       if( (river->flags & RF_DROP) == 0  ) {
+                       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback enough in the buffer size=%d need=%d remain=%d flgs=%02x\n", river->msg_size, need, remain, river->flags );
+                       if( (river->flags & RF_DROP) == 0  ) {                                                                  // keeping this message, copy and pass it on
                                memcpy( &river->accum[river->ipt], buf+bidx, need );                            // grab just what is needed (might be more)
                                buf2mbuf( ctx, river->accum, river->nbytes, fd );                                       // build an RMR mbuf and queue
+                               river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024);                         // prevent huge size from persisting
                                river->accum = (char *) malloc( sizeof( char ) *  river->nbytes );      // fresh accumulator
                        } else {
-                               if( !(river->flags & RF_NOTIFIED) ) {
-                                       rmr_vlog( RMR_VL_WARN, "message larger than max (%d) have arrived on fd %d\n", river->nbytes, fd );
+                               if( !(river->flags & RF_NOTIFIED) ) {                                                           // not keeping huge messages; notify once per stream
+                                       rmr_vlog( RMR_VL_WARN, "message larger than allocated buffer (%d) arrived on fd %d\n", river->nbytes, fd );
                                        river->flags |= RF_NOTIFIED;
                                }
                        }
@@ -225,17 +320,36 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
 static int mt_disc_cb( void* vctx, int fd ) {
        uta_ctx_t*      ctx;
        endpoint_t*     ep;
+       river_t*        river = NULL;
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
                return SI_RET_OK;
        }
 
+       if( fd < ctx->nrivers && fd >= 0 ) {
+               river = &ctx->rivers[fd];
+       } else {
+               if( fd > 0 ) {
+                       river = rmr_sym_pull( ctx->river_hash, (uint64_t) fd );
+                       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "river reset on disconnect: fd=%d\n", fd );
+               }
+       }
+
+       if( river != NULL ) {
+               river->state = RS_NEW;                  // if one connects here later; ensure it's new
+               if( river->accum != NULL ) {
+                       free( river->accum );
+                       river->accum = NULL;
+                       river->state = RS_NEW;          // force realloc if the fd is used again
+               }
+       }
+
        ep = fd2ep_del( ctx, fd );              // find ep and remove the fd from the hash
        if( ep != NULL ) {
-       pthread_mutex_lock( &ep->gate );            // wise to lock this
+               pthread_mutex_lock( &ep->gate );            // wise to lock this
                ep->open = FALSE;
                ep->nn_sock = -1;
-       pthread_mutex_unlock( &ep->gate );
+               pthread_mutex_unlock( &ep->gate );
        }
 
        return SI_RET_OK;
@@ -270,7 +384,7 @@ static void* mt_receive( void* vctx ) {
                return NULL;
        }
 
-       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mt_receive: registering SI95 data callback and waiting\n" );
+       rmr_vlog( RMR_VL_INFO, "mt_receive: pid=%lld  registering SI95 data callback and waiting\n", (long long) pthread_self() );
 
        SIcbreg( ctx->si_ctx, SI_CB_CDATA, mt_data_cb, vctx );                  // our callback called only for "cooked" (tcp) data
        SIcbreg( ctx->si_ctx, SI_CB_DISC, mt_disc_cb, vctx );                   // our callback for handling disconnects