Beef up unit tests for SI95 code
[ric-plt/lib/rmr.git] / src / rmr / si / src / mt_call_si_static.c
index 6e2e8aa..73d776a 100644 (file)
@@ -1,8 +1,8 @@
 // : vi ts=4 sw=4 noet:
 /*
 ==================================================================================
 // : vi ts=4 sw=4 noet:
 /*
 ==================================================================================
-       Copyright (c) 2019 Nokia
-       Copyright (c) 2018-2020 AT&T Intellectual Property.
+       Copyright (c) 2020-2021 Nokia
+       Copyright (c) 2018-2021 AT&T Intellectual Property.
 
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
 
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
 /*
        Mnemonic:       mt_call_si static.c
        Abstract:       Static funcitons related to the multi-threaded call feature
 /*
        Mnemonic:       mt_call_si static.c
        Abstract:       Static funcitons related to the multi-threaded call feature
-                               which are SI specific.
+                               which are SI specific. The functions here also provide the
+                               message construction functions which build a message that
+                               might be split across multiple "datagrams" received from the
+                               underlying transport.
 
        Author:         E. Scott Daniels
        Date:           20 May 2019
 
        Author:         E. Scott Daniels
        Date:           20 May 2019
 #include <semaphore.h>
 
 static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) {
 #include <semaphore.h>
 
 static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) {
-       static  int warned = 0;
+       static  time_t last_warning = 0;
+       //static        long dcount = 0;
+
        chute_t*        chute;
 
        if( ! uta_ring_insert( ctx->mring, mbuf ) ) {
                rmr_free_msg( mbuf );                                                           // drop if ring is full
        chute_t*        chute;
 
        if( ! uta_ring_insert( ctx->mring, mbuf ) ) {
                rmr_free_msg( mbuf );                                                           // drop if ring is full
-               if( !warned ) {
-                       fprintf( stderr, "[WARN] rmr_mt_receive: application is not receiving fast enough; messages dropping\n" );
-                       warned++;
+               //dcount++;
+               ctx->dcount++;
+               if( time( NULL ) > last_warning + 60 ) {                        // issue warning no more frequently than every 60 sec
+                       rmr_vlog( RMR_VL_ERR, "rmr_mt_receive: application is not receiving fast enough; %d msgs dropped since last warning\n", ctx->dcount );
+                       last_warning = time( NULL );
+                       ctx->dcount = 0;
                }
 
                return;
                }
 
                return;
@@ -54,8 +62,6 @@ static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) {
        call ref to point to all of the various bits and set real len etc,
        then we queue it.  Raw_msg is expected to include the transport goo
        placed in front of the RMR header and payload.
        call ref to point to all of the various bits and set real len etc,
        then we queue it.  Raw_msg is expected to include the transport goo
        placed in front of the RMR header and payload.
-
-       done -- FIX ME?? can we eliminate the buffer copy here?
 */
 static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd ) {
        rmr_mbuf_t*             mbuf;
 */
 static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd ) {
        rmr_mbuf_t*             mbuf;
@@ -64,23 +70,18 @@ static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd
        chute_t*                chute;
        unsigned int    call_id;        // the id assigned to the call generated message
 
        chute_t*                chute;
        unsigned int    call_id;        // the id assigned to the call generated message
 
-       if( PARINOID_CHECKS ) {                                                                 // PARINOID mode is slower; off by default
+       if( PARANOID_CHECKS ) {                                                                 // PARANOID mode is slower; off by default
                if( raw_msg == NULL || msg_size <= 0 ) {
                        return;
                }
        }
 
                if( raw_msg == NULL || msg_size <= 0 ) {
                        return;
                }
        }
 
-/*
-       if( (mbuf = (rmr_mbuf_t *) malloc( sizeof( *mbuf ))) != NULL ) {                // alloc mbuf and point at various bits of payload
-               memset( mbuf, 0, sizeof( *mbuf ) );
-               mbuf->tp_buf = raw_msg;
-               mbuf->ring = ctx->zcb_mring;
-*/
        if( (mbuf = alloc_mbuf( ctx, RMR_ERR_UNSET )) != NULL ) {
                mbuf->tp_buf = raw_msg;
                mbuf->rts_fd = sender_fd;
        if( (mbuf = alloc_mbuf( ctx, RMR_ERR_UNSET )) != NULL ) {
                mbuf->tp_buf = raw_msg;
                mbuf->rts_fd = sender_fd;
-
-               // eliminated :)   memcpy( mbuf->tp_buf, river->accum + offset, river->msg_size );
+               if( msg_size > ctx->max_ibm + 1024 ) {
+                       mbuf->flags |= MFL_HUGE;                                // prevent caching of oversized buffers
+               }
 
                ref_tpbuf( mbuf, msg_size );                            // point mbuf at bits in the datagram
                hdr = mbuf->header;                                                     // convenience
 
                ref_tpbuf( mbuf, msg_size );                            // point mbuf at bits in the datagram
                hdr = mbuf->header;                                                     // convenience
@@ -100,9 +101,51 @@ static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd
                                }
                        }
                }
                                }
                        }
                }
