Address complaints by code scanner
[ric-plt/lib/rmr.git] / src / rmr / si / src / sr_si_static.c
index 703e80c..b56b6dc 100644 (file)
@@ -21,7 +21,7 @@
 /*
        Mnemonic:       sr_si_static.c
        Abstract:       These are static send/receive primatives which (sadly)
 /*
        Mnemonic:       sr_si_static.c
        Abstract:       These are static send/receive primatives which (sadly)
-                               differ based on the underlying protocol (nng vs nanomsg).
+                               differ based on the underlying protocol (nng vs SI95).
                                Split from rmr_nng.c  for easier wormhole support.
 
        Author:         E. Scott Daniels
                                Split from rmr_nng.c  for easier wormhole support.
 
        Author:         E. Scott Daniels
 #ifndef _sr_si_static_c
 #define _sr_si_static_c
 
 #ifndef _sr_si_static_c
 #define _sr_si_static_c
 
-#include <nng/nng.h>
-#include <nng/protocol/pubsub0/pub.h>
-#include <nng/protocol/pubsub0/sub.h>
-#include <nng/protocol/pipeline0/push.h>
-#include <nng/protocol/pipeline0/pull.h>
+static void dump_n( char *p, char* label, int n ) {
+       int i;
+       int j;
+       int t = 0;
+       int     rows;
 
 
 
 
-static void dump_40( char *p, char* label ) {
-       int i;
+       if( label ) {
+               fprintf( stderr, "[DUMP] %s p=%p %d bytes\n", label, p, n );
+       }
+
+       rows = (n/16) + ((n % 16) ? 1 : 0);
 
 
-       if( label )
-               fprintf( stderr, ">>>>> %s p=%p\n", label, p );
+       for( j = 0; j < rows; j++ ) {
+               fprintf( stderr, "[DUMP] %04x: ", j * 16 );
 
 
-       for( i = 0; i < 40; i++ ) {
-               fprintf( stderr, "%02x ", (unsigned char) *(p+i) );
+               for( i = 0; t < n && i < 16; i++, t++ ) {
+                       fprintf( stderr, "%02x ", (unsigned char) *p );
+                       p++;
+               }
+               fprintf( stderr, "\n" );
        }
        }
-       fprintf( stderr, "\n" );
+}
+
+/*
+       backwards compatability.
+*/
+static void dump_40( char *p, char* label ) {
+       dump_n( p, label, 40 );
 }
 
 /*
 }
 
 /*
@@ -58,7 +70,7 @@ static void dump_40( char *p, char* label ) {
 
        The addition of the connection shut error code to the switch requires
        that the NNG version at commit e618abf8f3db2a94269a (or after) be
 
        The addition of the connection shut error code to the switch requires
        that the NNG version at commit e618abf8f3db2a94269a (or after) be
-       used for compiling RMR. 
+       used for compiling RMR.
 */
 static inline int xlate_si_state( int state, int def_state ) {
 
 */
 static inline int xlate_si_state( int state, int def_state ) {
 
@@ -98,6 +110,26 @@ static inline int xlate_si_state( int state, int def_state ) {
        return state;
 }
 
        return state;
 }
 
+/*
+       Given a message size and a buffer (assumed to be TP_SZFIELD_LEN or larger)
+       this will put in the size such that it is compatable with old versions
+       of RMR (that expect the message size to not be in network byte order)
+       and with new versions that do. See extract function in mt_call_si_static.c
+       for details on what ends up in the buffer.
+*/
+static inline void insert_mlen( uint32_t len, char* buf ) {
+       uint32_t* blen;                                                 // pointer into buffer where we'll add the len
+
+       blen = (uint32_t *) buf;                                // old systems expect an unconverted integer
+       *blen = len;
+
+       blen++;
+       *blen = htonl( len );                                   // new systems want a converted integer
+
+       memset( &buf[TP_SZFIELD_LEN], 0, 4 );   // clear to prevent future conversion issues
+       buf[TP_SZFIELD_LEN-1] = TP_SZ_MARKER;   // marker to flag this is generated by a new message
+}
+
 /*
        Alloc a new nano zero copy buffer and put into msg. If msg is nil, then we will alloc
        a new message struct as well. Size is the size of the zc buffer to allocate (not
 /*
        Alloc a new nano zero copy buffer and put into msg. If msg is nil, then we will alloc
        a new message struct as well. Size is the size of the zc buffer to allocate (not
@@ -126,7 +158,7 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s
        if( msg == NULL && (msg = (rmr_mbuf_t *) uta_ring_extract( ctx->zcb_mring )) == NULL ) {
                msg = (rmr_mbuf_t *) malloc( sizeof *msg );
                if( msg == NULL ) {
        if( msg == NULL && (msg = (rmr_mbuf_t *) uta_ring_extract( ctx->zcb_mring )) == NULL ) {
                msg = (rmr_mbuf_t *) malloc( sizeof *msg );
                if( msg == NULL ) {
-                       fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for message\n" );
+                       rmr_vlog( RMR_VL_CRIT, "rmr_alloc_zc: cannot get memory for message\n" );
                        return NULL;                                                            // we used to exit -- that seems wrong
                }
                memset( msg, 0, sizeof( *msg ) );       // tp_buffer will be allocated below
                        return NULL;                                                            // we used to exit -- that seems wrong
                }
                memset( msg, 0, sizeof( *msg ) );       // tp_buffer will be allocated below
@@ -135,24 +167,27 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s
                        msg->alloc_len = 0;                             // force tp_buffer realloc below
                        if( msg->tp_buf ) {
                                free( msg->tp_buf );
                        msg->alloc_len = 0;                             // force tp_buffer realloc below
                        if( msg->tp_buf ) {
                                free( msg->tp_buf );
+                               msg->tp_buf = NULL;
                        }
                } else {
                        mlen = msg->alloc_len;                                                  // msg given, allocate the same size as before
                }
        }
 
                        }
                } else {
                        mlen = msg->alloc_len;                                                  // msg given, allocate the same size as before
                }
        }
 
+       msg->rts_fd = -1;                                       // must force to be invalid; not a received message that can be returned
 
        if( !msg->alloc_len && (msg->tp_buf = (void *) malloc( mlen )) == NULL ) {
 
        if( !msg->alloc_len && (msg->tp_buf = (void *) malloc( mlen )) == NULL ) {
-               fprintf( stderr, "[CRI] rmr_alloc_zc: cannot get memory for zero copy buffer: %d bytes\n", (int) mlen );
+               rmr_vlog( RMR_VL_CRIT, "rmr_alloc_zc: cannot get memory for zero copy buffer: %d bytes\n", (int) mlen );
                abort( );                                                                                       // toss out a core file for this
        }
 
                abort( );                                                                                       // toss out a core file for this
        }
 
-/*
-       memset( msg->tp_buf, 0, mlen );    // NOT for production (debug only)   valgrind will complain about uninitalised use if we don't set
-       memcpy( msg->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!**", 34 );                // NOT for production -- debugging eyecatcher
-*/
-       alen = (int *) msg->tp_buf;
-       *alen = mlen;                                           // FIX ME: need a stuct to go in these first bytes, not just dummy len
+       if( DEBUG ) {
+               // for speed we don't do this in production; for testing valgrind will complain about uninitialised use if not set
+               memset( msg->tp_buf, 0, mlen );
+               memcpy( msg->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!==", 34 );                // do NOT use a $ in this string!
+       }
+
+       insert_mlen( (uint32_t) mlen, msg->tp_buf );                    // this will likely be overwriten on send to shirnk
 
        msg->header = ((char *) msg->tp_buf) + TP_HDR_LEN;
        memset( msg->header, 0, sizeof( uta_mhdr_t ) );                         // ensure no junk in the header area
 
        msg->header = ((char *) msg->tp_buf) + TP_HDR_LEN;
        memset( msg->header, 0, sizeof( uta_mhdr_t ) );                         // ensure no junk in the header area
@@ -165,18 +200,19 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s
                //SET_HDR_D2_LEN( hdr, ctx->d2_len );                           // future
        }
        msg->len = 0;                                                                                   // length of data in the payload
                //SET_HDR_D2_LEN( hdr, ctx->d2_len );                           // future
        }
        msg->len = 0;                                                                                   // length of data in the payload
