Fixing handling of invalid header size
[ric-plt/lib/rmr.git] / src / rmr / si / src / mt_call_si_static.c
index d6279fd..c3483d8 100644 (file)
@@ -1,8 +1,8 @@
 // : vi ts=4 sw=4 noet:
 /*
 ==================================================================================
-       Copyright (c) 2019 Nokia
-       Copyright (c) 2018-2020 AT&T Intellectual Property.
+       Copyright (c) 2020-2021 Nokia
+       Copyright (c) 2018-2021 AT&T Intellectual Property.
 
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
 /*
        Mnemonic:       mt_call_si static.c
        Abstract:       Static funcitons related to the multi-threaded call feature
-                               which are SI specific.
+                               which are SI specific. The functions here also provide the
+                               message construction functions which build a message that
+                               might be split across multiple "datagrams" received from the
+                               underlying transport.
 
        Author:         E. Scott Daniels
        Date:           20 May 2019
 #include <semaphore.h>
 
 static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) {
-       static  int warned = 0;
+       static  time_t last_warning = 0;
+       //static        long dcount = 0;
+
        chute_t*        chute;
 
        if( ! uta_ring_insert( ctx->mring, mbuf ) ) {
                rmr_free_msg( mbuf );                                                           // drop if ring is full
-               if( !warned ) {
-                       rmr_vlog( RMR_VL_ERR, "rmr_mt_receive: application is not receiving fast enough; messages dropping\n" );
-                       warned++;
+               //dcount++;
+               ctx->dcount++;
+               ctx->acc_dcount++;
+               if( time( NULL ) > last_warning + 60 ) {                        // issue warning no more frequently than every 60 sec
+                       rmr_vlog( RMR_VL_ERR, "rmr_mt_receive: application is not receiving fast enough; %d msgs dropped since last warning\n", ctx->dcount );
+                       last_warning = time( NULL );
+                       ctx->dcount = 0;
                }
 
                return;
        }
-
+       ctx->acc_ecount++;
        chute = &ctx->chutes[0];
        sem_post( &chute->barrier );                                                            // tickle the ring monitor
 }
@@ -62,15 +71,29 @@ static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd
        chute_t*                chute;
        unsigned int    call_id;        // the id assigned to the call generated message
 
-       if( PARINOID_CHECKS ) {                                                                 // PARINOID mode is slower; off by default
+       if( PARANOID_CHECKS ) {                                                                 // PARANOID mode is slower; off by default
                if( raw_msg == NULL || msg_size <= 0 ) {
                        return;
                }
        }
 
+       // cross-check that header length indicators are not longer than actual message
+       uta_mhdr_t* hdr_check = (uta_mhdr_t*)(((char *) raw_msg) + TP_HDR_LEN);
+        uint32_t header_len=(uint32_t)RMR_HDR_LEN(hdr_check);
+        uint32_t payload_len=(uint32_t)ntohl(hdr_check->plen);
+        if (header_len+TP_HDR_LEN+payload_len> msg_size) {
+                rmr_vlog( RMR_VL_ERR, "Message dropped because %u + %u + %u > %u\n", header_len, payload_len, TP_HDR_LEN, msg_size);
+                free (raw_msg);
+                return;
+        }
+
+
        if( (mbuf = alloc_mbuf( ctx, RMR_ERR_UNSET )) != NULL ) {
                mbuf->tp_buf = raw_msg;
                mbuf->rts_fd = sender_fd;
+               if( msg_size > ctx->max_ibm + 1024 ) {
+                       mbuf->flags |= MFL_HUGE;                                // prevent caching of oversized buffers
+               }
 
                ref_tpbuf( mbuf, msg_size );                            // point mbuf at bits in the datagram
                hdr = mbuf->header;                                                     // convenience
@@ -90,9 +113,51 @@ static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd
                                }
                        }
                }
+       } else {
+               free( raw_msg );
        }
 }
 
+/*
+       Given a buffer, extract the size. We assume the buffer contains one of:
+               <int1><int2><mark>
+               <int1>
+
+       where <int1> is the size in native storage order (v1) and <int2>
+       is the size in network order. If <mark> is present then we assume
+       that <int2> is present and we use that after translating from net
+       byte order. If <mark> is not present, we use <int1>. This allows
+       old versions of RMR to continue to work with new versions that now
+       do the right thing with byte ordering.
+
+       If the receiver of a message is a backlevel RMR, and it uses RTS to
+       return a message, it will only update the old size, but when the
+       message is received back at a new RMR application it will appear that
+       the message came from a new instance.  Therefore, we must compare
+       the old and new sizes and if they are different we must use the old
+       size assuming that this is the case.
+*/
+static inline uint32_t extract_mlen( unsigned char* buf ) {
+       uint32_t        size;           // adjusted (if needed) size for return
+       uint32_t        osize;          // old size
+       uint32_t*       blen;           // length in the buffer to extract
+
+       blen = (uint32_t *) buf;
+       if( *(buf + sizeof( int ) * 2 ) == TP_SZ_MARKER ) {
+               osize = *blen;                                                  // old size
+               size = ntohl( *(blen+1) );                              // pick up the second integer
+               if( osize != size ) {                                   // assume back level return to sender
+                       size = osize;                                           // MUST use old size
+               }
+               if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "extract msg len converted from net order to: %d\n", size );
+       } else {
+               size = *blen;                                                   // old sender didn't encode size
+               if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "extract msg len no conversion: %d\n", size );
+       }
+
+       return size;
+}
+
 /*
        This is the callback invoked when tcp data is received. It adds the data
        to the buffer for the connection and if a complete message is received
@@ -100,46 +165,58 @@ static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd
 
        Return value indicates only that we handled the buffer and SI should continue
        or that SI should terminate, so on error it's NOT wrong to return "ok".
-
-
-       FUTURE: to do this better, SI needs to support a 'ready to read' callback
-       which allows us to to the actual receive directly into our buffer.
 */
 static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
        uta_ctx_t*              ctx;
        river_t*                river;                  // river associated with the fd passed in
