Correct bug identified in static analysis
[ric-plt/lib/rmr.git] / src / rmr / si / src / sr_si_static.c
index 50aaeb8..4f86728 100644 (file)
@@ -1,8 +1,8 @@
 // vim: ts=4 sw=4 noet :
 /*
 ==================================================================================
-       Copyright (c) 2019-2020 Nokia
-       Copyright (c) 2018-2020 AT&T Intellectual Property.
+       Copyright (c) 2019-2021 Nokia
+       Copyright (c) 2018-2021 AT&T Intellectual Property.
 
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
@@ -39,13 +39,13 @@ static void dump_n( char *p, char* label, int n ) {
 
 
        if( label ) {
-               fprintf( stderr, ">>>>> %s p=%p %d bytes\n", label, p, n );
+               fprintf( stderr, "[DUMP] %s p=%p %d bytes\n", label, p, n );
        }
-       
+
        rows = (n/16) + ((n % 16) ? 1 : 0);
-       
+
        for( j = 0; j < rows; j++ ) {
-               fprintf( stderr, "%04x: ", j * 16 );
+               fprintf( stderr, "[DUMP] %04x: ", j * 16 );
 
                for( i = 0; t < n && i < 16; i++, t++ ) {
                        fprintf( stderr, "%02x ", (unsigned char) *p );
@@ -59,7 +59,7 @@ static void dump_n( char *p, char* label, int n ) {
        backwards compatability.
 */
 static void dump_40( char *p, char* label ) {
-       dump_n( p, label, 40 ); 
+       dump_n( p, label, 40 );
 }
 
 /*
@@ -70,7 +70,7 @@ static void dump_40( char *p, char* label ) {
 
        The addition of the connection shut error code to the switch requires
        that the NNG version at commit e618abf8f3db2a94269a (or after) be
-       used for compiling RMR. 
+       used for compiling RMR.
 */
 static inline int xlate_si_state( int state, int def_state ) {
 
@@ -110,6 +110,26 @@ static inline int xlate_si_state( int state, int def_state ) {
        return state;
 }
 
+/*
+       Given a message size and a buffer (assumed to be TP_SZFIELD_LEN or larger)
+       this will put in the size such that it is compatable with old versions
+       of RMR (that expect the message size to not be in network byte order)
+       and with new versions that do. See extract function in mt_call_si_static.c
+       for details on what ends up in the buffer.
+*/
+static inline void insert_mlen( uint32_t len, char* buf ) {
+       uint32_t* blen;                                                 // pointer into buffer where we'll add the len
+
+       blen = (uint32_t *) buf;                                // old systems expect an unconverted integer
+       *blen = len;
+
+       blen++;
+       *blen = htonl( len );                                   // new systems want a converted integer
+
+       memset( &buf[TP_SZFIELD_LEN], 0, 4 );   // clear to prevent future conversion issues
+       buf[TP_SZFIELD_LEN-1] = TP_SZ_MARKER;   // marker to flag this is generated by a new message
+}
+
 /*
        Alloc a new nano zero copy buffer and put into msg. If msg is nil, then we will alloc
        a new message struct as well. Size is the size of the zc buffer to allocate (not
@@ -147,6 +167,7 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s
                        msg->alloc_len = 0;                             // force tp_buffer realloc below
                        if( msg->tp_buf ) {
                                free( msg->tp_buf );
+                               msg->tp_buf = NULL;
                        }
                } else {
                        mlen = msg->alloc_len;                                                  // msg given, allocate the same size as before
@@ -160,12 +181,13 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s
                abort( );                                                                                       // toss out a core file for this
        }
 
-/*
-       memset( msg->tp_buf, 0, mlen );    // NOT for production (debug only)   valgrind will complain about uninitalised use if we don't set
-       memcpy( msg->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!**", TP_HDR_LEN );                // NOT for production -- debugging eyecatcher
-*/
-       alen = (int *) msg->tp_buf;
-       *alen = mlen;                                           // FIX ME: need a stuct to go in these first bytes, not just dummy len
+       if( DEBUG ) {
+               // for speed we don't do this in production; for testing valgrind will complain about uninitialised use if not set
+               memset( msg->tp_buf, 0, mlen );
+               memcpy( msg->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!==", 34 );                // do NOT use a $ in this string!
+       }
+
+       insert_mlen( (uint32_t) mlen, msg->tp_buf );                    // this will likely be overwriten on send to shirnk
 
        msg->header = ((char *) msg->tp_buf) + TP_HDR_LEN;
        memset( msg->header, 0, sizeof( uta_mhdr_t ) );                         // ensure no junk in the header area
@@ -185,10 +207,10 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s
        msg->payload = PAYLOAD_ADDR( hdr );                                             // point to payload (past all header junk)
        msg->xaction = ((uta_mhdr_t *)msg->header)->xid;                // point at transaction id in header area
        msg->state = state;                                                                             // fill in caller's state (likely the state of the last operation)
-       msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
+       msg->flags = MFL_ZEROCOPY;                                                              // this is a zerocopy sendable message
        msg->ring = ctx->zcb_mring;                                                             // original msg_free() api doesn't get context so must dup on eaach :(
-       strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );
-       strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
+       zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );
+       zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
 
        if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "alloc_zcmsg mlen=%ld size=%d mpl=%d flags=%02x\n", (long) mlen, size, ctx->max_plen, msg->flags );
 
