fix(rtable): Potential memory leak in rte replace
[ric-plt/lib/rmr.git] / src / nng / src / sr_nng_static.c
index cfea829..7c17589 100644 (file)
@@ -1,14 +1,14 @@
 // : vi ts=4 sw=4 noet :
 /*
 ==================================================================================
-       Copyright (c) 2019 Nokia 
+       Copyright (c) 2019 Nokia
        Copyright (c) 2018-2019 AT&T Intellectual Property.
 
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at
 
-       http://www.apache.org/licenses/LICENSE-2.0
+          http://www.apache.org/licenses/LICENSE-2.0
 
    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,7 +20,7 @@
 
 /*
        Mnemonic:       sr_nng_static.c
-       Abstract:       These are static send/receive primatives which (sadly) 
+       Abstract:       These are static send/receive primatives which (sadly)
                                differ based on the underlying protocol (nng vs nanomsg).
                                Split from rmr_nng.c  for easier wormhole support.
 
@@ -37,7 +37,6 @@
 #include <nng/protocol/pipeline0/push.h>
 #include <nng/protocol/pipeline0/pull.h>
 
-
 /*
        Translates the nng state passed in to one of ours that is suitable to put
        into the message, and sets errno to something that might be useful.
@@ -56,7 +55,7 @@ static inline int xlate_nng_state( int state, int def_state ) {
                        state = RMR_ERR_RETRY;
                        errno = EAGAIN;
                        break;
-                       
+
                case NNG_ETIMEDOUT:
                        state = RMR_ERR_RETRY;
                        errno = EAGAIN;
@@ -86,7 +85,7 @@ static inline int xlate_nng_state( int state, int def_state ) {
                        errno  = EBADFD;                                // file des not in a good state for the operation
                        state = def_state;
                        break;
-               
+
                default:
                        errno = EBADE;
                        state = def_state;
@@ -102,16 +101,22 @@ static inline int xlate_nng_state( int state, int def_state ) {
        including our header). If size is 0, then the buffer allocated is the size previously
        allocated (if msg is !nil) or the default size given at initialisation).
 
+       The trlo (trace data lengh override) is used for trace length if >0. If <= 0, then
+       the context value is used.
+
        NOTE:  while accurate, the nng doc implies that both the msg buffer and data buffer
                are zero copy, however ONLY the message is zero copy. We now allocate and use
                nng messages.
 */