+       msg->cookie = 0x4942;
        msg->alloc_len = mlen;                                                                  // length of allocated transport buffer (caller size + rmr header)
        msg->sub_id = UNSET_SUBID;
        msg->mtype = UNSET_MSGTYPE;
        msg->payload = PAYLOAD_ADDR( hdr );                                             // point to payload (past all header junk)
        msg->xaction = ((uta_mhdr_t *)msg->header)->xid;                // point at transaction id in header area
        msg->state = state;                                                                             // fill in caller's state (likely the state of the last operation)
        msg->alloc_len = mlen;                                                                  // length of allocated transport buffer (caller size + rmr header)
        msg->sub_id = UNSET_SUBID;
        msg->mtype = UNSET_MSGTYPE;
        msg->payload = PAYLOAD_ADDR( hdr );                                             // point to payload (past all header junk)
        msg->xaction = ((uta_mhdr_t *)msg->header)->xid;                // point at transaction id in header area
        msg->state = state;                                                                             // fill in caller's state (likely the state of the last operation)
-       msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
+       msg->flags = MFL_ZEROCOPY;                                                              // this is a zerocopy sendable message
        msg->ring = ctx->zcb_mring;                                                             // original msg_free() api doesn't get context so must dup on eaach :(
        msg->ring = ctx->zcb_mring;                                                             // original msg_free() api doesn't get context so must dup on eaach :(