@@ -259,27 +281,14 @@ static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen )  {
        msg->header = ((char *) msg->tp_buf) + TP_HDR_LEN;
 
        v1hdr = (uta_v1mhdr_t *) msg->header;                                   // v1 will always allow us to suss out the version
-
-       if( v1hdr->rmr_ver == 1 ) {                     // bug in verion 1 didn't encode the version in network byte order
-               ver = 1;
-               v1hdr->rmr_ver = htonl( 1 );            // save it correctly in case we clone the message
-       } else {
-               ver = ntohl( v1hdr->rmr_ver );
-       }
+       ver = ntohl( v1hdr->rmr_ver );
 
        switch( ver ) {
-               case 1:
-                       msg->len = ntohl( v1hdr->plen );                                                // length sender says is in the payload (received length could be larger)
-                       msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header, trace and data bits)
-                       msg->payload = msg->header + sizeof( uta_v1mhdr_t );    // point past header to payload (single buffer allocation above)
+               // version 1 is deprecated  case 1:
+               // version 2 is deprecated  case 2:
 
-                       msg->xaction = &v1hdr->xid[0];                                                  // point at transaction id in header area
-                       msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
-                       msg->mtype = ntohl( v1hdr->mtype );                                             // capture and convert from network order to local order
-                       msg->sub_id = UNSET_SUBID;                                                              // type 1 messages didn't have this
-                       msg->state = RMR_OK;
-                       hlen = sizeof( uta_v1mhdr_t );
-                       break;
+               case 3:
+                       // fall-through
 
                default:                                                                                                        // current version always lands here
                        hdr = (uta_mhdr_t *) msg->header;
@@ -329,11 +338,11 @@ static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
        nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;
        v1hdr = (uta_v1mhdr_t *) old_msg->header;                               // v1 will work to dig header out of any version
        switch( ntohl( v1hdr->rmr_ver ) ) {
-               case 1:
-                       memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
-                       nm->payload = (void *) v1hdr + sizeof( *v1hdr );
-                       break;
+               // version 1 deprecated case 1:
+               // version 2 deprecated 
 
+               case 3:
+                       // fall-through
                default:                                                                                        // current message always caught  here
                        hdr = nm->header;
                        memcpy( hdr, old_msg->header, RMR_HDR_LEN( old_msg->header ) + RMR_TR_LEN( old_msg->header ) + RMR_D1_LEN( old_msg->header ) + RMR_D2_LEN( old_msg->header ));  // copy complete header, trace and other data
@@ -389,19 +398,21 @@ static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  ) {
                rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
                exit( 1 );
        }
-       memset( nm->tp_buf, 0, tpb_len );
-       memcpy( nm->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!**", 34 );         // DEBUGGING
-       alen = (int *) nm->tp_buf;
-       *alen = tpb_len;                                                // FIX ME: need a stuct to go in these first bytes, not just dummy len
+       if( DEBUG ) {
+               memset( nm->tp_buf, 0, tpb_len );
+               memcpy( nm->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!==", 34 );         // DEBUGGING do NOT use $ in this string!!
+       }
+
+       insert_mlen( (uint32_t) tpb_len, nm->tp_buf );                  // this len will likely be reset on send to shrink
 
        nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;
 
        v1hdr = (uta_v1mhdr_t *) old_msg->header;                               // v1 will work to dig header out of any version
        switch( ntohl( v1hdr->rmr_ver ) ) {
-               case 1:
-                       memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
-                       nm->payload = (void *) v1hdr + sizeof( *v1hdr );
-                       break;
+               // version 1 not supported
+               // version 2 not supported
+               case 3:
+                       // fall-through
 
                default:                                                                                        // current message version always caught  here
                        hdr = nm->header;
@@ -434,7 +445,7 @@ static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  ) {
 }
 
 /*
-       Realloc the message such that the payload is at least payload_len bytes.  
+       Realloc the message such that the payload is at least payload_len bytes.
        The clone and copy options affect what portion of the original payload is copied to
        the reallocated message, and whether or not the original payload is lost after the
        reallocation process has finished.
@@ -457,7 +468,7 @@ static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  ) {
                clone == false
                The old payload will be lost after reallocation. The message buffer pointer which
                is returned will likely reference the same structure (don't depend on that).
-               
+
 
        CAUTION:
        If the message is not a message which was received, the mtype, sub-id, length values in the
@@ -518,7 +529,7 @@ static inline rmr_mbuf_t* realloc_payload( rmr_mbuf_t* old_msg, int payload_len,
        omhdr = old_msg->header;
        mlen = hdr_len + (payload_len > old_psize ? payload_len : old_psize);           // must have larger in case copy is true
 
-       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "reallocate for payload increase. new message size: %d\n", (int) mlen );    
+       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "reallocate for payload increase. new message size: %d\n", (int) mlen );
        if( (nm->tp_buf = (char *) malloc( sizeof( char ) * mlen )) == NULL ) {
                rmr_vlog( RMR_VL_CRIT, "rmr_realloc_payload: cannot get memory for zero copy buffer. bytes requested: %d\n", (int) mlen );
                free( nm );
@@ -528,12 +539,12 @@ static inline rmr_mbuf_t* realloc_payload( rmr_mbuf_t* old_msg, int payload_len,
        nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;                        // point at the new header and copy from old
        SET_HDR_LEN( nm->header );
 
-       if( copy ) {                                                                                                                            // if we need to copy the old payload too
-               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: copy payload into new message: %d bytes\n", old_psize );
+       if( copy != 0 ) {                                                                                       // if we need to copy the old payload too
                memcpy( nm->header, omhdr, sizeof( char ) * (old_psize + RMR_HDR_LEN( omhdr )) );
+               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: copy payload into new message: %d bytes\n", old_psize );
        } else {                                                                                                                                        // just need to copy header
-               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: copy only header into new message: %d bytes\n", RMR_HDR_LEN( nm->header ) );
                memcpy( nm->header, omhdr, sizeof( char ) * RMR_HDR_LEN( omhdr ) );
+               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: copy only header into new message: %d bytes\n", RMR_HDR_LEN( nm->header ) );
        }
 
        ref_tpbuf( nm, mlen );                  // set payload and other pointers in the message to the new tp buffer
@@ -555,19 +566,6 @@ static inline rmr_mbuf_t* realloc_payload( rmr_mbuf_t* old_msg, int payload_len,
        return nm;
 }
 
-/*
-       For SI95 based transport all receives are driven through the threaded
-       ring and thus this function should NOT be called. If it is we will panic
-       and abort straight away.
-*/
-static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
-
-fprintf( stderr, "\n\n>>> rcv_msg: bad things just happened!\n\n>>>>>> abort!  rcv_msg called and it shouldn't be\n" );
-exit( 1 );
-
-       return NULL;
-}
-
 /*
        This does the hard work of actually sending the message to the given socket. On success,
        a new message struct is returned. On error, the original msg is returned with the state
@@ -578,7 +576,7 @@ exit( 1 );
        Called by rmr_send_msg() and rmr_rts_msg(), etc. and thus we assume that all pointer
        validation has been done prior.
 
-       When msg->state is not ok, this function must set tp_state in the message as some API 
+       When msg->state is not ok, this function must set tp_state in the message as some API
        fucntions return the message directly and do not propigate errno into the message.
 */
 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int retries ) {
@@ -597,8 +595,8 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int r
        tr_len = RMR_TR_LEN( hdr );                                                                             // snarf trace len before sending as hdr is invalid after send
 
        if( msg->flags & MFL_ADDSRC ) {                                                                 // buffer was allocated as a receive buffer; must add our source
-               strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );                                        // must overlay the source to be ours
-               strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
+               zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );                                    // must overlay the source to be ours
+               zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
        }
 
        if( retries == 0 ) {
@@ -613,7 +611,7 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int r
                if( tot_len > msg->alloc_len ) {
                        tot_len = msg->alloc_len;                                                                       // likely bad length from user :(
                }
-               *((int*) msg->tp_buf) = tot_len;
+               insert_mlen( tot_len, msg->tp_buf );    // shrink to fit
 
                if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "send_msg: ending %d (%x) bytes  usr_len=%d alloc=%d retries=%d\n", tot_len, tot_len, msg->len, msg->alloc_len, retries );
                if( DEBUG > 2 ) dump_40( msg->tp_buf, "sending" );