-static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state ) {
-       size_t          mlen;
+static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state, int trlo ) {
+       size_t          mlen;                   // size of the transport buffer that we'll allocate
        uta_mhdr_t*     hdr;                    // convenience pointer
+       int                     tr_len;                 // trace data len (default or override)
+
+       tr_len = trlo > 0 ? trlo : ctx->trace_data_len;
 
-       mlen = sizeof( uta_mhdr_t );                                            // figure size should we not have a msg buffer
-       mlen += (size > 0 ? size  : ctx->max_plen);                     // add user requested size or size set during init
+       mlen = sizeof( uta_mhdr_t ) + tr_len + ctx->d1_len + ctx->d2_len;       // start with header and trace/data lengths
+       mlen += (size > 0 ? size  : ctx->max_plen);                                                     // add user requested size or size set during init
 
        if( msg == NULL ) {
                msg = (rmr_mbuf_t *) malloc( sizeof *msg );
@@ -131,12 +136,20 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s
        }
 
        msg->header = nng_msg_body( msg->tp_buf );
+       memset( msg->header, 0, sizeof( uta_mhdr_t ) );                         // ensure no junk in the header area
        if( (hdr = (uta_mhdr_t *) msg->header) != NULL ) {
-               hdr->rmr_ver = RMR_MSG_VER;                                                             // version info should we need to recognised old style messages someday
+               hdr->rmr_ver = htonl( RMR_MSG_VER );                                    // set current version
+               hdr->sub_id = htonl( UNSET_SUBID );
+               SET_HDR_LEN( hdr );                                                                             // ensure these are converted to net byte order
+               SET_HDR_TR_LEN( hdr, ctx->trace_data_len );
+               //SET_HDR_D1_LEN( hdr, ctx->d1_len );                                   // no need until we start using them
+               //SET_HDR_D2_LEN( hdr, ctx->d2_len );
        }
        msg->len = 0;                                                                                   // length of data in the payload
-       msg->alloc_len = mlen;                                                                  // length of allocated payload
-       msg->payload = msg->header + sizeof( uta_mhdr_t );              // point past header to payload (single buffer allocation above)
+       msg->alloc_len = mlen;                                                                  // length of allocated transport buffer
+       msg->sub_id = UNSET_SUBID;
+       msg->mtype = UNSET_MSGTYPE;
+       msg->payload = PAYLOAD_ADDR( hdr );                                             // point to payload (past all header junk)
        msg->xaction = ((uta_mhdr_t *)msg->header)->xid;                // point at transaction id in header area
        msg->state = state;                                                                             // fill in caller's state (likely the state of the last operation)
        msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
@@ -148,7 +161,7 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s
 }
 
 /*
-       Allocates only the mbuf and does NOT allocate an underlying transport buffer since 
+       Allocates only the mbuf and does NOT allocate an underlying transport buffer since
        NNG receive must allocate that on its own.
 */
 static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state ) {
@@ -164,6 +177,8 @@ static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state ) {
 
        memset( msg, 0, sizeof( *msg ) );
 
+       msg->sub_id = UNSET_SUBID;
+       msg->mtype = UNSET_MSGTYPE;
        msg->tp_buf = NULL;
        msg->header = NULL;
        msg->len = -1;                                                                                  // no payload; invalid len
@@ -179,27 +194,67 @@ static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state ) {
 /*
        This accepts a message with the assumption that only the tp_buf pointer is valid. It
        sets all of the various header/payload/xaction pointers in the mbuf to the proper
-       spot in the transport layer buffer.  The len in the header is assumed to be the 
+       spot in the transport layer buffer.  The len in the header is assumed to be the
        allocated len (a receive buffer that nng created);
 
        The alen parm is the assumed allocated length; assumed because it's a value likely
        to have come from nng receive and the actual alloc len might be larger, but we
        can only assume this is the total usable space.
+
+       This function returns the message with an error state set if it detects that the
+       received message might have been truncated.  Check is done here as the calculation
+       is somewhat based on header version.
 */
 static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen )  {
-       uta_mhdr_t* hdr;
+       uta_mhdr_t* hdr;                                // current header
+       uta_v1mhdr_t* v1hdr;                    // version 1 header
+       int ver;
+       int     hlen;                                           // header len to use for a truncation check
 
        msg->header = nng_msg_body( msg->tp_buf );                              // header is the start of the transport buffer
+       v1hdr = (uta_v1mhdr_t *) msg->header;                                   // v1 will always allow us to suss out the version
 
-       hdr = (uta_mhdr_t *) msg->header;
-       hdr->rmr_ver = RMR_MSG_VER;                                                             // version info should we need to recognised old style messages someday
-       msg->len = ntohl( hdr->plen );                                                  // length sender says is in the payload (received length could be larger)
-       msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header)
-       msg->payload = msg->header + sizeof( uta_mhdr_t );              // point past header to payload (single buffer allocation above)
-       msg->xaction = ((uta_mhdr_t *)msg->header)->xid;                // point at transaction id in header area
-       msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
-       msg->mtype = ntohl( hdr->mtype );                                                               // capture and convert from network order to local order
-       msg->state = RMR_OK;
+       if( v1hdr->rmr_ver == 1 ) {                     // bug in verion 1 didn't encode the version in network byte order
+               ver = 1;
+               v1hdr->rmr_ver = htonl( 1 );            // save it correctly in case we clone the message
+       } else {
+               ver = ntohl( v1hdr->rmr_ver );
+       }
+
+       switch( ver ) {
+               case 1:
+                       msg->len = ntohl( v1hdr->plen );                                                // length sender says is in the payload (received length could be larger)
+                       msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header, trace and data bits)
+                       msg->payload = msg->header + sizeof( uta_v1mhdr_t );    // point past header to payload (single buffer allocation above)
+
+                       msg->xaction = &v1hdr->xid[0];                                                  // point at transaction id in header area
+                       msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
+                       msg->mtype = ntohl( v1hdr->mtype );                                             // capture and convert from network order to local order
+                       msg->sub_id = UNSET_SUBID;                                                              // type 1 messages didn't have this
+                       msg->state = RMR_OK;
+                       hlen = sizeof( uta_v1mhdr_t );
+                       break;
+
+               default:                                                                                                        // current version always lands here
+                       hdr = (uta_mhdr_t *) msg->header;
+                       msg->len = ntohl( hdr->plen );                                                  // length sender says is in the payload (received length could be larger)
+                       msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header, trace and data bits)
+
+                       msg->payload = PAYLOAD_ADDR( hdr );                                             // at user payload
+                       msg->xaction = &hdr->xid[0];                                                    // point at transaction id in header area
+                       msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
+                       msg->mtype = ntohl( hdr->mtype );                                               // capture and convert from network order to local order
+                       msg->sub_id = ntohl( hdr->sub_id );
+                       hlen = RMR_HDR_LEN( hdr );                                                              // len to use for truncated check later
+                       break;
+       }
+
+       if( msg->len > (msg->alloc_len - hlen ) ) {                                             // more than we should have had room for; error
+               msg->state = RMR_ERR_TRUNC;
+               msg->len = msg->alloc_len -  hlen;                                                      // adjust len down so user app doesn't overrun
+       } else {
+               msg->state = RMR_OK;
+       }
 }
 
 /*
@@ -209,6 +264,8 @@ static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
        rmr_mbuf_t* nm;                 // new message buffer
        size_t  mlen;
        int state;
+       uta_mhdr_t* hdr;
+       uta_v1mhdr_t* v1hdr;
 
        nm = (rmr_mbuf_t *) malloc( sizeof *nm );
        if( nm == NULL ) {
@@ -223,16 +280,103 @@ static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
                exit( 1 );
        }
 
-       nm->header = nng_msg_body( nm->tp_buf );
+       nm->header = nng_msg_body( nm->tp_buf );                                // set and copy the header from old message
+       v1hdr = (uta_v1mhdr_t *) old_msg->header;               // v1 will work to dig header out of any version
+       switch( ntohl( v1hdr->rmr_ver ) ) {
+               case 1:
+                       memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
+                       nm->payload = (void *) v1hdr + sizeof( *v1hdr );
+                       break;
+
+               default:                                                                                        // current message always caught  here
+                       hdr = nm->header;
+                       memcpy( hdr, old_msg->header, RMR_HDR_LEN( old_msg->header ) + RMR_TR_LEN( old_msg->header ) + RMR_D1_LEN( old_msg->header ) + RMR_D2_LEN( old_msg->header ));  // copy complete header, trace and other data
+                       nm->payload = PAYLOAD_ADDR( hdr );                              // at user payload
+                       break;
+       }
+
+       // --- these are all version agnostic -----------------------------------
        nm->mtype = old_msg->mtype;
+       nm->sub_id = old_msg->sub_id;
        nm->len = old_msg->len;                                                                 // length of data in the payload
        nm->alloc_len = mlen;                                                                   // length of allocated payload
-       nm->payload = nm->header + sizeof( uta_mhdr_t );                // point past header to payload (single buffer allocation above)
-       nm->xaction = ((uta_mhdr_t *)nm->header)->xid;                  // point at transaction id in header area
+
+       nm->xaction = hdr->xid;                                                                 // reference xaction
        nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
        nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
+       memcpy( nm->payload, old_msg->payload, old_msg->len );
 
-       memcpy( ((uta_mhdr_t *)nm->header)->src, ((uta_mhdr_t *)old_msg->header)->src, RMR_MAX_SID );
+       return nm;
+}
+
+/*
+       This will clone a message with a change to the trace area in the header such that
+       it will be tr_len passed in. The trace area in the cloned message will be uninitialised.
+       The orignal message will be left unchanged, and a pointer to the new message is returned.
+       It is not possible to realloc buffers and change the data sizes.
+*/
+static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  ) {
+       rmr_mbuf_t* nm;                 // new message buffer
+       size_t  mlen;
+       int state;
+       uta_mhdr_t* hdr;
+       uta_v1mhdr_t* v1hdr;
+       int     tr_old_len;                     // tr size in new buffer
+       int     coffset;                        // an offset to something in the header for copy
+
+       nm = (rmr_mbuf_t *) malloc( sizeof *nm );
+       if( nm == NULL ) {
+               fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" );
+               exit( 1 );
+       }
+       memset( nm, 0, sizeof( *nm ) );
+
+       hdr = old_msg->header;
+       tr_old_len = RMR_TR_LEN( hdr );                         // bytes in old header for trace
+
+       mlen = old_msg->alloc_len + (tr_len - tr_old_len);                                                      // new length with trace adjustment
+       if( DEBUG ) fprintf( stderr, "[DBUG] tr_realloc old size=%d new size=%d new tr_len=%d\n", (int) old_msg->alloc_len, (int) mlen, (int) tr_len );
+       if( (state = nng_msg_alloc( (nng_msg **) &nm->tp_buf, mlen )) != 0 ) {
+               fprintf( stderr, "[CRI] rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
+               exit( 1 );
+       }
+
+       nm->header = nng_msg_body( nm->tp_buf );                                // set and copy the header from old message
+       v1hdr = (uta_v1mhdr_t *) old_msg->header;                               // v1 will work to dig header out of any version
+       switch( ntohl( v1hdr->rmr_ver ) ) {
+               case 1:
+                       memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
+                       nm->payload = (void *) v1hdr + sizeof( *v1hdr );
+                       break;
+
+               default:                                                                                        // current message always caught  here
+                       hdr = nm->header;
+                       memcpy( hdr, old_msg->header, sizeof( uta_mhdr_t ) );           // ONLY copy the header portion; trace and data might have changed
+                       if( RMR_D1_LEN( hdr )  ) {
+                               coffset = DATA1_OFFSET( hdr );                                                                                          // offset to d1
+                               memcpy( hdr + coffset, old_msg->header + coffset, RMR_D1_LEN( hdr ) );          // copy data1 and data2 if necessary
+
+                       }
+                       if( RMR_D2_LEN( hdr )  ) {
+                               coffset = DATA2_OFFSET( hdr );                                                                                          // offset to d2
+                               memcpy( hdr + coffset, old_msg->header + coffset, RMR_D2_LEN( hdr ) );          // copy data2 and data2 if necessary
+                       }
+
+                       SET_HDR_TR_LEN( hdr, tr_len );                                                                          // MUST set before pointing payload
+                       nm->payload = PAYLOAD_ADDR( hdr );                                                                      // directly at the payload
+                       SET_HDR_TR_LEN( hdr, tr_len );                                                                          // do NOT copy old trace data, just set the new header
+                       break;
+       }
+
+       // --- these are all version agnostic -----------------------------------
+       nm->mtype = old_msg->mtype;
+       nm->sub_id = old_msg->sub_id;
+       nm->len = old_msg->len;                                                                 // length of data in the payload
+       nm->alloc_len = mlen;                                                                   // length of allocated payload
+
+       nm->xaction = hdr->xid;                                                                 // reference xaction
+       nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
+       nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
        memcpy( nm->payload, old_msg->payload, old_msg->len );
 
        return nm;
@@ -257,10 +401,10 @@ static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
 
 
        In the NNG msg world it must allocate the receive buffer rather
-       than accepting one that we allocated from their space and could 
+       than accepting one that we allocated from their space and could
        reuse.  They have their reasons I guess.  Thus, we will free
        the old transport buffer if user passes the message in; at least
-       our mbuf will be reused. 
+       our mbuf will be reused.
 */
 static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
        int state;
@@ -276,16 +420,14 @@ static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
 
                msg->tp_buf = NULL;
        } else {
-               //msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK );                    // will abort on failure, no need to check
                msg = alloc_mbuf( ctx, RMR_OK );                                // msg without a transport buffer
        }
 
+       msg->alloc_len = 0;
        msg->len = 0;
        msg->payload = NULL;
        msg->xaction = NULL;
 
-       //rsize = msg->alloc_len;                                                                                                               // set to max, and we'll get len back here too
-       //msg->state = nng_recv( ctx->nn_sock, msg->header, &rsize, NO_FLAGS );         // total space (header + payload len) allocated
        msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS );                  // blocks hard until received
        if( (msg->state = xlate_nng_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) {
                return msg;
@@ -297,20 +439,22 @@ static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
        }
 
        rsize = nng_msg_len( msg->tp_buf );
-       if( rsize >= sizeof( uta_mhdr_t ) ) {                                                           // we need at least a full header here
-
-               ref_tpbuf( msg, rsize );                                                // point payload, header etc to the just received tp buffer
+       if( rsize >= sizeof( uta_v1mhdr_t ) ) {                 // we need at least a full type 1 (smallest) header here
+               ref_tpbuf( msg, rsize );                                        // point payload, header etc to the data and set trunc error if needed
                hdr = (uta_mhdr_t *) msg->header;
-               msg->flags |= MFL_ADDSRC;                               // turn on so if user app tries to send this buffer we reset src
-               if( msg->len > (msg->alloc_len - sizeof( uta_mhdr_t )) ) {              // way more than we should have had room for; error
-                       msg->state = RMR_ERR_TRUNC;
-               }
+               msg->flags |= MFL_ADDSRC;                                       // turn on so if user app tries to send this buffer we reset src
 
-               if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_msg: got something: type=%d state=%d len=%d diff=%ld\n", 
+               if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_msg: got something: type=%d state=%d len=%d diff=%ld\n",
                                msg->mtype, msg->state, msg->len,  msg->payload - (unsigned char *) msg->header );
        } else {
-               msg->len = 0;
                msg->state = RMR_ERR_EMPTY;
+               msg->len = 0;
+               msg->alloc_len = rsize;
+               msg->payload = NULL;
+               msg->xaction = NULL;
+               msg->flags |= MFL_ZEROCOPY;                                                                     // this is a zerocopy sendable message
+               msg->mtype = UNSET_MSGTYPE;
+               msg->sub_id = UNSET_SUBID;
        }
 
        return msg;
@@ -318,11 +462,11 @@ static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
 
 /*
        Receives a 'raw' message from a non-RMr sender (no header expected). The returned
-       message buffer cannot be used to send, and the length information may or may 
+       message buffer cannot be used to send, and the length information may or may
        not be correct (it is set to the length received which might be more than the
        bytes actually in the payload).
 
-       Mostly this supports the route table collector, but could be extended with an 
+       Mostly this supports the route table collector, but could be extended with an
        API external function.
 */
 static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
@@ -333,7 +477,7 @@ static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
        if( old_msg ) {
                msg = old_msg;
        } else {
-               msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK );                      // will abort on failure, no need to check
+               msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN );                  // will abort on failure, no need to check
        }
 
        msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS );                  // blocks hard until received
