feat(msgs): Add header v2 support 52/52/1
authorE. Scott Daniels <daniels@research.att.com>
Fri, 12 Apr 2019 15:14:40 +0000 (15:14 +0000)
committerE. Scott Daniels <daniels@research.att.com>
Tue, 16 Apr 2019 14:18:49 +0000 (14:18 +0000)
Change-Id: Iebfb91b0b42d99e1cc54fa012d1bdfc24d7d57c2
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Nanomsg changes for v2 header

Change-Id: I2d01c346e743556b04c43db9885132ee1107c059
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Fix warning message

Change-Id: Iafa54e4701b00ba7b7e2ee88b531c66482f40d46
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Remvoe test/debug/RBF things

Change-Id: I40feef30b9622fc42299ab50b5badc558796e910
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Add tag based versioning to cmake file

Change-Id: I2a37c4cd793ff298af37898371025d1b48dd9990
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
CMakeLists.txt
src/common/include/rmr.h
src/common/include/rmr_agnostic.h
src/nanomsg/include/rmr_private.h
src/nanomsg/src/rmr.c
src/nanomsg/src/sr_static.c
src/nng/include/rmr_nng_private.h
src/nng/src/rmr_nng.c
src/nng/src/sr_nng_static.c

index 9371ce3..c34f17a 100644 (file)
@@ -22,9 +22,9 @@ project( rmr LANGUAGES C )
 cmake_minimum_required( VERSION 3.5 )
 
 
-set( major_version "1" )
+set( major_version "0" )               # automatically populated from git tag later
 set( minor_version "0" )
-set( patch_level "16" )
+set( patch_level "0" )
 
 set( install_root "${CMAKE_INSTALL_PREFIX}" )
 set( install_lib "lib" )
@@ -43,17 +43,42 @@ if( NOT BUILD_LIB )
        set( BUILD_LIB lib )
 endif()
 
-# ---------------- set version info (not perfect, but better than nothing) ----
+# ---------------- extract some things from git ------------------------------
+
+# commit id for the version string
 execute_process( 
        COMMAND bash -c "git rev-parse --short HEAD|awk '{printf\"%s\", $0}'" 
        OUTPUT_VARIABLE git_id
 )
 
+# version information for library names and version string
+execute_process( 
+       COMMAND bash -c "git describe --tags --abbrev=0 HEAD 2>/dev/null | awk -v tag=0.0.4095 ' { tag=$1 } END{ print  tag suffix }'|sed 's/\\./;/g' "
+       OUTPUT_VARIABLE mmp_version_str
+       ERROR_QUIET OUTPUT_STRIP_TRAILING_WHITESPACE
+)
+
+# extra indicator to show that the build was based on modified file(s) and not the true commit
+# (no hope of reproducing the exact library for debugging). Used only for the internal version 
+# string.
+execute_process( 
+       COMMAND bash -c "git diff --shortstat|awk -v fmt=%s -v r=-rotten '{ s=r } END { printf( fmt, s ) }'" 
+       OUTPUT_VARIABLE spoiled_str
+)
+
+set( mmp_version ${mmp_version_str} )
+list( GET mmp_version 0 major_version )
+list( GET mmp_version 1 minor_version )
+list( GET mmp_version 2 patch_level )
+
+message( "+++ building ${major_version}.${minor_version}.${patch_level}${spoiled_str}" )
+
+# define constants used in the version string
 add_definitions( 
        -DGIT_ID=${git_id} 
        -DMAJOR_VER=${major_version}
        -DMINOR_VER=${minor_version}
-       -DPATCH_VER=${patch_level}
+       -DPATCH_VER=${patch_level}${spoiled_str}
 )
 
 
@@ -153,13 +178,22 @@ add_subdirectory( doc )                           # this will auto skip if {X}fm is not available
 add_library( rmr_shared SHARED "$<TARGET_OBJECTS:nano_objects>;$<TARGET_OBJECTS:common_objects>" )
 add_library( rmr_static STATIC "$<TARGET_OBJECTS:nano_objects>;$<TARGET_OBJECTS:common_objects>" )
 