+       } else {
+               free( raw_msg );
        }
 }
 
        }
 }
 
+/*
+       Given a buffer, extract the size. We assume the buffer contains one of:
+               <int1><int2><mark>
+               <int1>
+
+       where <int1> is the size in native storage order (v1) and <int2>
+       is the size in network order. If <mark> is present then we assume
+       that <int2> is present and we use that after translating from net
+       byte order. If <mark> is not present, we use <int1>. This allows
+       old versions of RMR to continue to work with new versions that now
+       do the right thing with byte ordering.
+
+       If the receiver of a message is a backlevel RMR, and it uses RTS to
+       return a message, it will only update the old size, but when the
+       message is received back at a new RMR application it will appear that
+       the message came from a new instance.  Therefore, we must compare
+       the old and new sizes and if they are different we must use the old
+       size assuming that this is the case.
+*/
+static inline uint32_t extract_mlen( unsigned char* buf ) {
+       uint32_t        size;           // adjusted (if needed) size for return
+       uint32_t        osize;          // old size
+       uint32_t*       blen;           // length in the buffer to extract
+
+       blen = (uint32_t *) buf;
+       if( *(buf + sizeof( int ) * 2 ) == TP_SZ_MARKER ) {
+               osize = *blen;                                                  // old size
+               size = ntohl( *(blen+1) );                              // pick up the second integer
+               if( osize != size ) {                                   // assume back level return to sender
+                       size = osize;                                           // MUST use old size
+               }
+               if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "extract msg len converted from net order to: %d\n", size );
+       } else {
+               size = *blen;                                                   // old sender didn't encode size
+               if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "extract msg len no conversion: %d\n", size );
+       }
+
+       return size;
+}
+
 /*
        This is the callback invoked when tcp data is received. It adds the data
        to the buffer for the connection and if a complete message is received
 /*
        This is the callback invoked when tcp data is received. It adds the data
        to the buffer for the connection and if a complete message is received
@@ -110,42 +153,48 @@ static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd
 
        Return value indicates only that we handled the buffer and SI should continue
        or that SI should terminate, so on error it's NOT wrong to return "ok".
 
        Return value indicates only that we handled the buffer and SI should continue
        or that SI should terminate, so on error it's NOT wrong to return "ok".
-
-
-       FUTURE: to do this better, SI needs to support a 'ready to read' callback
-       which allows us to to the actual receive directly into our buffer.
 */
 static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
        uta_ctx_t*              ctx;
        river_t*                river;                  // river associated with the fd passed in
 */
 static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
        uta_ctx_t*              ctx;
        river_t*                river;                  // river associated with the fd passed in
+       unsigned char*  old_accum;              // old accumulator reference should we need to realloc
        int                             bidx = 0;               // transport buffer index
        int                             remain;                 // bytes in transport buf that need to be moved
        int                             bidx = 0;               // transport buffer index
        int                             remain;                 // bytes in transport buf that need to be moved
-       int*                    mlen;                   // pointer to spot in buffer for conversion to int
+       int*                    mlen;                   // pointer to spot in buffer for conversion to int
        int                             need;                   // bytes needed for something
        int                             i;
 
        int                             need;                   // bytes needed for something
        int                             i;
 
-       // for speed these checks should be enabled only in debug mode and assume we always get a good context
-       if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
-               return SI_RET_OK;
+       if( PARANOID_CHECKS ) {                                                                 // PARANOID mode is slower; off by default
+               if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+                       return SI_RET_OK;
+               }
+       } else {
+               ctx = (uta_ctx_t *) vctx;
        }
 
        }
 