@@ -346,7 +490,8 @@ static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
        msg->header = nng_msg_body( msg->tp_buf );
        msg->len = rsize;                                                       // len is the number of bytes received
        msg->alloc_len = rsize;
-       msg->mtype = -1;                                                        // raw message has no type
+       msg->mtype = UNSET_MSGTYPE;                                     // raw message has no type
+       msg->sub_id = UNSET_SUBID;                                      // nor a subscription id
        msg->state = RMR_OK;
        msg->flags = MFL_RAW;
        msg->payload = msg->header;                                     // payload is the whole thing; no header
@@ -359,8 +504,8 @@ static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
 
 /*
        This does the hard work of actually sending the message to the given socket. On success,
-       a new message struct is returned. On error, the original msg is returned with the state 
-       set to a reasonable value. If the message being sent as MFL_NOALLOC set, then a new 
+       a new message struct is returned. On error, the original msg is returned with the state
+       set to a reasonable value. If the message being sent as MFL_NOALLOC set, then a new
        buffer will not be allocated and returned (mostly for call() interal processing since
        the return message from call() is a received buffer, not a new one).
 
@@ -372,12 +517,15 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock
        uta_mhdr_t*     hdr;
        int nng_flags = NNG_FLAG_NONBLOCK;              // if we need to set any nng flags (zc buffer) add it to this
        int spin_retries = 1000;                                // if eagain/timeout we'll spin this many times before giving up the CPU
+       int     tr_len;                                                         // trace len in sending message so we alloc new message with same trace size
 
        // future: ensure that application did not overrun the XID buffer; last byte must be 0
 
        hdr = (uta_mhdr_t *) msg->header;
-       hdr->mtype = htonl( msg->mtype );                                                               // stash type/len in network byte order for transport
+       hdr->mtype = htonl( msg->mtype );                                                               // stash type/len/sub_id in network byte order for transport
+       hdr->sub_id = htonl( msg->sub_id );
        hdr->plen = htonl( msg->len );
+       tr_len = RMR_TR_LEN( hdr );                                                                             // snarf trace len before sending as hdr is invalid after send
 
        if( msg->flags & MFL_ADDSRC ) {                                                                 // buffer was allocated as a receive buffer; must add our source
                strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID );                                        // must overlay the source to be ours
@@ -386,8 +534,6 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock
        errno = 0;
        msg->state = RMR_OK;
        if( msg->flags & MFL_ZEROCOPY ) {                                                                       // faster sending with zcopy buffer
-               //nng_flags |= NNG_FLAG_ALLOC;                                                                  // indicate a zc buffer that nng is expected to free
-
                do {
                        if( (state = nng_sendmsg( nn_sock, (nng_msg *) msg->tp_buf, nng_flags )) != 0 ) {               // must check and retry some if transient failure
                                msg->state = state;
@@ -406,9 +552,11 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock
                                msg->state = RMR_OK;
                                msg->header = NULL;                                                                                     // nano frees; don't risk accessing later by mistake
                                msg->tp_buf = NULL;
+                               hdr = NULL;
                        }
                } while( state && retries > 0 );
        } else {
+               // future: this should not happen as all buffers we deal with are zc buffers; might make sense to remove the test and else
                msg->state = RMR_ERR_SENDFAILED;
                errno = ENOTSUP;
                return msg;
@@ -417,13 +565,13 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock
                if( (state = nng_send( nn_sock, msg->header, sizeof( uta_mhdr_t ) + msg->len, nng_flags )) != 0 ) {
                        msg->state = state;
                        //if( DEBUG ) fprintf( stderr, ">>>>> copy buffer send failed: %s\n", nng_strerror( state ) );
-               } 
+               }
                */
        }
 