+       unsigned char*  old_accum;              // old accumulator reference should we need to realloc
        int                             bidx = 0;               // transport buffer index
        int                             remain;                 // bytes in transport buf that need to be moved
-       int*                    mlen;                   // pointer to spot in buffer for conversion to int
+       int*                    mlen;                   // pointer to spot in buffer for conversion to int
        int                             need;                   // bytes needed for something
        int                             i;
 
-       if( PARINOID_CHECKS ) {                                                                 // PARINOID mode is slower; off by default
+       if( PARANOID_CHECKS ) {                                                                 // PARANOID mode is slower; off by default
                if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
                        return SI_RET_OK;
                }
-       
-               if( fd >= ctx->nrivers || fd < 0 ) {
-                       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
-                       return SI_RET_OK;
-               }
+       } else {
+               ctx = (uta_ctx_t *) vctx;
        }
 
-       if( buflen <= 0 ) {
+       if( buflen <= 0 || fd < 0 ) {                   // no buffer or invalid fd
                return SI_RET_OK;
        }
 
-       river = &ctx->rivers[fd];
+       if( fd >= ctx->nrivers ) {
+               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
+               if( (river = (river_t *) rmr_sym_pull( ctx->river_hash, (uint64_t) fd )) == NULL ) {
+                       river = (river_t *) malloc( sizeof( *river ) );
+                       memset( river, 0, sizeof( *river ) );
+                       rmr_sym_map( ctx->river_hash, (uint64_t) fd, river );
+                       river->state = RS_NEW;
+               }
+       } else {
+               river = &ctx->rivers[fd];                               // quick index for fd values < MAX_FD
+       }
+
        if( river->state != RS_GOOD ) {                         // all states which aren't good require reset first
                if( river->state == RS_NEW ) {
+                       if( river->accum != NULL ) {
+                               free( river->accum );
+                       }
                        memset( river, 0, sizeof( *river ) );
-                       //river->nbytes = sizeof( char ) * (8 * 1024);
-                       river->nbytes = sizeof( char ) * ctx->max_ibm;                  // max inbound message size
+                       river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024);         // start with what user said would be the "normal" max inbound msg size
                        river->accum = (char *) malloc( river->nbytes );
                        river->ipt = 0;
                } else {
-                       // future -- sync to next marker
-                       river->ipt = 0;                                         // insert point
+                       if( river->state == RS_RESET ) {
+                               // future -- reset not implemented
+                               return SI_RET_OK;
+                       } else {
+                               // future -- sync to next marker
+                               river->ipt = 0;                                         // insert point
+                       }
                }
        }
 