-# both libraries to be named with librmr prefix
-set_target_properties( rmr_shared PROPERTIES OUTPUT_NAME "rmr" )
-set_target_properties( rmr_static PROPERTIES OUTPUT_NAME "rmr" )
+# both libraries to be named with librmr prefix and given version numbers with sym links
+set_target_properties( rmr_shared 
+       PROPERTIES 
+               OUTPUT_NAME "rmr" 
+               SOVERSION ${major_version} 
+               VERSION ${major_version}.${minor_version}.${patch_level} )
+
+set_target_properties( rmr_static 
+       PROPERTIES 
+               OUTPUT_NAME "rmr" 
+               SOVERSION ${major_version} 
+               VERSION ${major_version}.${minor_version}.${patch_level} )
 
 
 # NNG based library (librmr_nng )
-# library is built by pulling objects from nng and common subdirs
+# library is built by pulling objects from nng and common subdirs.
 #
 add_library( rmr_nng_shared SHARED "$<TARGET_OBJECTS:nng_objects>;$<TARGET_OBJECTS:common_objects>" )
 add_library( rmr_nng_static STATIC "$<TARGET_OBJECTS:nng_objects>;$<TARGET_OBJECTS:common_objects>" )
@@ -173,10 +207,18 @@ if( need_ext )
 endif()
 
 
-# both libraries to be named with librmr_nng prefix
-#
-set_target_properties( rmr_nng_shared PROPERTIES OUTPUT_NAME "rmr_nng" )
-set_target_properties( rmr_nng_static PROPERTIES OUTPUT_NAME "rmr_nng" )
+# both libraries to be named with librmr_nng prefix and given version numbers with sym links
+set_target_properties( rmr_nng_shared 
+       PROPERTIES 
+               OUTPUT_NAME "rmr_nng" 
+               SOVERSION ${major_version} 
+               VERSION ${major_version}.${minor_version}.${patch_level} )
+
+set_target_properties( rmr_nng_static 
+       PROPERTIES 
+               OUTPUT_NAME "rmr_nng" 
+               SOVERSION ${major_version} 
+               VERSION ${major_version}.${minor_version}.${patch_level} )
 
 #
 if( APPLE  )
@@ -191,6 +233,7 @@ endif()
 # the moment, there are no header files specific to either nano or nng, so to the public
 # header directive is moot, but needed if some day there is one.
 #
