Add safe connect, fix possible seg fault in RTC
[ric-plt/lib/rmr.git] / src / rmr / si / src / mt_call_si_static.c
index d6279fd..2d422bb 100644 (file)
@@ -1,7 +1,7 @@
 // : vi ts=4 sw=4 noet:
 /*
 ==================================================================================
-       Copyright (c) 2019 Nokia
+       Copyright (c) 2020 Nokia
        Copyright (c) 2018-2020 AT&T Intellectual Property.
 
    Licensed under the Apache License, Version 2.0 (the "License");
 /*
        Mnemonic:       mt_call_si static.c
        Abstract:       Static funcitons related to the multi-threaded call feature
-                               which are SI specific.
+                               which are SI specific. The functions here also provide the
+                               message construction functions which build a message that
+                               might be split across multiple "datagrams" received from the
+                               underlying transport.
 
        Author:         E. Scott Daniels
        Date:           20 May 2019
@@ -62,7 +65,7 @@ static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd
        chute_t*                chute;
        unsigned int    call_id;        // the id assigned to the call generated message
 
-       if( PARINOID_CHECKS ) {                                                                 // PARINOID mode is slower; off by default
+       if( PARANOID_CHECKS ) {                                                                 // PARANOID mode is slower; off by default
                if( raw_msg == NULL || msg_size <= 0 ) {
                        return;
                }
@@ -71,6 +74,9 @@ static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd
        if( (mbuf = alloc_mbuf( ctx, RMR_ERR_UNSET )) != NULL ) {
                mbuf->tp_buf = raw_msg;
                mbuf->rts_fd = sender_fd;
+               if( msg_size > ctx->max_ibm + 1024 ) {
+                       mbuf->flags |= MFL_HUGE;                                // prevent caching of oversized buffers
+               }
 
                ref_tpbuf( mbuf, msg_size );                            // point mbuf at bits in the datagram
                hdr = mbuf->header;                                                     // convenience
@@ -93,6 +99,34 @@ static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd
        }
 }
 
+/*
+       Given a buffer, extract the size. We assume the buffer contains one of:
+               <int1><int2><mark>
+               <int1>
+
+       where <int1> is the size in native storage order (v1) and <int2>
+       is the size in network order. If <mark> is present then we assume
+       that <int2> is present and we use that after translating from net
+       byte order. If <mark> is not present, we use <int1>. This allows
+       old versions of RMR to continue to work with new versions that now
+       do the right thing with byte ordering.
+*/
+static inline uint32_t extract_mlen( unsigned char* buf ) {
+       uint32_t        size;           // adjusted (if needed) size for return
+       uint32_t*       blen;           // length in the buffer to extract
+
+       blen = (uint32_t *) buf;
+       if( *(buf + sizeof( int ) * 2 ) == TP_SZ_MARKER ) {
+               size = ntohl( *(blen+1) );                              // pick up the second integer
+               if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "extract msg len converted from net order to: %d\n", size );
+       } else {
+               size = *blen;                                                   // old sender didn't encode size
+               if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "extract msg len no conversion: %d\n", size );
+       }
+
+       return size;
+}
+
 /*
        This is the callback invoked when tcp data is received. It adds the data
        to the buffer for the connection and if a complete message is received
@@ -100,29 +134,28 @@ static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd
 
        Return value indicates only that we handled the buffer and SI should continue
        or that SI should terminate, so on error it's NOT wrong to return "ok".
-
-
-       FUTURE: to do this better, SI needs to support a 'ready to read' callback
-       which allows us to to the actual receive directly into our buffer.
 */
 static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
        uta_ctx_t*              ctx;
        river_t*                river;                  // river associated with the fd passed in
+       unsigned char*  old_accum;              // old accumulator reference should we need to realloc
        int                             bidx = 0;               // transport buffer index
        int                             remain;                 // bytes in transport buf that need to be moved
        int*                    mlen;                   // pointer to spot in buffer for conversion to int
        int                             need;                   // bytes needed for something
        int                             i;
 