@@ -647,11 +645,12 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int r
                        rmr_free_msg( msg );                                            // not wanting a meessage back, trash this one
                        return NULL;
                }
-       } else {                                                                                        // send failed -- return original message
-               if( msg->state == 98 ) {                // FIX ME: this is just broken, but needs SI changes to work correctly for us
+       } else {                                                                                        // send failed or would block -- return original message
+               if( state == SI_ERR_BLOCKED || errno == EAGAIN ) {
                        errno = EAGAIN;
-                       msg->state = RMR_ERR_RETRY;                                     // errno will have nano reason
+                       msg->state = RMR_ERR_RETRY;
                } else {
+                       rmr_vlog( RMR_VL_WARN, "send failed: mt=%d errno=%d %s\n", msg->mtype, errno, strerror( errno ) );
                        msg->state = RMR_ERR_SENDFAILED;
                }
 
@@ -677,8 +676,8 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int r
        message type is used.  If the initial lookup, with a subid, fails, then a
        second lookup using just the mtype is tried.
 
-       When msg->state is not OK, this function must set tp_state in the message as 
-       some API fucntions return the message directly and do not propigate errno into 
+       When msg->state is not OK, this function must set tp_state in the message as
+       some API fucntions return the message directly and do not propigate errno into
        the message.
 
        CAUTION: this is a non-blocking send.  If the message cannot be sent, then
@@ -698,6 +697,7 @@ static  rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
        int                     sock_ok;                        // got a valid socket from round robin select
        char*           d1;
        int                     ok_sends = 0;           // track number of ok sends
+       route_table_t*  rt;                             // active route table
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
                errno = EINVAL;                                                                                         // if msg is null, this is their clue
@@ -722,7 +722,9 @@ static  rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
                max_to = ctx->send_retries;             // convert to retries
        }
 
-       if( (rte = uta_get_rte( ctx->rtable, msg->sub_id, msg->mtype, TRUE )) == NULL ) {               // find the entry which matches subid/type allow fallback to type only key
+       rt = get_rt( ctx );                                                                             // get active route table and up ref count
+       if( (rte = uta_get_rte( rt, msg->sub_id, msg->mtype, TRUE )) == NULL ) {                // find the entry which matches subid/type allow fallback to type only key
+               release_rt( ctx, rt );
                rmr_vlog( RMR_VL_WARN, "no route table entry for mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
                msg->state = RMR_ERR_NOENDPT;
                errno = ENXIO;                                                                          // must ensure it's not eagain
@@ -736,7 +738,7 @@ static  rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
                if( rte->nrrgroups > 0 ) {                                                      // this is a round robin entry if groups are listed
                        sock_ok = uta_epsock_rr( ctx, rte, group, &send_again, &nn_sock, &ep );         // select endpt from rr group and set again if more groups
                } else {
-                       sock_ok = epsock_meid( ctx, ctx->rtable, msg, &nn_sock, &ep );
+                       sock_ok = epsock_meid( ctx, rt, msg, &nn_sock, &ep );
                        send_again = 0;
                }
 
@@ -749,6 +751,7 @@ static  rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
                        if( send_again ) {
                                clone_m = clone_msg( msg );                                                             // must make a copy as once we send this message is not available
                                if( clone_m == NULL ) {
+                                       release_rt( ctx, rt );
                                        msg->state = RMR_ERR_SENDFAILED;
                                        errno = ENOMEM;
                                        msg->tp_state = errno;
@@ -759,38 +762,26 @@ static  rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
                                if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
                                msg->flags |= MFL_NOALLOC;                                                              // keep send from allocating a new message; we have a clone to use
                                msg = send_msg( ctx, msg, nn_sock, max_to );                    // do the hard work, msg should be nil on success
-       
+
                                if( msg != NULL ) {                                                                             // returned message indicates send error of some sort
                                        rmr_free_msg( msg );                                                            // must ditchone; pick msg so we don't have to unfiddle flags
                                        msg = clone_m;
                                } else {
                                        ok_sends++;
                                        msg = clone_m;                                                                          // clone will be the next to send
+                                       msg->state = RMR_OK;
                                }
                        } else {
                                msg = send_msg( ctx, msg, nn_sock, max_to );                    // send the last, and allocate a new buffer; drops the clone if it was
                                if( DEBUG ) {
                                        if( msg == NULL ) {
-                                               rmr_vlog( RMR_VL_DEBUG, "mtosend_msg:  send returned nil message!\n" );         
+                                               rmr_vlog( RMR_VL_DEBUG, "mtosend_msg:  send returned nil message!\n" );
                                        }
                                }
                        }
 
-                       if( ep != NULL && msg != NULL ) {
-                               switch( msg->state ) {
-                                       case RMR_OK:
-                                               ep->scounts[EPSC_GOOD]++;
-                                               break;
-                               
-                                       case RMR_ERR_RETRY:
-                                               ep->scounts[EPSC_TRANS]++;
-                                               break;
-
-                                       default:
-                                               ep->scounts[EPSC_FAIL]++;
-                                               uta_ep_failed( ep );                                                            // sending to ep failed; set up to reconnect
-                                               break;
-                               }
+                       if( msg != NULL ) {
+                               incr_ep_counts( msg->state, ep );
                        }
                } else {
                        if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "invalid socket for rte, setting no endpoint err: mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
@@ -799,14 +790,16 @@ static  rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
                }
        }
 
+       release_rt( ctx, rt );                          // we can safely dec the ref counter now
+
        if( msg ) {                                                     // call functions don't get a buffer back, so a nil check is required
                msg->flags &= ~MFL_NOALLOC;             // must return with this flag off
                if( ok_sends ) {                                // multiple rr-groups and one was successful; report ok
                        msg->state = RMR_OK;
                }
-       
+
                if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "final send stats: ok=%d group=%d state=%d\n", ok_sends, group, msg->state );
-       
+
                msg->tp_state = errno;
        }