+#install( TARGETS rmr_nng_shared;rmr_nng_static;rmr_shared;rmr_static;rmr_nng_shared_mm EXPORT LibraryConfig
 install( TARGETS rmr_nng_shared;rmr_nng_static;rmr_shared;rmr_static EXPORT LibraryConfig
     ARCHIVE  DESTINATION ${install_lib}
     LIBRARY  DESTINATION ${install_lib}
@@ -231,7 +274,7 @@ IF( EXISTS "${CMAKE_ROOT}/Modules/CPack.cmake" )
        set( CPACK_PACKAGE_VERSION_MAJOR "${major_version}" )
        set( CPACK_PACKAGE_VERSION_MINOR "${minor_version}" )
        set( CPACK_PACKAGE_VERSION_PATCH "${patch_level}" )
-       set( CPACK_PACKAGE_FILE_NAME "${CMAKE_PROJECT_NAME}_${major_version}.${minor_version}.${CPACK_PACKAGE_VERSION_PATCH}" )
+       set( CPACK_PACKAGE_FILE_NAME "${CMAKE_PROJECT_NAME}_${major_version}.${minor_version}.${CPACK_PACKAGE_VERSION_PATCH}${spoiled_str}" )
        set( CPACK_SOURCE_PACKAGE_FILE_NAME "vric${CMAKE_PROJECT_NAME}_${major_version}.${minor_version}.${CPACK_PACKAGE_VERSION_PATCH}" )
 
        # we build and ship the libraries, so there is NO dependency
index 2f58eaf..9ef563f 100644 (file)
@@ -38,7 +38,7 @@ extern "C" {
 #define RMR_MAX_XID                    32              // space in header reserved for user xaction id
 #define RMR_MAX_SID                    32              // spece in header reserved for sender id
 #define RMR_MAX_MEID           32              // spece in header reserved for managed element id
-#define RMR_MAX_SRC                    16              // max length of hostname
+#define RMR_MAX_SRC                    64              // max length of hostname (which could be IPv6 addr with [...]:port so more than the 39 bytes of a plain addr
 #define RMR_MAX_RCV_BYTES      4096    // max bytes we support in a receive message
 
                                                                        // various flags for function calls
@@ -130,6 +130,7 @@ extern int rmr_send_to( void* vctx, int time );             // DEPRECATED -- replaced with
 
 // --- uta compatability defs if needed user should define UTA_COMPAT  ----------------------------------
 #ifdef UTA_COMPAT
+#pragma message( "use of UTA_COMPAT is deprecated and soon to be removed" )
 
 #define UTA_MAX_XID RMR_MAX_XID 
 #define UTA_MAX_SID    RMR_MAX_SID 
index 687330b..fc45429 100644 (file)
@@ -44,7 +44,7 @@ typedef struct uta_ctx  uta_ctx_t;
 #define QUOTE_DEF(a) QUOTE(a)  // allow a #define value to be quoted (e.g. QUOTE(MAJOR_VERSION) )
 
 
-#define RMR_MSG_VER    1                       // potental to treat messages differently if from backlevel version
+#define RMR_MSG_VER    2                       // message version this code was designed to handle
 
                                                                        // environment variable names we'll suss out
 #define ENV_BIND_IF "RMR_BIND_IF"      // the interface to bind to for both normal comma and RTG (0.0.0.0 if missing) 
@@ -73,12 +73,43 @@ typedef struct uta_ctx  uta_ctx_t;
 #define DEF_RTG_PORT   "tcp:4561"              // default port that we accept rtg connections on
 #define DEF_COMM_PORT  "tcp:4560"              // default port we use for normal communications
 
+// -- header length/offset macros which ensure network conversion ----
+#define RMR_HDR_LEN(h)         (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2)+htonl(((uta_mhdr_t *)h)->len3))   // convert from net byte order
+#define PAYLOAD_OFFSET(h)      (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2)+htonl(((uta_mhdr_t *)h)->len3))
+#define TRACE_OFFSET(h)                (ntohl(((uta_mhdr_t *)h)->len0))
+#define DATA1_OFFSET(h)                (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1))
+#define DATA2_OFFSET(h)                (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl((uta_mhdr_t *)h)->len2)
+#define RMR_TR_LEN(h)          (ntohl(((uta_mhdr_t *)h)->len1))
+#define RMR_D1_LEN(h)          (ntohl(((uta_mhdr_t *)h)->len2))
+#define RMR_D2_LEN(h)          (ntohl(((uta_mhdr_t *)h)->len3))
+
+#define SET_HDR_LEN(h)         (((uta_mhdr_t *)h)->len0=htonl((int32_t)sizeof(uta_mhdr_t)))            // convert to network byte order on insert
+#define SET_HDR_TR_LEN(h,l)    (((uta_mhdr_t *)h)->len1=htonl((int32_t)l))
+#define SET_HDR_D1_LEN(h,l)    (((uta_mhdr_t *)h)->len2=htonl((int32_t)l))
+#define SET_HDR_D2_LEN(h,l)    (((uta_mhdr_t *)h)->len3=htonl((int32_t)l))
+
+
+#define V1_PAYLOAD_OFFSET(h)   (sizeof(uta_v1mhdr_t))
+
+                                                                               // v2 header flags
+#define HFL_HAS_TRACE  0x01                    // Trace data is populated
+#define HFL_SUBID              0x02                    // subscription ID is populated
+
 /*
        Message header; interpreted by the other side, but never seen by
        the user application.
 
        DANGER: Add new fields AT THE END of the struct. Adding them any where else
                        will break any code that is currently running.
+
+       The transport layer buffer allocated will be divided this way:
+               | RMr header | Trace data | data1 | data2 | User paylaod |
+
+               Len 0 is the length of the RMr header
+               Len 1 is the length of the trace data
+               Len 2 and 3 are lengths of data1 and data2 and are unused at the moment
+
+       To point at the payload, we take the address of the header and add all 4 lengths.
 */
 typedef struct {
        int32_t mtype;                                          // message type  ("long" network integer)
@@ -86,11 +117,31 @@ typedef struct {
        int32_t rmr_ver;                                        // our internal message version number
        unsigned char xid[RMR_MAX_XID];         // space for user transaction id or somesuch
        unsigned char sid[RMR_MAX_SID];         // sender ID for return to sender needs
-       unsigned char src[RMR_MAX_SRC];         // name of the sender (source)
+       unsigned char src[RMR_MAX_SRC];         // name:port of the sender (source)
        unsigned char meid[RMR_MAX_MEID];       // managed element id.
        struct timespec ts;                                     // timestamp ???
+
+                                                                               // V2 extension
+       int32_t flags;                                          // HFL_* constants      
+       int32_t len0;                                           // length of the RMr header data
+       int32_t len1;                                           // length of the tracing data
+       int32_t len2;                                           // length of data 1 (d1)
+       int32_t len3;                                           // length of data 2 (d2)
+
 } uta_mhdr_t;
 
+
+typedef struct {                                               // old (inflexible) v1 header
+       int32_t mtype;                                          // message type  ("long" network integer)
+       int32_t plen;                                           // payload length
+       int32_t rmr_ver;                                        // our internal message version number
+       unsigned char xid[RMR_MAX_XID];         // space for user transaction id or somesuch
+       unsigned char sid[RMR_MAX_SID];         // sender ID for return to sender needs
+       unsigned char src[16];                          // name of the sender (source) (old size was 16)
+       unsigned char meid[RMR_MAX_MEID];       // managed element id.
+       struct timespec ts;                                     // timestamp ???
+} uta_v1mhdr_t;
+
 /*
        Round robin group.
 */
@@ -138,8 +189,6 @@ typedef struct {
        int             naddrs;                 // num actually used
 } if_addrs_t;
 
-
-
 // --------------- ring things  -------------------------------------------------
 typedef struct ring {
        uint16_t head;                          // index of the head of the ring (insert point)
index 3b5d3d3..9c00675 100644 (file)
@@ -59,6 +59,9 @@ struct uta_ctx {
        int     flags;                                  // CTXFL_ constants
        int nrtele;                                     // number of elements in the routing table
        int     nn_sock;                                // our general listen socket
+       int     trace_data_len;                 // len of tracing data that sits just past header (0 means none)
+       int     d1_len;                                 // lengths for additional post header, pre payload data areas
+       int d2_len;
        route_table_t* rtable;          // the active route table
        route_table_t* old_rtable;      // the previously used rt, sits here to allow for draining
        route_table_t* new_rtable;      // route table under construction
index 3fe4247..2900117 100644 (file)
@@ -154,7 +154,7 @@ extern int rmr_payload_size( rmr_mbuf_t* msg ) {
        }
 
        errno = 0;
-       return msg->alloc_len - sizeof( uta_mhdr_t );                                           // figure size should we not have a msg buffer
+       return msg->alloc_len - RMR_HDR_LEN( msg->header );                     // transport buffer less header and other data bits
 }
 
 /*
index cddb662..44e00ea 100644 (file)
@@ -69,6 +69,7 @@
 */
 static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state ) {
        int     mlen;
+       uta_mhdr_t*     hdr;
 
        mlen = sizeof( uta_mhdr_t );                                            // figure size should we not have a msg buffer
        mlen += (size > 0 ? size  : ctx->max_plen);                     // add user requested size or size set during init
@@ -90,10 +91,16 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s
                exit( 1 );
        }
 
-       ((uta_mhdr_t *) msg->header)->rmr_ver = RMR_MSG_VER;    // version info should we need to recognised old style messages someday
+       hdr = (uta_mhdr_t *) msg->header;
+       hdr->rmr_ver = htonl( RMR_MSG_VER );                                                            // current version
+       SET_HDR_LEN( hdr );
+       SET_HDR_TR_LEN( hdr, ctx->trace_data_len );
+       //SET_HDR_D1_LEN( hdr, ctx->d1_len );                                   // moot until we actually need these data areas
+       //SET_HDR_D2_LEN( hdr, ctx->d1_len );
+
        msg->len = 0;                                                                                   // length of data in the payload
        msg->alloc_len = mlen;                                                                  // length of allocated payload
-       msg->payload = msg->header + sizeof( uta_mhdr_t );              // point past header to payload (single buffer allocation above)
+       msg->payload = msg->header + PAYLOAD_OFFSET( hdr );             // point at the payload in transport
        msg->xaction = ((uta_mhdr_t *)msg->header)->xid;                                                // point at transaction id in header area
        msg->state = state;                                                                             // fill in caller's state (likely the state of the last operation)
        msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
@@ -128,14 +135,15 @@ static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
                exit( 1 );
        }
 
+       memcpy( nm->header, old_msg->header, RMR_HDR_LEN( old_msg->header ) );     // copy complete header, trace and other data
+
        nm->mtype = old_msg->mtype;
        nm->len = old_msg->len;                                                                 // length of data in the payload
        nm->alloc_len = mlen;                                                                   // length of allocated payload
-       nm->payload = nm->header + sizeof( uta_mhdr_t );                // point past header to payload (single buffer allocation above)
+       nm->payload = nm->header + PAYLOAD_OFFSET( nm->header );
        nm->xaction = ((uta_mhdr_t *)nm->header)->xid;                  // point at transaction id in header area
        nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
        nm->flags |= MFL_ZEROCOPY;                                                              // this is a zerocopy sendable message
-       memcpy( ((uta_mhdr_t *)nm->header)->src, ((uta_mhdr_t *)old_msg->header)->src, RMR_MAX_SID );
        memcpy( nm->payload, old_msg->payload, old_msg->len );
 
        return nm;
@@ -166,14 +174,14 @@ static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
        if( msg->state > (int) sizeof( uta_mhdr_t ) ) {                                         // we need more than just a header here
                hdr = (uta_mhdr_t *) msg->header;
                msg->len = ntohl( hdr->plen );                                          // length of data in the payload (likely < payload size)
-               if( msg->len > msg->state - sizeof( uta_mhdr_t ) ) {
-                       fprintf( stderr, "[WARN] rmr_rcv indicated payload length < rcvd payload: expected %d got %ld\n", 
-                               msg->len, msg->state - sizeof( uta_mhdr_t ) );
+               if( msg->len > msg->state - RMR_HDR_LEN( hdr ) ) {
+                       msg->state = RMR_ERR_TRUNC;
+                       msg->len = msg->state - RMR_HDR_LEN( hdr );
                }
                msg->mtype = ntohl( hdr->mtype );                                                               // capture and convert from network order to local order
                msg->state = RMR_OK;
                msg->flags |= MFL_ADDSRC;                                                                               // turn on so if user app tries to send this buffer we reset src
-               msg->payload = msg->header + sizeof( uta_mhdr_t );
+               msg->payload = msg->header + PAYLOAD_OFFSET( msg->header );
                msg->xaction = &hdr->xid[0];                                                    // provide user with ref to fixed space xaction id
                if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_msg: got something: type=%d state=%d len=%d diff=%ld\n", 
                                msg->mtype, msg->state, msg->len,  msg->payload - (unsigned char *) msg->header );
index 83db500..29be7d3 100644 (file)
@@ -70,6 +70,9 @@ struct uta_ctx {
        int     flags;                                  // CTXFL_ constants
        int nrtele;                                     // number of elements in the routing table
        int send_retries;                       // number of retries send_msg() should attempt if eagain/timeout indicated by nng
+       int     trace_data_len;                 // number of bytes to allocate in header for trace data
+       int d1_len;                                     // extra header data 1 length   (future)
+       int d2_len;                                     // extra header data 2 length   (future)
        nng_socket      nn_sock;                // our general listen socket
        route_table_t* rtable;          // the active route table
        route_table_t* old_rtable;      // the previously used rt, sits here to allow for draining
index 9a9a043..af8ce6f 100644 (file)
@@ -104,7 +104,7 @@ extern int rmr_payload_size( rmr_mbuf_t* msg ) {
        }
 
        errno = 0;
-       return msg->alloc_len - sizeof( uta_mhdr_t );                                           // figure size should we not have a msg buffer
+       return msg->alloc_len - RMR_HDR_LEN( msg->header );                             // allocated transport size less the header and other data bits
 }
 
 /*
@@ -575,8 +575,8 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        int             state;
 
        if( ! announced ) {
-               fprintf( stderr, "[INFO] ric message routing library on NNG (%s %s.%s.%s built: %s)\n", 
-                       QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
+               fprintf( stderr, "[INFO] ric message routing library on NNG mv=%d (%s %s.%s.%s built: %s)\n", 
+                       RMR_MSG_VER, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
                announced = 1;
        }
 
@@ -596,17 +596,11 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        ctx->send_retries = 1;                                                  // default is not to sleep at all; RMr will retry about 10K times before returning
        ctx->mring = uta_mk_ring( 128 );                                // message ring to hold asynch msgs received while waiting for call response
 
-       ctx->max_plen = RMR_MAX_RCV_BYTES + sizeof( uta_mhdr_t );               // default max buffer size
+       ctx->max_plen = RMR_MAX_RCV_BYTES;                              // max user payload lengh
        if( max_msg_size > 0 ) {
-               if( max_msg_size <= ctx->max_plen ) {                                           // user defined len can be smaller
-                       ctx->max_plen = max_msg_size;
-               } else {
-                       fprintf( stderr, "[WRN] rmr_init: attempt to set max payload len > than allowed maximum; capped at %d bytes\n", ctx->max_plen );
-               }
+               ctx->max_plen = max_msg_size;
        }
 
-       ctx->max_mlen = ctx->max_plen + sizeof( uta_mhdr_t );
-
        // we're using a listener to get rtg updates, so we do NOT need this.
        //uta_lookup_rtg( ctx );                                                        // attempt to fill in rtg info; rtc will handle missing values/errors
 
@@ -719,7 +713,7 @@ extern int rmr_get_rcvfd( void* vctx ) {
        }
 
        if( (state = nng_getopt_int( ctx->nn_sock, NNG_OPT_RECVFD, &fd )) != 0 ) {
-               fprintf( stderr, ">>> cannot get recv fd: %s\n", nng_strerror( state ) );
+               fprintf( stderr, "[WRN] rmr cannot get recv fd: %s\n", nng_strerror( state ) );
                return -1;
        }
 
index cfea829..369c68e 100644 (file)
@@ -107,10 +107,10 @@ static inline int xlate_nng_state( int state, int def_state ) {
                nng messages.
 */
 static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state ) {
-       size_t          mlen;
+       size_t          mlen;                   // size of the transport buffer that we'll allocate
        uta_mhdr_t*     hdr;                    // convenience pointer
 
-       mlen = sizeof( uta_mhdr_t );                                            // figure size should we not have a msg buffer
+       mlen = sizeof( uta_mhdr_t ) + ctx->trace_data_len + ctx->d1_len + ctx->d2_len;  // start with header and trace/data lengths
        mlen += (size > 0 ? size  : ctx->max_plen);                     // add user requested size or size set during init
 
        if( msg == NULL ) {
@@ -131,12 +131,17 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s
        }
 
        msg->header = nng_msg_body( msg->tp_buf );
+       memset( msg->header, 0, sizeof( uta_mhdr_t ) );                         // ensure no junk in the header area
        if( (hdr = (uta_mhdr_t *) msg->header) != NULL ) {
-               hdr->rmr_ver = RMR_MSG_VER;                                                             // version info should we need to recognised old style messages someday
+               hdr->rmr_ver = htonl( RMR_MSG_VER );                                    // set current version
+               SET_HDR_LEN( hdr );                                                                             // ensure these are converted to net byte order
+               SET_HDR_TR_LEN( hdr, ctx->trace_data_len );
+               //SET_HDR_D1_LEN( hdr, ctx->d1_len );                                   // no need until we start using them
+               //SET_HDR_D2_LEN( hdr, ctx->d2_len );
        }
        msg->len = 0;                                                                                   // length of data in the payload
-       msg->alloc_len = mlen;                                                                  // length of allocated payload
-       msg->payload = msg->header + sizeof( uta_mhdr_t );              // point past header to payload (single buffer allocation above)
+       msg->alloc_len = mlen;                                                                  // length of allocated transport buffer
+       msg->payload = msg->header + PAYLOAD_OFFSET( hdr );             // past header, trace and other data bits
        msg->xaction = ((uta_mhdr_t *)msg->header)->xid;                // point at transaction id in header area
        msg->state = state;                                                                             // fill in caller's state (likely the state of the last operation)
        msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
@@ -185,21 +190,59 @@ static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state ) {
        The alen parm is the assumed allocated length; assumed because it's a value likely
        to have come from nng receive and the actual alloc len might be larger, but we
        can only assume this is the total usable space.
+
+       This function returns the message with an error state set if it detects that the
+       received message might have been truncated.  Check is done here as the calculation
+       is somewhat based on header version.
 */
 static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen )  {
-       uta_mhdr_t* hdr;
+       uta_mhdr_t* hdr;                                // current header
+       uta_v1mhdr_t* v1hdr;                    // version 1 header
+       int ver;
+       int     hlen;                                           // header len to use for a truncation check
 
        msg->header = nng_msg_body( msg->tp_buf );                              // header is the start of the transport buffer
+       v1hdr = (uta_v1mhdr_t *) msg->header;                                   // v1 will always allow us to suss out the version
 
-       hdr = (uta_mhdr_t *) msg->header;
-       hdr->rmr_ver = RMR_MSG_VER;                                                             // version info should we need to recognised old style messages someday
-       msg->len = ntohl( hdr->plen );                                                  // length sender says is in the payload (received length could be larger)
-       msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header)
-       msg->payload = msg->header + sizeof( uta_mhdr_t );              // point past header to payload (single buffer allocation above)
-       msg->xaction = ((uta_mhdr_t *)msg->header)->xid;                // point at transaction id in header area
-       msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
-       msg->mtype = ntohl( hdr->mtype );                                                               // capture and convert from network order to local order
-       msg->state = RMR_OK;
+       if( v1hdr->rmr_ver == 1 ) {                     // bug in verion 1 didn't encode the version in network byte order 
+               ver = 1;
+               v1hdr->rmr_ver = htonl( 1 );            // save it correctly in case we clone the message
+       } else {
+               ver = ntohl( v1hdr->rmr_ver );
+       }
+
+       switch( ver ) {
+               case 1:
+                       msg->len = ntohl( v1hdr->plen );                                                        // length sender says is in the payload (received length could be larger)
+                       msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header, trace and data bits)
+                       msg->payload = msg->header + sizeof( uta_v1mhdr_t );    // point past header to payload (single buffer allocation above)
+
+                       msg->xaction = &v1hdr->xid[0];                                                  // point at transaction id in header area
+                       msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
+                       msg->mtype = ntohl( v1hdr->mtype );                                             // capture and convert from network order to local order
+                       msg->state = RMR_OK;
+                       hlen = sizeof( uta_v1mhdr_t );
+                       break;
+
+               default:                                                                                                        // current version always lands here
+                       hdr = (uta_mhdr_t *) msg->header;
+                       msg->len = ntohl( hdr->plen );                                                  // length sender says is in the payload (received length could be larger)
+                       msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header, trace and data bits)
+
+                       msg->payload = msg->header + PAYLOAD_OFFSET( hdr );             // past header, trace and other data bits
+                       msg->xaction = &hdr->xid[0];                                                    // point at transaction id in header area
+                       msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
+                       msg->mtype = ntohl( hdr->mtype );                                               // capture and convert from network order to local order
+                       hlen = RMR_HDR_LEN( hdr );                                                              // len to use for truncated check later         
+                       break;
+       }
+
+       if( msg->len > (msg->alloc_len - hlen ) ) {                                             // more than we should have had room for; error
+               msg->state = RMR_ERR_TRUNC;
+               msg->len = msg->alloc_len -  hlen;                                                      // adjust len down so user app doesn't overrun
+       } else {
+               msg->state = RMR_OK;
+       }
 }
 
 /*
@@ -209,6 +252,8 @@ static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
        rmr_mbuf_t* nm;                 // new message buffer
        size_t  mlen;
        int state;
+       uta_mhdr_t* hdr;
+       uta_v1mhdr_t* v1hdr;
 
        nm = (rmr_mbuf_t *) malloc( sizeof *nm );
        if( nm == NULL ) {
@@ -223,16 +268,29 @@ static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
                exit( 1 );
        }
 
-       nm->header = nng_msg_body( nm->tp_buf );
+       nm->header = nng_msg_body( nm->tp_buf );                                // set and copy the header from old message
+       v1hdr = (uta_v1mhdr_t *) old_msg->header;               // v1 will work to dig header out of any version
+       switch( ntohl( v1hdr->rmr_ver ) ) {
+               case 1:
+                       memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
+                       nm->payload = (void *) v1hdr + sizeof( *v1hdr );
+                       break;
+
+               default:                                                                                        // current message always caught  here
+                       hdr = nm->header;
+                       memcpy( hdr, old_msg->header, RMR_HDR_LEN( old_msg->header ) );         // copy complete header, trace and other data
+                       nm->payload = nm->header + PAYLOAD_OFFSET( hdr );               // point at the payload
+                       break;
+       }
+               
+       // --- these are all version agnostic -----------------------------------
        nm->mtype = old_msg->mtype;
        nm->len = old_msg->len;                                                                 // length of data in the payload
        nm->alloc_len = mlen;                                                                   // length of allocated payload
-       nm->payload = nm->header + sizeof( uta_mhdr_t );                // point past header to payload (single buffer allocation above)
-       nm->xaction = ((uta_mhdr_t *)nm->header)->xid;                  // point at transaction id in header area
+
+       nm->xaction = hdr->xid;                                                                 // reference xaction
        nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
        nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
-
-       memcpy( ((uta_mhdr_t *)nm->header)->src, ((uta_mhdr_t *)old_msg->header)->src, RMR_MAX_SID );
        memcpy( nm->payload, old_msg->payload, old_msg->len );
 
        return nm;
@@ -276,16 +334,14 @@ static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
 
                msg->tp_buf = NULL;
        } else {
-               //msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK );                    // will abort on failure, no need to check
                msg = alloc_mbuf( ctx, RMR_OK );                                // msg without a transport buffer
        }
 
+       msg->alloc_len = 0;
        msg->len = 0;
        msg->payload = NULL;
        msg->xaction = NULL;
 
-       //rsize = msg->alloc_len;                                                                                                               // set to max, and we'll get len back here too
-       //msg->state = nng_recv( ctx->nn_sock, msg->header, &rsize, NO_FLAGS );         // total space (header + payload len) allocated
        msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS );                  // blocks hard until received
        if( (msg->state = xlate_nng_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) {
                return msg;
@@ -297,20 +353,22 @@ static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
        }
 
        rsize = nng_msg_len( msg->tp_buf );
-       if( rsize >= sizeof( uta_mhdr_t ) ) {                                                           // we need at least a full header here
-
-               ref_tpbuf( msg, rsize );                                                // point payload, header etc to the just received tp buffer
+       if( rsize >= sizeof( uta_v1mhdr_t ) ) {                 // we need at least a full type 1 (smallest) header here
+               ref_tpbuf( msg, rsize );                                        // point payload, header etc to the data and set trunc error if needed
                hdr = (uta_mhdr_t *) msg->header;
-               msg->flags |= MFL_ADDSRC;                               // turn on so if user app tries to send this buffer we reset src
-               if( msg->len > (msg->alloc_len - sizeof( uta_mhdr_t )) ) {              // way more than we should have had room for; error
-                       msg->state = RMR_ERR_TRUNC;
-               }
+               msg->flags |= MFL_ADDSRC;                                       // turn on so if user app tries to send this buffer we reset src
+
 
                if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_msg: got something: type=%d state=%d len=%d diff=%ld\n", 
                                msg->mtype, msg->state, msg->len,  msg->payload - (unsigned char *) msg->header );
        } else {
-               msg->len = 0;
                msg->state = RMR_ERR_EMPTY;
+               msg->len = 0;
+               msg->alloc_len = rsize;
+               msg->payload = NULL;
+               msg->xaction = NULL;
+               msg->flags |= MFL_ZEROCOPY;                                                                     // this is a zerocopy sendable message
+               msg->mtype = -1;
        }
 
        return msg;
@@ -386,8 +444,6 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock
        errno = 0;
        msg->state = RMR_OK;
        if( msg->flags & MFL_ZEROCOPY ) {                                                                       // faster sending with zcopy buffer
-               //nng_flags |= NNG_FLAG_ALLOC;                                                                  // indicate a zc buffer that nng is expected to free
-
                do {
                        if( (state = nng_sendmsg( nn_sock, (nng_msg *) msg->tp_buf, nng_flags )) != 0 ) {               // must check and retry some if transient failure
                                msg->state = state;
@@ -409,6 +465,7 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock
                        }
                } while( state && retries > 0 );
        } else {
+               // future: this should not happen as all buffers we deal with are zc buffers; might make sense to remove the test and else
                msg->state = RMR_ERR_SENDFAILED;
                errno = ENOTSUP;
                return msg;
@@ -434,8 +491,6 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock
                        msg->state = RMR_ERR_RETRY;                                     // errno will have nano reason
                } else {
                        msg->state = xlate_nng_state( msg->state, RMR_ERR_SENDFAILED );         // xlate to our state and set errno
-                       //errno = -msg->state;
-                       //msg->state = RMR_ERR_SENDFAILED;                                      // errno will have nano reason
                }
 
                if( DEBUG ) fprintf( stderr, "[DBUG] send failed: %d %s\n", (int) msg->state, strerror( msg->state ) );