-       if( fd >= ctx->nrivers || fd < 0 ) {
-               if( DEBUG ) fprintf( stderr, "[DBUG] callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
+       if( buflen <= 0 || fd < 0 ) {                   // no buffer or invalid fd
                return SI_RET_OK;
        }
 
                return SI_RET_OK;
        }
 
-       // -------- end debug checks -----------------
-
-       if( buflen <= 0 ) {
-               return SI_RET_OK;
+       if( fd >= ctx->nrivers ) {
+               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
+               if( (river = (river_t *) rmr_sym_pull( ctx->river_hash, (uint64_t) fd )) == NULL ) {
+                       river = (river_t *) malloc( sizeof( *river ) );
+                       memset( river, 0, sizeof( *river ) );
+                       rmr_sym_map( ctx->river_hash, (uint64_t) fd, river );
+                       river->state = RS_NEW;
+               }
+       } else {
+               river = &ctx->rivers[fd];                               // quick index for fd values < MAX_FD
        }
 
        }
 
-       river = &ctx->rivers[fd];
        if( river->state != RS_GOOD ) {                         // all states which aren't good require reset first
                if( river->state == RS_NEW ) {
        if( river->state != RS_GOOD ) {                         // all states which aren't good require reset first
                if( river->state == RS_NEW ) {
+                       if( river->accum != NULL ) {
+                               free( river->accum );
+                       }
                        memset( river, 0, sizeof( *river ) );
                        memset( river, 0, sizeof( *river ) );
-                       //river->nbytes = sizeof( char ) * (8 * 1024);
-                       river->nbytes = sizeof( char ) * ctx->max_ibm;                  // max inbound message size
+                       river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024);         // start with what user said would be the "normal" max inbound msg size
                        river->accum = (char *) malloc( river->nbytes );
                        river->ipt = 0;
                } else {
                        river->accum = (char *) malloc( river->nbytes );
                        river->ipt = 0;
                } else {
@@ -155,28 +204,18 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
        }
 
        river->state = RS_GOOD;
        }
 
        river->state = RS_GOOD;
-
-/*
-fprintf( stderr, "\n>>>>> data callback for %d bytes from %d\n", buflen, fd );
-for( i = 0; i < 40; i++ ) {
-fprintf( stderr, "%02x ", (unsigned char) *(buf+i) );
-}
-fprintf( stderr, "\n" );
-*/
-
        remain = buflen;
        while( remain > 0 ) {                                                           // until we've done something with all bytes passed in
        remain = buflen;
        while( remain > 0 ) {                                                           // until we've done something with all bytes passed in
-               if( DEBUG )  fprintf( stderr, "[DBUG] ====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain );
+               if( DEBUG )  rmr_vlog( RMR_VL_DEBUG, "====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain );
 
 
-               // FIX ME: size in the message  needs to be network byte order  
-               if( river->msg_size <= 0 ) {                            // don't have a size yet
+               if( river->msg_size <= 0 ) {                            // don't have a message length  yet
                                                                                                        // FIX ME: we need a frame indicator to ensure alignment
                                                                                                        // FIX ME: we need a frame indicator to ensure alignment
-                       need = sizeof( int ) - river->ipt;                                                      // what we need from transport buffer
-                       if( need > remain ) {                                                                           // the whole size isn't there
-                               if( DEBUG > 1 ) fprintf( stderr, "[DBUG] need more for size than we have: need=%d rmain=%d ipt=%d\n", need, remain, river->ipt );
+                       need = TP_SZFIELD_LEN - river->ipt;             // what we need to compute the total message length
+                       if( need > remain ) {                                                                           // the whole message len  information isn't in this transport buf
+                               if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "need more for size than we have: need=%d rmain=%d ipt=%d\n", need, remain, river->ipt );
                                memcpy( &river->accum[river->ipt], buf+bidx, remain );                  // grab what we can and depart
                                river->ipt += remain;
                                memcpy( &river->accum[river->ipt], buf+bidx, remain );                  // grab what we can and depart
                                river->ipt += remain;
-                               if( DEBUG > 1 ) fprintf( stderr, "[DBUG] data callback not enough bytes to compute size; need=%d have=%d\n", need, remain );
+                               if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "data callback not enough bytes to compute size; need=%d have=%d\n", need, remain );
                                return SI_RET_OK;
                        }
 
                                return SI_RET_OK;
                        }
 
@@ -185,39 +224,111 @@ fprintf( stderr, "\n" );
                                river->ipt += need;
                                bidx += need;
                                remain -= need;
                                river->ipt += need;
                                bidx += need;
                                remain -= need;
-                               river->msg_size = *((int *) river->accum);                              
-                               if( DEBUG > 1 ) {
-                                       fprintf( stderr, "[DBUG] size from accumulator =%d\n", river->msg_size );
-                                       if( river->msg_size > 500 ) {
-                                               dump_40( river->accum, "msg size way too large accum:"  );
+                               river->msg_size = extract_mlen( river->accum );
+                               if( DEBUG ) {
+                                       rmr_vlog( RMR_VL_DEBUG, "size from accumulator =%d\n", river->msg_size );
+                                       if( DEBUG > 1 ) {
+                                               dump_40( river->accum, "from accumulator:"  );
+                                               if( river->msg_size > 100 ) {
+                                                       dump_40( river->accum + 50, "from rmr buf:"  );
+                                               }
                                        }
                                }
                        } else {
                                        }
                                }
                        } else {
-                               river->msg_size = *((int *) &buf[bidx]);                                        // snarf directly and copy with rest later
+                               river->msg_size = extract_mlen( &buf[bidx] );                   // pull from buf as it's all there; it will copy later
+                       }
+                       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback setting msg size: %d\n", river->msg_size );
+
+                       if( river->msg_size > river->nbytes ) {                                         // message bigger than app max size; grab huge buffer
+                               //river->flags |= RF_DROP;                                                              //  uncomment to drop large messages
+                               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "received message is huge (%d) reallocating buffer\n", river->msg_size );
+                               old_accum = river->accum;                                       // need to copy any bytes we snarfed getting the size, so hold
+                               river->nbytes = river->msg_size + 128;                                  // buffer large enough with a bit of fudge room
+                               river->accum = (char *) malloc( river->nbytes );
+                               if( river->ipt > 0 ) {
+                                       memcpy( river->accum, old_accum, river->ipt + 1 );              // copy anything snarfed in getting the sie
+                               }
+
+                               free( old_accum );
                        }
                        }
-                       if( DEBUG ) fprintf( stderr, "[DBUG] data callback setting msg size: %d\n", river->msg_size );
                }
 
                }
 
-               if( river->msg_size > (river->ipt + remain) ) {                                 // need more than is left in buffer
-                       if( DEBUG > 1 ) fprintf( stderr, "[DBUG] data callback not enough in the buffer size=%d remain=%d\n", river->msg_size, remain );
-                       memcpy( &river->accum[river->ipt], buf+bidx, remain );          // buffer and go wait for more
+               if( river->msg_size > (river->ipt + remain) ) {                                 // need more than is left in receive buffer
+                       if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "data callback not enough in the buffer size=%d remain=%d\n", river->msg_size, remain );
+                       if( (river->flags & RF_DROP) == 0  ) {                                                  // ok to keep this message; copy bytes
+                               memcpy( &river->accum[river->ipt], buf+bidx, remain );          // grab what is in the rcv buffer and go wait for more
+                       }
                        river->ipt += remain;
                        remain = 0;
                } else {
                        need = river->msg_size - river->ipt;                                            // bytes from transport we need to have complete message
                        river->ipt += remain;
                        remain = 0;
                } else {
                        need = river->msg_size - river->ipt;                                            // bytes from transport we need to have complete message
-                       if( DEBUG ) fprintf( stderr, "[DBUG] data callback enough in the buffer size=%d need=%d remain=%d\n", river->msg_size, need, remain );
-                       memcpy( &river->accum[river->ipt], buf+bidx, need );            // grab just what is needed (might be more)
-                       buf2mbuf( ctx, river->accum, river->msg_size, fd );                             // build an RMR mbuf and queue
+                       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback enough in the buffer size=%d need=%d remain=%d flgs=%02x\n", river->msg_size, need, remain, river->flags );
+                       if( (river->flags & RF_DROP) == 0  ) {                                                                  // keeping this message, copy and pass it on
+                               memcpy( &river->accum[river->ipt], buf+bidx, need );                            // grab just what is needed (might be more)
+                               buf2mbuf( ctx, river->accum, river->nbytes, fd );                                       // build an RMR mbuf and queue
+                               river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024);                         // prevent huge size from persisting
+                               river->accum = (char *) malloc( sizeof( char ) *  river->nbytes );      // fresh accumulator
+                       } else {
+                               if( !(river->flags & RF_NOTIFIED) ) {                                                           // not keeping huge messages; notify once per stream
+                                       rmr_vlog( RMR_VL_WARN, "message larger than allocated buffer (%d) arrived on fd %d\n", river->nbytes, fd );
+                                       river->flags |= RF_NOTIFIED;
+                               }
+                       }
 
 
-                       river->accum = (char *) malloc( sizeof( char ) *  river->nbytes );      // fresh accumulator
                        river->msg_size = -1;
                        river->ipt = 0;
                        bidx += need;
                        river->msg_size = -1;
                        river->ipt = 0;
                        bidx += need;
-                       remain -= need; 
+                       remain -= need;
                }
        }
 
                }
        }
 