@@ -148,11 +225,10 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
        while( remain > 0 ) {                                                           // until we've done something with all bytes passed in
                if( DEBUG )  rmr_vlog( RMR_VL_DEBUG, "====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain );
 
-               // FIX ME: size in the message  needs to be network byte order  
-               if( river->msg_size <= 0 ) {                            // don't have a size yet
+               if( river->msg_size <= 0 ) {                            // don't have a message length  yet
                                                                                                        // FIX ME: we need a frame indicator to ensure alignment
-                       need = sizeof( int ) - river->ipt;                                                      // what we need from transport buffer
-                       if( need > remain ) {                                                                           // the whole size isn't there
+                       need = TP_SZFIELD_LEN - river->ipt;             // what we need to compute the total message length
+                       if( need > remain ) {                                                                           // the whole message len  information isn't in this transport buf
                                if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "need more for size than we have: need=%d rmain=%d ipt=%d\n", need, remain, river->ipt );
                                memcpy( &river->accum[river->ipt], buf+bidx, remain );                  // grab what we can and depart
                                river->ipt += remain;
@@ -165,40 +241,59 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
                                river->ipt += need;
                                bidx += need;
                                remain -= need;
-                               river->msg_size = *((int *) river->accum);                              
-                               if( DEBUG > 1 ) {
+                               river->msg_size = extract_mlen( river->accum );
+                               if( DEBUG ) {
                                        rmr_vlog( RMR_VL_DEBUG, "size from accumulator =%d\n", river->msg_size );
-                                       if( river->msg_size > 500 ) {
-                                               dump_40( river->accum, "msg size way too large accum:"  );
+                                       if( DEBUG > 1 ) {
+                                               dump_40( river->accum, "from accumulator:"  );
+                                               if( river->msg_size > 100 ) {
+                                                       dump_40( river->accum + 50, "from rmr buf:"  );
+                                               }
                                        }
                                }
                        } else {
-                               river->msg_size = *((int *) &buf[bidx]);                                        // snarf directly and copy with rest later
+                               river->msg_size = extract_mlen( &buf[bidx] );                   // pull from buf as it's all there; it will copy later
                        }
+
+                        if( river->msg_size < 0) { // addressing RIC-989
+                                river->state=RS_RESET;
+                               return SI_RET_OK;
+                        }
+
                        if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback setting msg size: %d\n", river->msg_size );
 
-                       if( river->msg_size > river->nbytes ) {                         // message is too big, we will drop it
-                               river->flags |= RF_DROP;
+                       if( river->msg_size > river->nbytes ) {                                         // message bigger than app max size; grab huge buffer
+                               //river->flags |= RF_DROP;                                                              //  uncomment to drop large messages
+                               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "received message is huge (%d) reallocating buffer\n", river->msg_size );
+                               old_accum = river->accum;                                       // need to copy any bytes we snarfed getting the size, so hold
+                               river->nbytes = river->msg_size + 128;                                  // buffer large enough with a bit of fudge room
+                               river->accum = (char *) malloc( river->nbytes );
+                               if( river->ipt > 0 ) {
+                                       memcpy( river->accum, old_accum, river->ipt + 1 );              // copy anything snarfed in getting the sie
+                               }
+
+                               free( old_accum );
                        }
                }
 
-               if( river->msg_size > (river->ipt + remain) ) {                                 // need more than is left in buffer
+               if( river->msg_size > (river->ipt + remain) ) {                                 // need more than is left in receive buffer
                        if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "data callback not enough in the buffer size=%d remain=%d\n", river->msg_size, remain );
-                       if( (river->flags & RF_DROP) == 0  ) {
-                               memcpy( &river->accum[river->ipt], buf+bidx, remain );          // buffer and go wait for more
+                       if( (river->flags & RF_DROP) == 0  ) {                                                  // ok to keep this message; copy bytes
+                               memcpy( &river->accum[river->ipt], buf+bidx, remain );          // grab what is in the rcv buffer and go wait for more
                        }
                        river->ipt += remain;
                        remain = 0;
                } else {
                        need = river->msg_size - river->ipt;                                            // bytes from transport we need to have complete message
-                       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback enough in the buffer size=%d need=%d remain=%d\n", river->msg_size, need, remain );
-                       if( (river->flags & RF_DROP) == 0  ) {
+                       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback enough in the buffer size=%d need=%d remain=%d flgs=%02x\n", river->msg_size, need, remain, river->flags );
+                       if( (river->flags & RF_DROP) == 0  ) {                                                                  // keeping this message, copy and pass it on
                                memcpy( &river->accum[river->ipt], buf+bidx, need );                            // grab just what is needed (might be more)
-                               buf2mbuf( ctx, river->accum, river->msg_size, fd );                                     // build an RMR mbuf and queue
+                               buf2mbuf( ctx, river->accum, river->nbytes, fd );                                       // build an RMR mbuf and queue
+                               river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024);                         // prevent huge size from persisting
                                river->accum = (char *) malloc( sizeof( char ) *  river->nbytes );      // fresh accumulator
                        } else {
-                               if( !(river->flags & RF_NOTIFIED) ) {   
-                                       rmr_vlog( RMR_VL_WARN, "message larger than max (%d) have arrived on fd %d\n", river->nbytes, fd );
+                               if( !(river->flags & RF_NOTIFIED) ) {                                                           // not keeping huge messages; notify once per stream
+                                       rmr_vlog( RMR_VL_WARN, "message larger than allocated buffer (%d) arrived on fd %d\n", river->nbytes, fd );
                                        river->flags |= RF_NOTIFIED;
                                }
                        }
@@ -206,7 +301,7 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
                        river->msg_size = -1;
                        river->ipt = 0;
                        bidx += need;
-                       remain -= need; 
+                       remain -= need;
                }
        }
 
@@ -215,8 +310,8 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
 }
 
 /*
-       Callback driven on a disconnect notification. We will attempt to find the related 
-       endpoint via the fd2ep hash maintained in the context. If we find it, then we 
+       Callback driven on a disconnect notification. We will attempt to find the related
+       endpoint via the fd2ep hash maintained in the context. If we find it, then we
        remove it from the hash, and mark the endpoint as closed so that the next attempt
        to send forces a reconnect attempt.
 
@@ -225,15 +320,36 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
 static int mt_disc_cb( void* vctx, int fd ) {
        uta_ctx_t*      ctx;
        endpoint_t*     ep;
+       river_t*        river = NULL;
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
                return SI_RET_OK;
        }
 
-       ep = fd2ep_del( ctx, fd );              // find ep and remote the fd from the hash
-       if( ep ) {
+       if( fd < ctx->nrivers && fd >= 0 ) {
+               river = &ctx->rivers[fd];
+       } else {
+               if( fd > 0 ) {
+                       river = rmr_sym_pull( ctx->river_hash, (uint64_t) fd );
+                       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "river reset on disconnect: fd=%d\n", fd );
+               }
+       }
+
+       if( river != NULL ) {
+               river->state = RS_NEW;                  // if one connects here later; ensure it's new
+               if( river->accum != NULL ) {
+                       free( river->accum );
+                       river->accum = NULL;
+                       river->state = RS_NEW;          // force realloc if the fd is used again
+               }
+       }
+
+       ep = fd2ep_del( ctx, fd );              // find ep and remove the fd from the hash
+       if( ep != NULL ) {
+               pthread_mutex_lock( &ep->gate );            // wise to lock this
                ep->open = FALSE;
                ep->nn_sock = -1;
+               pthread_mutex_unlock( &ep->gate );
        }
 
        return SI_RET_OK;
@@ -268,7 +384,7 @@ static void* mt_receive( void* vctx ) {
                return NULL;
        }
 
-       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mt_receive: registering SI95 data callback and waiting\n" );
+       rmr_vlog( RMR_VL_INFO, "mt_receive: pid=%lld  registering SI95 data callback and waiting\n", (long long) pthread_self() );
 
        SIcbreg( ctx->si_ctx, SI_CB_CDATA, mt_data_cb, vctx );                  // our callback called only for "cooked" (tcp) data
        SIcbreg( ctx->si_ctx, SI_CB_DISC, mt_disc_cb, vctx );                   // our callback for handling disconnects