Correct bug identified in static analysis
[ric-plt/lib/rmr.git] / src / rmr / si / src / sr_si_static.c
index 703e80c..4f86728 100644 (file)
@@ -1,8 +1,8 @@
 // vim: ts=4 sw=4 noet :
 /*
 ==================================================================================
-       Copyright (c) 2019-2020 Nokia
-       Copyright (c) 2018-2020 AT&T Intellectual Property.
+       Copyright (c) 2019-2021 Nokia
+       Copyright (c) 2018-2021 AT&T Intellectual Property.
 
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
@@ -21,7 +21,7 @@
 /*
        Mnemonic:       sr_si_static.c
        Abstract:       These are static send/receive primatives which (sadly)
-                               differ based on the underlying protocol (nng vs nanomsg).
+                               differ based on the underlying protocol (nng vs SI95).
                                Split from rmr_nng.c  for easier wormhole support.
 
        Author:         E. Scott Daniels
 #ifndef _sr_si_static_c
 #define _sr_si_static_c
 
-#include <nng/nng.h>
-#include <nng/protocol/pubsub0/pub.h>
-#include <nng/protocol/pubsub0/sub.h>
-#include <nng/protocol/pipeline0/push.h>
-#include <nng/protocol/pipeline0/pull.h>
+static void dump_n( char *p, char* label, int n ) {
+       int i;
+       int j;
+       int t = 0;
+       int     rows;
 
 
-static void dump_40( char *p, char* label ) {
-       int i;
+       if( label ) {
+               fprintf( stderr, "[DUMP] %s p=%p %d bytes\n", label, p, n );
+       }
+
+       rows = (n/16) + ((n % 16) ? 1 : 0);
 
-       if( label )
-               fprintf( stderr, ">>>>> %s p=%p\n", label, p );
+       for( j = 0; j < rows; j++ ) {
+               fprintf( stderr, "[DUMP] %04x: ", j * 16 );
 
-       for( i = 0; i < 40; i++ ) {
-               fprintf( stderr, "%02x ", (unsigned char) *(p+i) );
+               for( i = 0; t < n && i < 16; i++, t++ ) {
+                       fprintf( stderr, "%02x ", (unsigned char) *p );
+                       p++;
+               }
+               fprintf( stderr, "\n" );
        }
-       fprintf( stderr, "\n" );
+}
+
+/*
+       backwards compatability.
+*/
+static void dump_40( char *p, char* label ) {
+       dump_n( p, label, 40 );
 }
 
 /*
@@ -58,7 +70,7 @@ static void dump_40( char *p, char* label ) {
 
        The addition of the connection shut error code to the switch requires
        that the NNG version at commit e618abf8f3db2a94269a (or after) be
-       used for compiling RMR. 
+       used for compiling RMR.
 */
 static inline int xlate_si_state( int state, int def_state ) {
 
@@ -98,6 +110,26 @@ static inline int xlate_si_state( int state, int def_state ) {
        return state;
 }
 
+/*
+       Given a message size and a buffer (assumed to be TP_SZFIELD_LEN or larger)
+       this will put in the size such that it is compatable with old versions
+       of RMR (that expect the message size to not be in network byte order)
+       and with new versions that do. See extract function in mt_call_si_static.c
+       for details on what ends up in the buffer.
+*/
+static inline void insert_mlen( uint32_t len, char* buf ) {
+       uint32_t* blen;                                                 // pointer into buffer where we'll add the len
+
+       blen = (uint32_t *) buf;                                // old systems expect an unconverted integer
+       *blen = len;
+
+       blen++;
+       *blen = htonl( len );                                   // new systems want a converted integer
+
+       memset( &buf[TP_SZFIELD_LEN], 0, 4 );   // clear to prevent future conversion issues
+       buf[TP_SZFIELD_LEN-1] = TP_SZ_MARKER;   // marker to flag this is generated by a new message
+}
+
 /*
        Alloc a new nano zero copy buffer and put into msg. If msg is nil, then we will alloc
        a new message struct as well. Size is the size of the zc buffer to allocate (not
@@ -126,7 +158,7 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s
        if( msg == NULL && (msg = (rmr_mbuf_t *) uta_ring_extract( ctx->zcb_mring )) == NULL ) {
                msg = (rmr_mbuf_t *) malloc( sizeof *msg );
                if( msg == NULL ) {
-                       fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for message\n" );
+                       rmr_vlog( RMR_VL_CRIT, "rmr_alloc_zc: cannot get memory for message\n" );
                        return NULL;                                                            // we used to exit -- that seems wrong
                }
                memset( msg, 0, sizeof( *msg ) );       // tp_buffer will be allocated below
@@ -135,24 +167,27 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s
                        msg->alloc_len = 0;                             // force tp_buffer realloc below
                        if( msg->tp_buf ) {
                                free( msg->tp_buf );
+                               msg->tp_buf = NULL;
                        }
                } else {
                        mlen = msg->alloc_len;                                                  // msg given, allocate the same size as before
                }
        }
 
+       msg->rts_fd = -1;                                       // must force to be invalid; not a received message that can be returned
 
        if( !msg->alloc_len && (msg->tp_buf = (void *) malloc( mlen )) == NULL ) {
-               fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for zero copy buffer: %d bytes\n", (int) mlen );
+               rmr_vlog( RMR_VL_CRIT, "rmr_alloc_zc: cannot get memory for zero copy buffer: %d bytes\n", (int) mlen );
                abort( );                                                                                       // toss out a core file for this
        }
 
-/*
-       memset( msg->tp_buf, 0, mlen );    // NOT for production (debug only)   valgrind will complain about uninitalised use if we don't set
-       memcpy( msg->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!**", 34 );                // NOT for production -- debugging eyecatcher
-*/
-       alen = (int *) msg->tp_buf;
-       *alen = mlen;                                           // FIX ME: need a stuct to go in these first bytes, not just dummy len
+       if( DEBUG ) {
+               // for speed we don't do this in production; for testing valgrind will complain about uninitialised use if not set
+               memset( msg->tp_buf, 0, mlen );
+               memcpy( msg->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!==", 34 );                // do NOT use a $ in this string!
+       }
+
+       insert_mlen( (uint32_t) mlen, msg->tp_buf );                    // this will likely be overwriten on send to shirnk
 
        msg->header = ((char *) msg->tp_buf) + TP_HDR_LEN;
        memset( msg->header, 0, sizeof( uta_mhdr_t ) );                         // ensure no junk in the header area
@@ -165,18 +200,19 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s
                //SET_HDR_D2_LEN( hdr, ctx->d2_len );                           // future
        }
        msg->len = 0;                                                                                   // length of data in the payload
+       msg->cookie = 0x4942;
        msg->alloc_len = mlen;                                                                  // length of allocated transport buffer (caller size + rmr header)
        msg->sub_id = UNSET_SUBID;
        msg->mtype = UNSET_MSGTYPE;
        msg->payload = PAYLOAD_ADDR( hdr );                                             // point to payload (past all header junk)
        msg->xaction = ((uta_mhdr_t *)msg->header)->xid;                // point at transaction id in header area
        msg->state = state;                                                                             // fill in caller's state (likely the state of the last operation)
-       msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
+       msg->flags = MFL_ZEROCOPY;                                                              // this is a zerocopy sendable message
        msg->ring = ctx->zcb_mring;                                                             // original msg_free() api doesn't get context so must dup on eaach :(
-       strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );
-       strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
+       zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );
+       zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
 