-       if( DEBUG >2 ) fprintf( stderr, "[DBUG] ##### data callback finished\n" );
+       if( DEBUG >2 ) rmr_vlog( RMR_VL_DEBUG, "##### data callback finished\n" );
+       return SI_RET_OK;
+}
+
+/*
+       Callback driven on a disconnect notification. We will attempt to find the related
+       endpoint via the fd2ep hash maintained in the context. If we find it, then we
+       remove it from the hash, and mark the endpoint as closed so that the next attempt
+       to send forces a reconnect attempt.
+
+       Future: put the ep on a queue to automatically attempt to reconnect.
+*/
+static int mt_disc_cb( void* vctx, int fd ) {
+       uta_ctx_t*      ctx;
+       endpoint_t*     ep;
+       river_t*        river = NULL;
+
+       if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+               return SI_RET_OK;
+       }
+
+       if( fd < ctx->nrivers && fd >= 0 ) {
+               river = &ctx->rivers[fd];
+       } else {
+               if( fd > 0 ) {
+                       river = rmr_sym_pull( ctx->river_hash, (uint64_t) fd );
+                       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "river reset on disconnect: fd=%d\n", fd );
+               }
+       }
+
+       if( river != NULL ) {
+               river->state = RS_NEW;                  // if one connects here later; ensure it's new
+               if( river->accum != NULL ) {
+                       free( river->accum );
+                       river->accum = NULL;
+                       river->state = RS_NEW;          // force realloc if the fd is used again
+               }
+       }
+
+       ep = fd2ep_del( ctx, fd );              // find ep and remove the fd from the hash
+       if( ep != NULL ) {
+               pthread_mutex_lock( &ep->gate );            // wise to lock this
+               ep->open = FALSE;
+               ep->nn_sock = -1;
+               pthread_mutex_unlock( &ep->gate );
+       }
+
        return SI_RET_OK;
 }
 
        return SI_RET_OK;
 }
 