-       strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );
-       strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
+       zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );
+       zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
 
 
-       if( DEBUG > 1 ) fprintf( stderr, "[DBUG] alloc_zcmsg mlen=%ld size=%d mpl=%d flags=%02x\n", (long) mlen, size, ctx->max_plen, msg->flags );
+       if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "alloc_zcmsg mlen=%ld size=%d mpl=%d flags=%02x\n", (long) mlen, size, ctx->max_plen, msg->flags );
 
        return msg;
 }
 
        return msg;
 }
@@ -196,13 +232,14 @@ static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state ) {
                }
        } else {
                if( (msg = (rmr_mbuf_t *) malloc( sizeof *msg )) == NULL ) {
                }
        } else {
                if( (msg = (rmr_mbuf_t *) malloc( sizeof *msg )) == NULL ) {
-                       fprintf( stderr, "[CRI] rmr_alloc_mbuf: cannot get memory for message\n" );
+                       rmr_vlog( RMR_VL_CRIT, "rmr_alloc_mbuf: cannot get memory for message\n" );
                        return NULL;                                                    // this used to exit, but that seems wrong
                }
        }
 
        memset( msg, 0, sizeof( *msg ) );
 
                        return NULL;                                                    // this used to exit, but that seems wrong
                }
        }
 
        memset( msg, 0, sizeof( *msg ) );
 
+       msg->cookie = 0x4942;
        msg->sub_id = UNSET_SUBID;
        msg->mtype = UNSET_MSGTYPE;
        msg->tp_buf = NULL;
        msg->sub_id = UNSET_SUBID;
        msg->mtype = UNSET_MSGTYPE;
        msg->tp_buf = NULL;
@@ -300,14 +337,14 @@ static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
 
        nm = (rmr_mbuf_t *) malloc( sizeof *nm );
        if( nm == NULL ) {
 
        nm = (rmr_mbuf_t *) malloc( sizeof *nm );
        if( nm == NULL ) {
-               fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" );
+               rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for message buffer\n" );
                exit( 1 );
        }
        memset( nm, 0, sizeof( *nm ) );
 
        mlen = old_msg->alloc_len;                                                                              // length allocated before
        if( (nm->tp_buf = (void *) malloc( sizeof( char ) * (mlen + TP_HDR_LEN) )) == NULL ) {
                exit( 1 );
        }
        memset( nm, 0, sizeof( *nm ) );
 
        mlen = old_msg->alloc_len;                                                                              // length allocated before
        if( (nm->tp_buf = (void *) malloc( sizeof( char ) * (mlen + TP_HDR_LEN) )) == NULL ) {
-               fprintf( stderr, "[CRI] rmr_si_clone: cannot get memory for zero copy buffer: %d\n", (int) mlen );
+               rmr_vlog( RMR_VL_CRIT, "rmr_si_clone: cannot get memory for zero copy buffer: %d\n", (int) mlen );
                abort();
        }
 
                abort();
        }
 