-       if( DEBUG > 1 ) fprintf( stderr, "[DBUG] alloc_zcmsg mlen=%ld size=%d mpl=%d flags=%02x\n", (long) mlen, size, ctx->max_plen, msg->flags );
+       if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "alloc_zcmsg mlen=%ld size=%d mpl=%d flags=%02x\n", (long) mlen, size, ctx->max_plen, msg->flags );
 
        return msg;
 }
@@ -196,13 +232,14 @@ static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state ) {
                }
        } else {
                if( (msg = (rmr_mbuf_t *) malloc( sizeof *msg )) == NULL ) {
-                       fprintf( stderr, "[CRI] rmr_alloc_mbuf: cannot get memory for message\n" );
+                       rmr_vlog( RMR_VL_CRIT, "rmr_alloc_mbuf: cannot get memory for message\n" );
                        return NULL;                                                    // this used to exit, but that seems wrong
                }
        }
 
        memset( msg, 0, sizeof( *msg ) );
 
+       msg->cookie = 0x4942;
        msg->sub_id = UNSET_SUBID;
        msg->mtype = UNSET_MSGTYPE;
        msg->tp_buf = NULL;
@@ -244,27 +281,14 @@ static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen )  {
        msg->header = ((char *) msg->tp_buf) + TP_HDR_LEN;
 
        v1hdr = (uta_v1mhdr_t *) msg->header;                                   // v1 will always allow us to suss out the version
-
-       if( v1hdr->rmr_ver == 1 ) {                     // bug in verion 1 didn't encode the version in network byte order
-               ver = 1;
-               v1hdr->rmr_ver = htonl( 1 );            // save it correctly in case we clone the message
-       } else {
-               ver = ntohl( v1hdr->rmr_ver );
-       }
+       ver = ntohl( v1hdr->rmr_ver );
 
        switch( ver ) {
-               case 1:
-                       msg->len = ntohl( v1hdr->plen );                                                // length sender says is in the payload (received length could be larger)
-                       msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header, trace and data bits)
-                       msg->payload = msg->header + sizeof( uta_v1mhdr_t );    // point past header to payload (single buffer allocation above)
+               // version 1 is deprecated  case 1:
+               // version 2 is deprecated  case 2:
 
-                       msg->xaction = &v1hdr->xid[0];                                                  // point at transaction id in header area
-                       msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
-                       msg->mtype = ntohl( v1hdr->mtype );                                             // capture and convert from network order to local order
-                       msg->sub_id = UNSET_SUBID;                                                              // type 1 messages didn't have this
-                       msg->state = RMR_OK;
-                       hlen = sizeof( uta_v1mhdr_t );
-                       break;
+               case 3:
+                       // fall-through
 
                default:                                                                                                        // current version always lands here
                        hdr = (uta_mhdr_t *) msg->header;
@@ -300,25 +324,25 @@ static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
 
        nm = (rmr_mbuf_t *) malloc( sizeof *nm );
        if( nm == NULL ) {
-               fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" );
+               rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for message buffer\n" );
                exit( 1 );
        }
        memset( nm, 0, sizeof( *nm ) );
 
        mlen = old_msg->alloc_len;                                                                              // length allocated before
        if( (nm->tp_buf = (void *) malloc( sizeof( char ) * (mlen + TP_HDR_LEN) )) == NULL ) {
-               fprintf( stderr, "[CRI] rmr_si_clone: cannot get memory for zero copy buffer: %d\n", (int) mlen );
+               rmr_vlog( RMR_VL_CRIT, "rmr_si_clone: cannot get memory for zero copy buffer: %d\n", (int) mlen );
                abort();
        }
 
        nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;
        v1hdr = (uta_v1mhdr_t *) old_msg->header;                               // v1 will work to dig header out of any version
        switch( ntohl( v1hdr->rmr_ver ) ) {
-               case 1:
-                       memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
-                       nm->payload = (void *) v1hdr + sizeof( *v1hdr );
-                       break;
+               // version 1 deprecated case 1:
+               // version 2 deprecated 
 
+               case 3:
+                       // fall-through
                default:                                                                                        // current message always caught  here
                        hdr = nm->header;
                        memcpy( hdr, old_msg->header, RMR_HDR_LEN( old_msg->header ) + RMR_TR_LEN( old_msg->header ) + RMR_D1_LEN( old_msg->header ) + RMR_D2_LEN( old_msg->header ));  // copy complete header, trace and other data
@@ -332,7 +356,7 @@ static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
        nm->len = old_msg->len;                                                                 // length of data in the payload
        nm->alloc_len = mlen;                                                                   // length of allocated payload
 
-       nm->xaction = hdr->xid;                                                                 // reference xaction
+       nm->xaction = &hdr->xid[0];                                                             // reference xaction
        nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
        nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
        memcpy( nm->payload, old_msg->payload, old_msg->len );
@@ -358,7 +382,7 @@ static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  ) {
 
        nm = (rmr_mbuf_t *) malloc( sizeof *nm );
        if( nm == NULL ) {
-               fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" );
+               rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for message buffer\n" );
                exit( 1 );
        }
        memset( nm, 0, sizeof( *nm ) );
@@ -367,26 +391,28 @@ static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  ) {
        tr_old_len = RMR_TR_LEN( hdr );                         // bytes in old header for trace
 
        mlen = old_msg->alloc_len + (tr_len - tr_old_len);                                                      // new length with trace adjustment
-       if( DEBUG ) fprintf( stderr, "[DBUG] tr_realloc old size=%d new size=%d new tr_len=%d\n", (int) old_msg->alloc_len, (int) mlen, (int) tr_len );
+       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "tr_realloc old size=%d new size=%d new tr_len=%d\n", (int) old_msg->alloc_len, (int) mlen, (int) tr_len );
 
        tpb_len = mlen + TP_HDR_LEN;
        if( (nm->tp_buf = (void *) malloc( tpb_len)) == NULL ) {
-               fprintf( stderr, "[CRI] rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
+               rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
                exit( 1 );
        }
-       memset( nm->tp_buf, 0, tpb_len );
-       memcpy( nm->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!**", 34 );         // DEBUGGING
-       alen = (int *) nm->tp_buf;
-       *alen = tpb_len;                                                // FIX ME: need a stuct to go in these first bytes, not just dummy len
+       if( DEBUG ) {
+               memset( nm->tp_buf, 0, tpb_len );
+               memcpy( nm->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!==", 34 );         // DEBUGGING do NOT use $ in this string!!
+       }
+
+       insert_mlen( (uint32_t) tpb_len, nm->tp_buf );                  // this len will likely be reset on send to shrink
 
        nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;
 
        v1hdr = (uta_v1mhdr_t *) old_msg->header;                               // v1 will work to dig header out of any version
        switch( ntohl( v1hdr->rmr_ver ) ) {
-               case 1:
-                       memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
-                       nm->payload = (void *) v1hdr + sizeof( *v1hdr );
-                       break;
+               // version 1 not supported
+               // version 2 not supported
+               case 3:
+                       // fall-through
 
                default:                                                                                        // current message version always caught  here
                        hdr = nm->header;
@@ -410,7 +436,7 @@ static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  ) {
        nm->len = old_msg->len;                                                                 // length of data in the payload
        nm->alloc_len = mlen;                                                                   // length of allocated payload
 
-       nm->xaction = hdr->xid;                                                                 // reference xaction
+       nm->xaction = &hdr->xid[0];                                                             // reference xaction
        nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
        nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
        memcpy( nm->payload, old_msg->payload, old_msg->len );
@@ -419,62 +445,125 @@ static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  ) {
 }
 
 /*
-       For SI95 based transport all receives are driven through the threaded
-       ring and thus this function should NOT be called. If it is we will panic
-       and abort straight away.
+       Realloc the message such that the payload is at least payload_len bytes.
+       The clone and copy options affect what portion of the original payload is copied to
+       the reallocated message, and whether or not the original payload is lost after the
+       reallocation process has finished.
+
+               copy == true
+               The entire payload from the original message will be coppied to the reallocated
+               payload.
+
+               copy == false
+               Only the header (preserving return to sender information, message type, etc)
+               is preserved after reallocation; the payload used lengrh is set to 0 and the
+               payload is NOT initialised/cleared.
+
+               clone == true
+               The orignal message is preserved and a completely new message buffer and payload
+               are allocated (even if the size given is the same). A pointer to the new message
+               buffer is returned and it is the user application's responsibility to manage the
+               old buffer (e.g. free when not needed).
+
+               clone == false
+               The old payload will be lost after reallocation. The message buffer pointer which
+               is returned will likely reference the same structure (don't depend on that).
+
+
+       CAUTION:
+       If the message is not a message which was received, the mtype, sub-id, length values in the
+       RMR header in the allocated transport buffer will NOT be accurate and will cause the resulting
+       mbuffer information for mtype and subid to be reset even when copy is true. To avoid silently
+       resetting information in the mbuffer, this funciton will reset the mbuf values from the current
+       settings and NOT from the copied RMR header in transport buffer.
 */
-static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
+static inline rmr_mbuf_t* realloc_payload( rmr_mbuf_t* old_msg, int payload_len, int copy, int clone ) {
+       rmr_mbuf_t* nm = NULL;  // new message buffer when cloning
+       size_t  mlen;
+       uta_mhdr_t* omhdr;              // old message header
+       int             tr_old_len;             // tr size in new buffer
+       int             old_psize = 0;  // size of payload in the message passed in (alloc size - tp header and rmr header lengths)
+       int             hdr_len = 0;    // length of RMR and transport headers in old msg
+       void*   old_tp_buf;             // pointer to the old tp buffer
+       int             free_tp = 1;    // free the transport buffer (old) when done (when not cloning)
+       int             old_mt;                 // msg type and sub-id from the message passed in
+       int             old_sid;
+       int             old_len;
+       int             old_rfd;                // rts file descriptor from old message
+
+       if( old_msg == NULL || payload_len <= 0 ) {
+               errno = EINVAL;
+               return NULL;
+       }
 
-fprintf( stderr, "\n\n>>> rcv_msg: bad things just happened!\n\n>>>>>> abort!  rcv_msg called and it shouldn't be\n" );
-exit( 1 );
+       old_mt = old_msg->mtype;                        // preserve mbuf info
+       old_sid = old_msg->sub_id;
+       old_len = old_msg->len;
+       old_rfd = old_msg->rts_fd;
 
-       return NULL;
-}
+       old_psize = old_msg->alloc_len - (RMR_HDR_LEN( old_msg->header ) + TP_HDR_LEN);         // user payload size in orig message
 
-/*
-       Receives a 'raw' message from a non-RMr sender (no header expected). The returned
-       message buffer cannot be used to send, and the length information may or may
-       not be correct (it is set to the length received which might be more than the
-       bytes actually in the payload).
+       if( !clone  && payload_len <= old_psize ) {                                                                             // not cloning and old is large enough; nothing to do
+               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: old msg payload larger than requested: cur=%d need=%d\n", old_psize, payload_len );
+               return old_msg;
+       }
 
-       Mostly this supports the route table collector, but could be extended with an
-       API external function.
-*/
-static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
-       return NULL;
-/*
-FIXME: not implemented yet
-       int state;
-       rmr_mbuf_t*     msg = NULL;             // msg received
-       size_t  rsize;                          // nng needs to write back the size received... grrr
+       hdr_len = RMR_HDR_LEN( old_msg->header ) + TP_HDR_LEN;                          // with SI we manage the transport header; must include in len
+       old_tp_buf = old_msg->tp_buf;
 
-       if( old_msg ) {
-               msg = old_msg;
+       if( clone ) {
+               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: cloning message\n" );
+               free_tp = 0;
+
+               nm = (rmr_mbuf_t *) malloc( sizeof( *nm ) );
+               if( nm == NULL ) {
+                       rmr_vlog( RMR_VL_CRIT, "rmr_realloc_payload: cannot get memory for message buffer. bytes requested: %d\n", (int) sizeof(*nm) );
+                       return NULL;
+               }
+               memset( nm, 0, sizeof( *nm ) );
+               nm->rts_fd = old_rfd;                           // this is managed only in the mbuf; dup now
        } else {
-               msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN );                  // will abort on failure, no need to check
+               nm = old_msg;
        }
 
-       //msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS );                        // blocks hard until received
-       if( (msg->state = xlate_si_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) {
-               return msg;
+       omhdr = old_msg->header;
+       mlen = hdr_len + (payload_len > old_psize ? payload_len : old_psize);           // must have larger in case copy is true
+
+       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "reallocate for payload increase. new message size: %d\n", (int) mlen );
+       if( (nm->tp_buf = (char *) malloc( sizeof( char ) * mlen )) == NULL ) {
+               rmr_vlog( RMR_VL_CRIT, "rmr_realloc_payload: cannot get memory for zero copy buffer. bytes requested: %d\n", (int) mlen );
+               free( nm );
+               return NULL;
        }
-       rsize = nng_msg_len( msg->tp_buf );
 
-       // do NOT use ref_tpbuf() here! Must fill these in manually.
-       msg->header = nng_msg_body( msg->tp_buf );
-       msg->len = rsize;                                                       // len is the number of bytes received
-       msg->alloc_len = rsize;
-       msg->mtype = UNSET_MSGTYPE;                                     // raw message has no type
-       msg->sub_id = UNSET_SUBID;                                      // nor a subscription id
-       msg->state = RMR_OK;
-       msg->flags = MFL_RAW;
-       msg->payload = msg->header;                                     // payload is the whole thing; no header
-       msg->xaction = NULL;
+       nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;                        // point at the new header and copy from old
+       SET_HDR_LEN( nm->header );
 
-       if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_payload: got something: type=%d state=%d len=%d\n", msg->mtype, msg->state, msg->len );
+       if( copy != 0 ) {                                                                                       // if we need to copy the old payload too
+               memcpy( nm->header, omhdr, sizeof( char ) * (old_psize + RMR_HDR_LEN( omhdr )) );
+               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: copy payload into new message: %d bytes\n", old_psize );
+       } else {                                                                                                                                        // just need to copy header
+               memcpy( nm->header, omhdr, sizeof( char ) * RMR_HDR_LEN( omhdr ) );
+               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: copy only header into new message: %d bytes\n", RMR_HDR_LEN( nm->header ) );
+       }
 
-       return msg;
-*/
+       ref_tpbuf( nm, mlen );                  // set payload and other pointers in the message to the new tp buffer
+
+       if( !copy ) {
+               nm->mtype = -1;                                         // didn't copy payload, so mtype, sub-id, and rts fd are invalid
+               nm->sub_id = -1;
+               nm->len = 0;                                            // and len is 0
+       } else {
+               nm->len = old_len;                                      // we must force these to avoid losing info if msg wasn't a received message
+               nm->mtype = old_mt;
+               nm->sub_id = old_sid;
+       }
+
+       if( free_tp ) {
+               free( old_tp_buf );                             // we did not clone, so free b/c no references
+       }
+
+       return nm;
 }
 
 /*
@@ -487,7 +576,7 @@ FIXME: not implemented yet
        Called by rmr_send_msg() and rmr_rts_msg(), etc. and thus we assume that all pointer
        validation has been done prior.
 
-       When msg->state is not ok, this function must set tp_state in the message as some API 
+       When msg->state is not ok, this function must set tp_state in the message as some API
        fucntions return the message directly and do not propigate errno into the message.
 */
 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int retries ) {
@@ -506,8 +595,8 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int r
        tr_len = RMR_TR_LEN( hdr );                                                                             // snarf trace len before sending as hdr is invalid after send
 
        if( msg->flags & MFL_ADDSRC ) {                                                                 // buffer was allocated as a receive buffer; must add our source
-               strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );                                        // must overlay the source to be ours
-               strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
+               zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );                                    // must overlay the source to be ours
+               zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
        }
 
        if( retries == 0 ) {
@@ -519,13 +608,16 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int r
        msg->state = RMR_OK;
        do {
                tot_len = msg->len + PAYLOAD_OFFSET( hdr ) + TP_HDR_LEN;                        // we only send what was used + header lengths
-               *((int*) msg->tp_buf) = tot_len;
+               if( tot_len > msg->alloc_len ) {
+                       tot_len = msg->alloc_len;                                                                       // likely bad length from user :(
+               }
+               insert_mlen( tot_len, msg->tp_buf );    // shrink to fit
 
-               if( DEBUG > 1 ) fprintf( stderr, "[DEBUG] send_msg: ending %d (%x) bytes  usr_len=%d alloc=%d retries=%d\n", tot_len, tot_len, msg->len, msg->alloc_len, retries );
+               if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "send_msg: ending %d (%x) bytes  usr_len=%d alloc=%d retries=%d\n", tot_len, tot_len, msg->len, msg->alloc_len, retries );
                if( DEBUG > 2 ) dump_40( msg->tp_buf, "sending" );
 
                if( (state = SIsendt( ctx->si_ctx, nn_sock, msg->tp_buf, tot_len )) != SI_OK ) {
-                       if( DEBUG > 1 ) fprintf( stderr, "[DBUG] send_msg:  error!! sent state=%d\n", state );
+                       if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "send_msg:  error!! sent state=%d\n", state );
                        msg->state = state;
                        if( retries > 0 && state == SI_ERR_BLOCKED ) {
                                if( --spin_retries <= 0 ) {                             // don't give up the processor if we don't have to
@@ -539,7 +631,7 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int r
                                state = 0;                      // don't loop
                        }
                } else {
-                       if( DEBUG > 2 ) fprintf( stderr, "[DBUG] sent OK state=%d\n", state );
+                       if( DEBUG > 2 ) rmr_vlog( RMR_VL_DEBUG, "sent OK state=%d\n", state );
                        state = 0;
                        msg->state = RMR_OK;
                        hdr = NULL;
@@ -553,15 +645,16 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int r
                        rmr_free_msg( msg );                                            // not wanting a meessage back, trash this one
                        return NULL;
                }
-       } else {                                                                                        // send failed -- return original message
-               if( msg->state == 98 ) {                // FIX ME: this is just broken, but needs SI changes to work correctly for us
+       } else {                                                                                        // send failed or would block -- return original message
+               if( state == SI_ERR_BLOCKED || errno == EAGAIN ) {
                        errno = EAGAIN;
-                       msg->state = RMR_ERR_RETRY;                                     // errno will have nano reason
+                       msg->state = RMR_ERR_RETRY;
                } else {
+                       rmr_vlog( RMR_VL_WARN, "send failed: mt=%d errno=%d %s\n", msg->mtype, errno, strerror( errno ) );
                        msg->state = RMR_ERR_SENDFAILED;
                }
 
-               if( DEBUG ) fprintf( stderr, "[DBUG] send failed: %d %s\n", (int) msg->state, strerror( msg->state ) );
+               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "send failed: %d %s\n", (int) msg->state, strerror( msg->state ) );
        }
 
        return msg;
@@ -583,8 +676,8 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int r
        message type is used.  If the initial lookup, with a subid, fails, then a
        second lookup using just the mtype is tried.
 
-       When msg->state is not OK, this function must set tp_state in the message as 
-       some API fucntions return the message directly and do not propigate errno into 
+       When msg->state is not OK, this function must set tp_state in the message as
+       some API fucntions return the message directly and do not propigate errno into
        the message.
 
        CAUTION: this is a non-blocking send.  If the message cannot be sent, then
@@ -604,6 +697,7 @@ static  rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
        int                     sock_ok;                        // got a valid socket from round robin select
        char*           d1;
        int                     ok_sends = 0;           // track number of ok sends
+       route_table_t*  rt;                             // active route table
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
                errno = EINVAL;                                                                                         // if msg is null, this is their clue
@@ -628,10 +722,10 @@ static  rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
                max_to = ctx->send_retries;             // convert to retries
        }
 