@@ -226,7 +337,7 @@ fprintf( stderr, "\n" );
        This is expected to execute in a separate thread. It is responsible for
        _all_ receives and queues them on the appropriate ring, or chute.
        It does this by registering the callback function above with the SI world
        This is expected to execute in a separate thread. It is responsible for
        _all_ receives and queues them on the appropriate ring, or chute.
        It does this by registering the callback function above with the SI world
-       and then caling SIwait() to drive the callback when data has arrived.
+       and then calling SIwait() to drive the callback when data has arrived.
 
 
        The "state" of the message is checked which determines where the message
 
 
        The "state" of the message is checked which determines where the message
@@ -246,12 +357,15 @@ static void* mt_receive( void* vctx ) {
        uta_ctx_t*      ctx;
 
        if( (ctx = (uta_ctx_t*) vctx) == NULL ) {
        uta_ctx_t*      ctx;
 
        if( (ctx = (uta_ctx_t*) vctx) == NULL ) {
-               fprintf( stderr, "[CRI], unable to start mt-receive: ctx was nil\n" );
+               rmr_vlog( RMR_VL_CRIT, "unable to start mt-receive: ctx was nil\n" );
                return NULL;
        }
 
                return NULL;
        }
 
-       if( DEBUG ) fprintf( stderr, "[DBUG] mt_receive: registering SI95 data callback and waiting\n" );
+       rmr_vlog( RMR_VL_INFO, "mt_receive: pid=%lld  registering SI95 data callback and waiting\n", (long long) pthread_self() );
+
        SIcbreg( ctx->si_ctx, SI_CB_CDATA, mt_data_cb, vctx );                  // our callback called only for "cooked" (tcp) data
        SIcbreg( ctx->si_ctx, SI_CB_CDATA, mt_data_cb, vctx );                  // our callback called only for "cooked" (tcp) data
+       SIcbreg( ctx->si_ctx, SI_CB_DISC, mt_disc_cb, vctx );                   // our callback for handling disconnects
+
        SIwait( ctx->si_ctx );
 
        return NULL;            // keep the compiler happy though never can be reached as SI wait doesn't return
        SIwait( ctx->si_ctx );
 
        return NULL;            // keep the compiler happy though never can be reached as SI wait doesn't return