@@ -315,7 +352,8 @@ static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
        v1hdr = (uta_v1mhdr_t *) old_msg->header;                               // v1 will work to dig header out of any version
        switch( ntohl( v1hdr->rmr_ver ) ) {
                case 1:
        v1hdr = (uta_v1mhdr_t *) old_msg->header;                               // v1 will work to dig header out of any version
        switch( ntohl( v1hdr->rmr_ver ) ) {
                case 1:
-                       memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
+                       hdr = nm->header;
+                       memcpy( hdr, old_msg->header, sizeof( *v1hdr ) );               // copy complete header
                        nm->payload = (void *) v1hdr + sizeof( *v1hdr );
                        break;
 
                        nm->payload = (void *) v1hdr + sizeof( *v1hdr );
                        break;
 
@@ -332,7 +370,7 @@ static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
        nm->len = old_msg->len;                                                                 // length of data in the payload
        nm->alloc_len = mlen;                                                                   // length of allocated payload
 
        nm->len = old_msg->len;                                                                 // length of data in the payload
        nm->alloc_len = mlen;                                                                   // length of allocated payload
 
-       nm->xaction = hdr->xid;                                                                 // reference xaction
+       nm->xaction = &hdr->xid[0];                                                             // reference xaction
        nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
        nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
        memcpy( nm->payload, old_msg->payload, old_msg->len );
        nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
        nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
        memcpy( nm->payload, old_msg->payload, old_msg->len );
@@ -358,7 +396,7 @@ static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  ) {
 
        nm = (rmr_mbuf_t *) malloc( sizeof *nm );
        if( nm == NULL ) {
 
        nm = (rmr_mbuf_t *) malloc( sizeof *nm );
        if( nm == NULL ) {
-               fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" );
+               rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for message buffer\n" );
                exit( 1 );
        }
        memset( nm, 0, sizeof( *nm ) );
                exit( 1 );
        }
        memset( nm, 0, sizeof( *nm ) );
@@ -367,23 +405,26 @@ static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  ) {
        tr_old_len = RMR_TR_LEN( hdr );                         // bytes in old header for trace
 
        mlen = old_msg->alloc_len + (tr_len - tr_old_len);                                                      // new length with trace adjustment
        tr_old_len = RMR_TR_LEN( hdr );                         // bytes in old header for trace
 
        mlen = old_msg->alloc_len + (tr_len - tr_old_len);                                                      // new length with trace adjustment
-       if( DEBUG ) fprintf( stderr, "[DBUG] tr_realloc old size=%d new size=%d new tr_len=%d\n", (int) old_msg->alloc_len, (int) mlen, (int) tr_len );
+       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "tr_realloc old size=%d new size=%d new tr_len=%d\n", (int) old_msg->alloc_len, (int) mlen, (int) tr_len );
 
        tpb_len = mlen + TP_HDR_LEN;
        if( (nm->tp_buf = (void *) malloc( tpb_len)) == NULL ) {
 
        tpb_len = mlen + TP_HDR_LEN;
        if( (nm->tp_buf = (void *) malloc( tpb_len)) == NULL ) {
-               fprintf( stderr, "[CRI] rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
+               rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
                exit( 1 );
        }
                exit( 1 );
        }
-       memset( nm->tp_buf, 0, tpb_len );
-       memcpy( nm->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!**", 34 );         // DEBUGGING
-       alen = (int *) nm->tp_buf;
-       *alen = tpb_len;                                                // FIX ME: need a stuct to go in these first bytes, not just dummy len
+       if( DEBUG ) {
+               memset( nm->tp_buf, 0, tpb_len );
+               memcpy( nm->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!==", 34 );         // DEBUGGING do NOT use $ in this string!!
+       }
+
+       insert_mlen( (uint32_t) tpb_len, nm->tp_buf );                  // this len will likely be reset on send to shrink
 
        nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;
 
        v1hdr = (uta_v1mhdr_t *) old_msg->header;                               // v1 will work to dig header out of any version
        switch( ntohl( v1hdr->rmr_ver ) ) {
                case 1:
 
        nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;
 
        v1hdr = (uta_v1mhdr_t *) old_msg->header;                               // v1 will work to dig header out of any version
        switch( ntohl( v1hdr->rmr_ver ) ) {
                case 1:
+                       v1hdr = nm->header;
                        memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
                        nm->payload = (void *) v1hdr + sizeof( *v1hdr );
                        break;
                        memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
                        nm->payload = (void *) v1hdr + sizeof( *v1hdr );
                        break;
@@ -410,7 +451,7 @@ static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  ) {
        nm->len = old_msg->len;                                                                 // length of data in the payload
        nm->alloc_len = mlen;                                                                   // length of allocated payload
 
        nm->len = old_msg->len;                                                                 // length of data in the payload
        nm->alloc_len = mlen;                                                                   // length of allocated payload
 
-       nm->xaction = hdr->xid;                                                                 // reference xaction
+       nm->xaction = &hdr->xid[0];                                                             // reference xaction
        nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
        nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
        memcpy( nm->payload, old_msg->payload, old_msg->len );
        nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
        nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
        memcpy( nm->payload, old_msg->payload, old_msg->len );
@@ -419,62 +460,138 @@ static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  ) {
 }
 
 /*
 }
 
 /*
-       For SI95 based transport all receives are driven through the threaded
-       ring and thus this function should NOT be called. If it is we will panic
-       and abort straight away.
+       Realloc the message such that the payload is at least payload_len bytes.
+       The clone and copy options affect what portion of the original payload is copied to
+       the reallocated message, and whether or not the original payload is lost after the
+       reallocation process has finished.
+
+               copy == true
+               The entire payload from the original message will be coppied to the reallocated
+               payload.
+
+               copy == false
+               Only the header (preserving return to sender information, message type, etc)
+               is preserved after reallocation; the payload used lengrh is set to 0 and the
+               payload is NOT initialised/cleared.
+
+               clone == true
+               The orignal message is preserved and a completely new message buffer and payload
+               are allocated (even if the size given is the same). A pointer to the new message
+               buffer is returned and it is the user application's responsibility to manage the
+               old buffer (e.g. free when not needed).
+
+               clone == false
+               The old payload will be lost after reallocation. The message buffer pointer which
+               is returned will likely reference the same structure (don't depend on that).
+
+
+       CAUTION:
+       If the message is not a message which was received, the mtype, sub-id, length values in the
+       RMR header in the allocated transport buffer will NOT be accurate and will cause the resulting
+       mbuffer information for mtype and subid to be reset even when copy is true. To avoid silently
+       resetting information in the mbuffer, this funciton will reset the mbuf values from the current
+       settings and NOT from the copied RMR header in transport buffer.
 */
 */
-static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
+static inline rmr_mbuf_t* realloc_payload( rmr_mbuf_t* old_msg, int payload_len, int copy, int clone ) {
+       rmr_mbuf_t* nm = NULL;  // new message buffer when cloning
+       size_t  mlen;
+       uta_mhdr_t* omhdr;              // old message header
+       int             tr_old_len;             // tr size in new buffer
+       int             old_psize = 0;  // size of payload in the message passed in (alloc size - tp header and rmr header lengths)
+       int             hdr_len = 0;    // length of RMR and transport headers in old msg
+       void*   old_tp_buf;             // pointer to the old tp buffer
+       int             free_tp = 1;    // free the transport buffer (old) when done (when not cloning)
+       int             old_mt;                 // msg type and sub-id from the message passed in
+       int             old_sid;
+       int             old_len;
+       int             old_rfd;                // rts file descriptor from old message
+
+       if( old_msg == NULL || payload_len <= 0 ) {
+               errno = EINVAL;
+               return NULL;
+       }
 
 
-fprintf( stderr, "\n\n>>> rcv_msg: bad things just happened!\n\n>>>>>> abort!  rcv_msg called and it shouldn't be\n" );
-exit( 1 );
+       old_mt = old_msg->mtype;                        // preserve mbuf info
+       old_sid = old_msg->sub_id;
+       old_len = old_msg->len;
+       old_rfd = old_msg->rts_fd;
 
 
-       return NULL;
-}
+       old_psize = old_msg->alloc_len - (RMR_HDR_LEN( old_msg->header ) + TP_HDR_LEN);         // user payload size in orig message
 
 
-/*
-       Receives a 'raw' message from a non-RMr sender (no header expected). The returned
-       message buffer cannot be used to send, and the length information may or may
-       not be correct (it is set to the length received which might be more than the
-       bytes actually in the payload).
+       if( !clone  && payload_len <= old_psize ) {                                                                             // not cloning and old is large enough; nothing to do
+               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: old msg payload larger than requested: cur=%d need=%d\n", old_psize, payload_len );
+               return old_msg;
+       }
 
 
-       Mostly this supports the route table collector, but could be extended with an
-       API external function.
-*/
-static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
-       return NULL;
-/*
-FIXME: not implemented yet
-       int state;
-       rmr_mbuf_t*     msg = NULL;             // msg received
-       size_t  rsize;                          // nng needs to write back the size received... grrr
+       hdr_len = RMR_HDR_LEN( old_msg->header ) + TP_HDR_LEN;                          // with SI we manage the transport header; must include in len
+       old_tp_buf = old_msg->tp_buf;
 
 
-       if( old_msg ) {
-               msg = old_msg;
+       if( clone ) {
+               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: cloning message\n" );
+               free_tp = 0;
+
+               nm = (rmr_mbuf_t *) malloc( sizeof( *nm ) );
+               if( nm == NULL ) {
+                       rmr_vlog( RMR_VL_CRIT, "rmr_realloc_payload: cannot get memory for message buffer. bytes requested: %d\n", (int) sizeof(*nm) );
+                       return NULL;
+               }
+               memset( nm, 0, sizeof( *nm ) );
+               nm->rts_fd = old_rfd;                           // this is managed only in the mbuf; dup now
        } else {
        } else {
-               msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN );                  // will abort on failure, no need to check
+               nm = old_msg;
        }
 
        }
 
-       //msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS );                        // blocks hard until received
-       if( (msg->state = xlate_si_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) {
-               return msg;
+       omhdr = old_msg->header;
+       mlen = hdr_len + (payload_len > old_psize ? payload_len : old_psize);           // must have larger in case copy is true
+
+       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "reallocate for payload increase. new message size: %d\n", (int) mlen );
+       if( (nm->tp_buf = (char *) malloc( sizeof( char ) * mlen )) == NULL ) {
+               rmr_vlog( RMR_VL_CRIT, "rmr_realloc_payload: cannot get memory for zero copy buffer. bytes requested: %d\n", (int) mlen );
+               free( nm );
+               return NULL;
        }
        }
-       rsize = nng_msg_len( msg->tp_buf );
 
 
-       // do NOT use ref_tpbuf() here! Must fill these in manually.
-       msg->header = nng_msg_body( msg->tp_buf );
-       msg->len = rsize;                                                       // len is the number of bytes received
-       msg->alloc_len = rsize;
-       msg->mtype = UNSET_MSGTYPE;                                     // raw message has no type
-       msg->sub_id = UNSET_SUBID;                                      // nor a subscription id
-       msg->state = RMR_OK;
-       msg->flags = MFL_RAW;
-       msg->payload = msg->header;                                     // payload is the whole thing; no header
-       msg->xaction = NULL;
+       nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;                        // point at the new header and copy from old
+       SET_HDR_LEN( nm->header );
 
 
-       if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_payload: got something: type=%d state=%d len=%d\n", msg->mtype, msg->state, msg->len );
+       if( copy ) {                                                                                                                            // if we need to copy the old payload too
+               memcpy( nm->header, omhdr, sizeof( char ) * (old_psize + RMR_HDR_LEN( omhdr )) );
+               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: copy payload into new message: %d bytes\n", old_psize );
+       } else {                                                                                                                                        // just need to copy header
+               memcpy( nm->header, omhdr, sizeof( char ) * RMR_HDR_LEN( omhdr ) );
+               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rmr_realloc_payload: copy only header into new message: %d bytes\n", RMR_HDR_LEN( nm->header ) );
+       }
 
 
-       return msg;
+       ref_tpbuf( nm, mlen );                  // set payload and other pointers in the message to the new tp buffer
+
+       if( !copy ) {
+               nm->mtype = -1;                                         // didn't copy payload, so mtype, sub-id, and rts fd are invalid
+               nm->sub_id = -1;
+               nm->len = 0;                                            // and len is 0
+       } else {
+               nm->len = old_len;                                      // we must force these to avoid losing info if msg wasn't a received message
+               nm->mtype = old_mt;
+               nm->sub_id = old_sid;
+       }
+
+       if( free_tp ) {
+               free( old_tp_buf );                             // we did not clone, so free b/c no references
+       }
+
+       return nm;
+}
+
+/*
+       For SI95 based transport all receives are driven through the threaded
+       ring and thus this function should NOT be called. If it is we will panic
+       and abort straight away.
 */
 */
+static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
+
+fprintf( stderr, "\n\n>>> rcv_msg: bad things just happened!\n\n>>>>>> abort!  rcv_msg called and it shouldn't be\n" );
+exit( 1 );
+
+       return NULL;
 }
 
 /*
 }
 
 /*
@@ -487,7 +604,7 @@ FIXME: not implemented yet
        Called by rmr_send_msg() and rmr_rts_msg(), etc. and thus we assume that all pointer
        validation has been done prior.
 
        Called by rmr_send_msg() and rmr_rts_msg(), etc. and thus we assume that all pointer
        validation has been done prior.
 
-       When msg->state is not ok, this function must set tp_state in the message as some API 
+       When msg->state is not ok, this function must set tp_state in the message as some API
        fucntions return the message directly and do not propigate errno into the message.
 */
 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int retries ) {
        fucntions return the message directly and do not propigate errno into the message.
 */
 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int retries ) {
@@ -506,8 +623,8 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int r
        tr_len = RMR_TR_LEN( hdr );                                                                             // snarf trace len before sending as hdr is invalid after send
 
        if( msg->flags & MFL_ADDSRC ) {                                                                 // buffer was allocated as a receive buffer; must add our source
        tr_len = RMR_TR_LEN( hdr );                                                                             // snarf trace len before sending as hdr is invalid after send
 
        if( msg->flags & MFL_ADDSRC ) {                                                                 // buffer was allocated as a receive buffer; must add our source
-               strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );                                        // must overlay the source to be ours
-               strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
+               zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );                                    // must overlay the source to be ours
+               zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
        }
 
        if( retries == 0 ) {
        }
 
        if( retries == 0 ) {
@@ -519,13 +636,16 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int r
        msg->state = RMR_OK;
        do {
                tot_len = msg->len + PAYLOAD_OFFSET( hdr ) + TP_HDR_LEN;                        // we only send what was used + header lengths
        msg->state = RMR_OK;
        do {
                tot_len = msg->len + PAYLOAD_OFFSET( hdr ) + TP_HDR_LEN;                        // we only send what was used + header lengths
-               *((int*) msg->tp_buf) = tot_len;
+               if( tot_len > msg->alloc_len ) {
+                       tot_len = msg->alloc_len;                                                                       // likely bad length from user :(
+               }
+               insert_mlen( tot_len, msg->tp_buf );    // shrink to fit
 
 
-               if( DEBUG > 1 ) fprintf( stderr, "[DEBUG] send_msg: ending %d (%x) bytes  usr_len=%d alloc=%d retries=%d\n", tot_len, tot_len, msg->len, msg->alloc_len, retries );
+               if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "send_msg: ending %d (%x) bytes  usr_len=%d alloc=%d retries=%d\n", tot_len, tot_len, msg->len, msg->alloc_len, retries );
                if( DEBUG > 2 ) dump_40( msg->tp_buf, "sending" );
 
                if( (state = SIsendt( ctx->si_ctx, nn_sock, msg->tp_buf, tot_len )) != SI_OK ) {
                if( DEBUG > 2 ) dump_40( msg->tp_buf, "sending" );
 
                if( (state = SIsendt( ctx->si_ctx, nn_sock, msg->tp_buf, tot_len )) != SI_OK ) {
-                       if( DEBUG > 1 ) fprintf( stderr, "[DBUG] send_msg:  error!! sent state=%d\n", state );
+                       if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "send_msg:  error!! sent state=%d\n", state );
                        msg->state = state;
                        if( retries > 0 && state == SI_ERR_BLOCKED ) {
                                if( --spin_retries <= 0 ) {                             // don't give up the processor if we don't have to
                        msg->state = state;
                        if( retries > 0 && state == SI_ERR_BLOCKED ) {
                                if( --spin_retries <= 0 ) {                             // don't give up the processor if we don't have to
@@ -539,7 +659,7 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int r
                                state = 0;                      // don't loop
                        }
                } else {
                                state = 0;                      // don't loop
                        }
                } else {
-                       if( DEBUG > 2 ) fprintf( stderr, "[DBUG] sent OK state=%d\n", state );
+                       if( DEBUG > 2 ) rmr_vlog( RMR_VL_DEBUG, "sent OK state=%d\n", state );
                        state = 0;
                        msg->state = RMR_OK;
                        hdr = NULL;
                        state = 0;
                        msg->state = RMR_OK;
                        hdr = NULL;
@@ -561,7 +681,7 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int r
                        msg->state = RMR_ERR_SENDFAILED;
                }
 
                        msg->state = RMR_ERR_SENDFAILED;
                }
 
-               if( DEBUG ) fprintf( stderr, "[DBUG] send failed: %d %s\n", (int) msg->state, strerror( msg->state ) );
+               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "send failed: %d %s\n", (int) msg->state, strerror( msg->state ) );
        }
 
        return msg;
        }
 
        return msg;
@@ -583,8 +703,8 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int r
        message type is used.  If the initial lookup, with a subid, fails, then a
        second lookup using just the mtype is tried.
 
        message type is used.  If the initial lookup, with a subid, fails, then a
        second lookup using just the mtype is tried.
 
-       When msg->state is not OK, this function must set tp_state in the message as 
-       some API fucntions return the message directly and do not propigate errno into 
+       When msg->state is not OK, this function must set tp_state in the message as
+       some API fucntions return the message directly and do not propigate errno into
        the message.
 
        CAUTION: this is a non-blocking send.  If the message cannot be sent, then
        the message.
 
        CAUTION: this is a non-blocking send.  If the message cannot be sent, then
@@ -604,6 +724,7 @@ static  rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
        int                     sock_ok;                        // got a valid socket from round robin select
        char*           d1;
        int                     ok_sends = 0;           // track number of ok sends
        int                     sock_ok;                        // got a valid socket from round robin select
        char*           d1;
        int                     ok_sends = 0;           // track number of ok sends
+       route_table_t*  rt;                             // active route table
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
                errno = EINVAL;                                                                                         // if msg is null, this is their clue
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
                errno = EINVAL;                                                                                         // if msg is null, this is their clue
@@ -628,10 +749,10 @@ static  rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
                max_to = ctx->send_retries;             // convert to retries
        }
 
                max_to = ctx->send_retries;             // convert to retries
        }
 
-       if( (rte = uta_get_rte( ctx->rtable, msg->sub_id, msg->mtype, TRUE )) == NULL ) {               // find the entry which matches subid/type allow fallback to type only key
-               if( ctx->flags & CTXFL_WARN ) {
-                       fprintf( stderr, "[WARN] no route table entry for mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
-               }
+       rt = get_rt( ctx );                                                                             // get active route table and up ref count
+       if( (rte = uta_get_rte( rt, msg->sub_id, msg->mtype, TRUE )) == NULL ) {                // find the entry which matches subid/type allow fallback to type only key
+               release_rt( ctx, rt );
+               rmr_vlog( RMR_VL_WARN, "no route table entry for mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
                msg->state = RMR_ERR_NOENDPT;
                errno = ENXIO;                                                                          // must ensure it's not eagain
                msg->tp_state = errno;
                msg->state = RMR_ERR_NOENDPT;
                errno = ENXIO;                                                                          // must ensure it's not eagain
                msg->tp_state = errno;
@@ -641,9 +762,14 @@ static  rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
        send_again = 1;                                                                                 // force loop entry
        group = 0;                                                                                              // always start with group 0
        while( send_again ) {
        send_again = 1;                                                                                 // force loop entry
        group = 0;                                                                                              // always start with group 0
        while( send_again ) {
-               sock_ok = uta_epsock_rr( rte, group, &send_again, &nn_sock, &ep, ctx->si_ctx );         // select endpt from rr group and set again if more groups
+               if( rte->nrrgroups > 0 ) {                                                      // this is a round robin entry if groups are listed
+                       sock_ok = uta_epsock_rr( ctx, rte, group, &send_again, &nn_sock, &ep );         // select endpt from rr group and set again if more groups
+               } else {
+                       sock_ok = epsock_meid( ctx, rt, msg, &nn_sock, &ep );
+                       send_again = 0;
+               }
 
 
-               if( DEBUG ) fprintf( stderr, "[DBUG] mtosend_msg: flgs=0x%04x type=%d again=%d group=%d len=%d sock_ok=%d\n",
+               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mtosend_msg: flgs=0x%04x type=%d again=%d group=%d len=%d sock_ok=%d\n",
                                msg->flags, msg->mtype, send_again, group, msg->len, sock_ok );
 
                group++;
                                msg->flags, msg->mtype, send_again, group, msg->len, sock_ok );
 
                group++;
@@ -652,31 +778,31 @@ static  rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
                        if( send_again ) {
                                clone_m = clone_msg( msg );                                                             // must make a copy as once we send this message is not available
                                if( clone_m == NULL ) {
                        if( send_again ) {
                                clone_m = clone_msg( msg );                                                             // must make a copy as once we send this message is not available
                                if( clone_m == NULL ) {
+                                       release_rt( ctx, rt );
                                        msg->state = RMR_ERR_SENDFAILED;
                                        errno = ENOMEM;
                                        msg->tp_state = errno;
                                        msg->state = RMR_ERR_SENDFAILED;
                                        errno = ENOMEM;
                                        msg->tp_state = errno;
-                                       if( ctx->flags & CTXFL_WARN ) {
-                                               fprintf( stderr, "[WARN] unable to clone message for multiple rr-group send\n" );
-                                       }
+                                       rmr_vlog( RMR_VL_WARN, "unable to clone message for multiple rr-group send\n" );
                                        return msg;
                                }
 
                                        return msg;
                                }
 
-                               if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
+                               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
                                msg->flags |= MFL_NOALLOC;                                                              // keep send from allocating a new message; we have a clone to use
                                msg = send_msg( ctx, msg, nn_sock, max_to );                    // do the hard work, msg should be nil on success
                                msg->flags |= MFL_NOALLOC;                                                              // keep send from allocating a new message; we have a clone to use
                                msg = send_msg( ctx, msg, nn_sock, max_to );                    // do the hard work, msg should be nil on success
-       
+
                                if( msg != NULL ) {                                                                             // returned message indicates send error of some sort
                                        rmr_free_msg( msg );                                                            // must ditchone; pick msg so we don't have to unfiddle flags
                                        msg = clone_m;
                                } else {
                                        ok_sends++;
                                        msg = clone_m;                                                                          // clone will be the next to send
                                if( msg != NULL ) {                                                                             // returned message indicates send error of some sort
                                        rmr_free_msg( msg );                                                            // must ditchone; pick msg so we don't have to unfiddle flags
                                        msg = clone_m;
                                } else {
                                        ok_sends++;
                                        msg = clone_m;                                                                          // clone will be the next to send
+                                       msg->state = RMR_OK;
                                }
                        } else {
                                msg = send_msg( ctx, msg, nn_sock, max_to );                    // send the last, and allocate a new buffer; drops the clone if it was
                                if( DEBUG ) {
                                        if( msg == NULL ) {
                                }
                        } else {
                                msg = send_msg( ctx, msg, nn_sock, max_to );                    // send the last, and allocate a new buffer; drops the clone if it was
                                if( DEBUG ) {
                                        if( msg == NULL ) {
-                                               fprintf( stderr, "[DBUG] mtosend_msg:  send returned nil message!\n" );         
+                                               rmr_vlog( RMR_VL_DEBUG, "mtosend_msg:  send returned nil message!\n" );
                                        }
                                }
                        }
                                        }
                                }
                        }
@@ -686,7 +812,7 @@ static  rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
                                        case RMR_OK:
                                                ep->scounts[EPSC_GOOD]++;
                                                break;
                                        case RMR_OK:
                                                ep->scounts[EPSC_GOOD]++;
                                                break;
-                               
+
                                        case RMR_ERR_RETRY:
                                                ep->scounts[EPSC_TRANS]++;
                                                break;
                                        case RMR_ERR_RETRY:
                                                ep->scounts[EPSC_TRANS]++;
                                                break;
@@ -698,24 +824,22 @@ static  rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
                                }
                        }
                } else {
                                }
                        }
                } else {
-/*
-                       if( ctx->flags & CTXFL_WARN ) {
-                               fprintf( stderr, "[WARN] invalid socket for rte, setting no endpoint err: mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
-                       }
-*/
+                       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "invalid socket for rte, setting no endpoint err: mtype=%d sub_id=%d\n", msg->mtype, msg->sub_id );
                        msg->state = RMR_ERR_NOENDPT;
                        errno = ENXIO;
                }
        }
 
                        msg->state = RMR_ERR_NOENDPT;
                        errno = ENXIO;
                }
        }
 
+       release_rt( ctx, rt );                          // we can safely dec the ref counter now
+
        if( msg ) {                                                     // call functions don't get a buffer back, so a nil check is required
                msg->flags &= ~MFL_NOALLOC;             // must return with this flag off
                if( ok_sends ) {                                // multiple rr-groups and one was successful; report ok
                        msg->state = RMR_OK;
                }
        if( msg ) {                                                     // call functions don't get a buffer back, so a nil check is required
                msg->flags &= ~MFL_NOALLOC;             // must return with this flag off
                if( ok_sends ) {                                // multiple rr-groups and one was successful; report ok
                        msg->state = RMR_OK;
                }
-       
-               if( DEBUG ) fprintf( stderr, "[DBUG] final send stats: ok=%d group=%d state=%d\n\n", ok_sends, group, msg->state );
-       
+
+               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "final send stats: ok=%d group=%d state=%d\n", ok_sends, group, msg->state );
+
                msg->tp_state = errno;
        }
 
                msg->tp_state = errno;
        }