-       if( PARINOID_CHECKS ) {                                                                 // PARINOID mode is slower; off by default
+       if( PARANOID_CHECKS ) {                                                                 // PARANOID mode is slower; off by default
                if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
                        return SI_RET_OK;
                }
-       
+
                if( fd >= ctx->nrivers || fd < 0 ) {
                        if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
                        return SI_RET_OK;
                }
+       } else {
+               ctx = (uta_ctx_t *) vctx;
        }
 
        if( buflen <= 0 ) {
@@ -133,8 +166,7 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
        if( river->state != RS_GOOD ) {                         // all states which aren't good require reset first
                if( river->state == RS_NEW ) {
                        memset( river, 0, sizeof( *river ) );
-                       //river->nbytes = sizeof( char ) * (8 * 1024);
-                       river->nbytes = sizeof( char ) * ctx->max_ibm;                  // max inbound message size
+                       river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024);         // max inbound message size
                        river->accum = (char *) malloc( river->nbytes );
                        river->ipt = 0;
                } else {
@@ -148,10 +180,9 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
        while( remain > 0 ) {                                                           // until we've done something with all bytes passed in
                if( DEBUG )  rmr_vlog( RMR_VL_DEBUG, "====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain );
 
-               // FIX ME: size in the message  needs to be network byte order  
                if( river->msg_size <= 0 ) {                            // don't have a size yet
                                                                                                        // FIX ME: we need a frame indicator to ensure alignment
-                       need = sizeof( int ) - river->ipt;                                                      // what we need from transport buffer
+                       need = TP_SZFIELD_LEN - river->ipt;             // what we need to compute length
                        if( need > remain ) {                                                                           // the whole size isn't there
                                if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "need more for size than we have: need=%d rmain=%d ipt=%d\n", need, remain, river->ipt );
                                memcpy( &river->accum[river->ipt], buf+bidx, remain );                  // grab what we can and depart
@@ -165,20 +196,32 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
                                river->ipt += need;
                                bidx += need;
                                remain -= need;
-                               river->msg_size = *((int *) river->accum);                              
-                               if( DEBUG > 1 ) {
+                               river->msg_size = extract_mlen( river->accum );
+                               if( DEBUG ) {
                                        rmr_vlog( RMR_VL_DEBUG, "size from accumulator =%d\n", river->msg_size );
-                                       if( river->msg_size > 500 ) {
-                                               dump_40( river->accum, "msg size way too large accum:"  );
+                                       if( DEBUG > 1 ) {
+                                               dump_40( river->accum, "from accumulator:"  );
+                                               if( river->msg_size > 100 ) {
+                                                       dump_40( river->accum + 50, "from rmr buf:"  );
+                                               }
                                        }
                                }
                        } else {
-                               river->msg_size = *((int *) &buf[bidx]);                                        // snarf directly and copy with rest later
+                               river->msg_size = extract_mlen( &buf[bidx] );                   // pull from buf as it's all there; it will copy later
                        }
                        if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback setting msg size: %d\n", river->msg_size );
 
-                       if( river->msg_size > river->nbytes ) {                         // message is too big, we will drop it
-                               river->flags |= RF_DROP;
+                       if( river->msg_size > river->nbytes ) {                                         // message bigger than app max size; grab huge buffer
+                               //river->flags |= RF_DROP;
+                               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "received message is huge (%d) reallocating buffer\n", river->msg_size );
+                               old_accum = river->accum;                                       // need to copy any bytes we snarfed getting the size, so hold
+                               river->nbytes = river->msg_size + 128;                                  // buffer large enough with a bit of fudge room
+                               river->accum = (char *) malloc( river->nbytes );
+                               if( river->ipt > 0 ) {
+                                       memcpy( river->accum, old_accum, river->ipt + 1 );              // copy anything snarfed in getting the sie
+                               }
+
+                               free( old_accum );
                        }
                }
 
@@ -194,10 +237,11 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
                        if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback enough in the buffer size=%d need=%d remain=%d\n", river->msg_size, need, remain );
                        if( (river->flags & RF_DROP) == 0  ) {
                                memcpy( &river->accum[river->ipt], buf+bidx, need );                            // grab just what is needed (might be more)
-                               buf2mbuf( ctx, river->accum, river->msg_size, fd );                                     // build an RMR mbuf and queue
+                               buf2mbuf( ctx, river->accum, river->nbytes, fd );                                       // build an RMR mbuf and queue
+                               river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024);                         // prevent huge size from persisting
                                river->accum = (char *) malloc( sizeof( char ) *  river->nbytes );      // fresh accumulator
                        } else {
-                               if( !(river->flags & RF_NOTIFIED) ) {   
+                               if( !(river->flags & RF_NOTIFIED) ) {
                                        rmr_vlog( RMR_VL_WARN, "message larger than max (%d) have arrived on fd %d\n", river->nbytes, fd );
                                        river->flags |= RF_NOTIFIED;
                                }
@@ -206,7 +250,7 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
                        river->msg_size = -1;
                        river->ipt = 0;
                        bidx += need;
-                       remain -= need; 
+                       remain -= need;
                }
        }
 
@@ -215,8 +259,8 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
 }
 
 /*
-       Callback driven on a disconnect notification. We will attempt to find the related 
-       endpoint via the fd2ep hash maintained in the context. If we find it, then we 
+       Callback driven on a disconnect notification. We will attempt to find the related
+       endpoint via the fd2ep hash maintained in the context. If we find it, then we
        remove it from the hash, and mark the endpoint as closed so that the next attempt
        to send forces a reconnect attempt.
 
@@ -230,10 +274,12 @@ static int mt_disc_cb( void* vctx, int fd ) {
                return SI_RET_OK;
        }
 
-       ep = fd2ep_del( ctx, fd );              // find ep and remote the fd from the hash
-       if( ep ) {
+       ep = fd2ep_del( ctx, fd );              // find ep and remove the fd from the hash
+       if( ep != NULL ) {
+       pthread_mutex_lock( &ep->gate );            // wise to lock this
                ep->open = FALSE;
                ep->nn_sock = -1;
+       pthread_mutex_unlock( &ep->gate );
        }
 
        return SI_RET_OK;