-       if( msg->state == RMR_OK ) {                                                            // successful send
-               if( !(msg->flags & MFL_NOALLOC) ) {                             // allocate another sendable zc buffer unless told otherwise
-                       return alloc_zcmsg( ctx, msg, 0, RMR_OK );      // preallocate a zero-copy buffer and return msg
+       if( msg->state == RMR_OK ) {                                                                    // successful send
+               if( !(msg->flags & MFL_NOALLOC) ) {                                                     // allocate another sendable zc buffer unless told otherwise
+                       return alloc_zcmsg( ctx, msg, 0, RMR_OK, tr_len );              // preallocate a zero-copy buffer and return msg
                } else {
                        rmr_free_msg( msg );                                            // not wanting a meessage back, trash this one
                        return NULL;
@@ -434,8 +582,6 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock
                        msg->state = RMR_ERR_RETRY;                                     // errno will have nano reason
                } else {
                        msg->state = xlate_nng_state( msg->state, RMR_ERR_SENDFAILED );         // xlate to our state and set errno
-                       //errno = -msg->state;
-                       //msg->state = RMR_ERR_SENDFAILED;                                      // errno will have nano reason
                }
 
                if( DEBUG ) fprintf( stderr, "[DBUG] send failed: %d %s\n", (int) msg->state, strerror( msg->state ) );