-       if( (rte = uta_get_rte( ctx->rtable, msg->sub_id, msg->mtype, TRUE )) == NULL ) {               // find the entry which matches subid/type allow fallback to type only key
-               if( ctx->flags & CTXFL_WARN ) {
-                       fprintf( stderr, "[WARN] no route table entry for mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
-               }
+       rt = get_rt( ctx );                                                                             // get active route table and up ref count
+       if( (rte = uta_get_rte( rt, msg->sub_id, msg->mtype, TRUE )) == NULL ) {                // find the entry which matches subid/type allow fallback to type only key
+               release_rt( ctx, rt );
+               rmr_vlog( RMR_VL_WARN, "no route table entry for mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
                msg->state = RMR_ERR_NOENDPT;
                errno = ENXIO;                                                                          // must ensure it's not eagain
                msg->tp_state = errno;
@@ -641,9 +735,14 @@ static  rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
        send_again = 1;                                                                                 // force loop entry
        group = 0;                                                                                              // always start with group 0
        while( send_again ) {
-               sock_ok = uta_epsock_rr( rte, group, &send_again, &nn_sock, &ep, ctx->si_ctx );         // select endpt from rr group and set again if more groups
+               if( rte->nrrgroups > 0 ) {                                                      // this is a round robin entry if groups are listed
+                       sock_ok = uta_epsock_rr( ctx, rte, group, &send_again, &nn_sock, &ep );         // select endpt from rr group and set again if more groups
+               } else {
+                       sock_ok = epsock_meid( ctx, rt, msg, &nn_sock, &ep );
+                       send_again = 0;
+               }
 
-               if( DEBUG ) fprintf( stderr, "[DBUG] mtosend_msg: flgs=0x%04x type=%d again=%d group=%d len=%d sock_ok=%d\n",
+               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mtosend_msg: flgs=0x%04x type=%d again=%d group=%d len=%d sock_ok=%d\n",
                                msg->flags, msg->mtype, send_again, group, msg->len, sock_ok );
 
                group++;
@@ -652,70 +751,55 @@ static  rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
                        if( send_again ) {
                                clone_m = clone_msg( msg );                                                             // must make a copy as once we send this message is not available
                                if( clone_m == NULL ) {
+                                       release_rt( ctx, rt );
                                        msg->state = RMR_ERR_SENDFAILED;
                                        errno = ENOMEM;
                                        msg->tp_state = errno;
-                                       if( ctx->flags & CTXFL_WARN ) {
-                                               fprintf( stderr, "[WARN] unable to clone message for multiple rr-group send\n" );
-                                       }
+                                       rmr_vlog( RMR_VL_WARN, "unable to clone message for multiple rr-group send\n" );
                                        return msg;
                                }
 
-                               if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
+                               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
                                msg->flags |= MFL_NOALLOC;                                                              // keep send from allocating a new message; we have a clone to use
                                msg = send_msg( ctx, msg, nn_sock, max_to );                    // do the hard work, msg should be nil on success
-       
+
                                if( msg != NULL ) {                                                                             // returned message indicates send error of some sort
                                        rmr_free_msg( msg );                                                            // must ditchone; pick msg so we don't have to unfiddle flags
                                        msg = clone_m;
                                } else {
                                        ok_sends++;
                                        msg = clone_m;                                                                          // clone will be the next to send
+                                       msg->state = RMR_OK;
                                }
                        } else {
                                msg = send_msg( ctx, msg, nn_sock, max_to );                    // send the last, and allocate a new buffer; drops the clone if it was
                                if( DEBUG ) {
                                        if( msg == NULL ) {
-                                               fprintf( stderr, "[DBUG] mtosend_msg:  send returned nil message!\n" );         
+                                               rmr_vlog( RMR_VL_DEBUG, "mtosend_msg:  send returned nil message!\n" );
                                        }
                                }
                        }
 
-                       if( ep != NULL && msg != NULL ) {
-                               switch( msg->state ) {
-                                       case RMR_OK:
-                                               ep->scounts[EPSC_GOOD]++;
-                                               break;
-                               
-                                       case RMR_ERR_RETRY:
-                                               ep->scounts[EPSC_TRANS]++;
-                                               break;
-
-                                       default:
-                                               ep->scounts[EPSC_FAIL]++;
-                                               uta_ep_failed( ep );                                                            // sending to ep failed; set up to reconnect
-                                               break;
-                               }
+                       if( msg != NULL ) {
+                               incr_ep_counts( msg->state, ep );
                        }
                } else {
-/*
-                       if( ctx->flags & CTXFL_WARN ) {
-                               fprintf( stderr, "[WARN] invalid socket for rte, setting no endpoint err: mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
-                       }
-*/
+                       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "invalid socket for rte, setting no endpoint err: mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
                        msg->state = RMR_ERR_NOENDPT;
                        errno = ENXIO;
                }
        }
 
+       release_rt( ctx, rt );                          // we can safely dec the ref counter now
+
        if( msg ) {                                                     // call functions don't get a buffer back, so a nil check is required
                msg->flags &= ~MFL_NOALLOC;             // must return with this flag off
                if( ok_sends ) {                                // multiple rr-groups and one was successful; report ok
                        msg->state = RMR_OK;
                }
-       
-               if( DEBUG ) fprintf( stderr, "[DBUG] final send stats: ok=%d group=%d state=%d\n\n", ok_sends, group, msg->state );
-       
+
+               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "final send stats: ok=%d group=%d state=%d\n", ok_sends, group, msg->state );
+
                msg->tp_state = errno;
        }