From 68c5cf1104e89f5c43786a3e48f5c6a1e757f59f Mon Sep 17 00:00:00 2001 From: "E. Scott Daniels" Date: Fri, 12 Apr 2019 15:14:40 +0000 Subject: [PATCH] feat(msgs): Add header v2 support Change-Id: Iebfb91b0b42d99e1cc54fa012d1bdfc24d7d57c2 Signed-off-by: E. Scott Daniels Nanomsg changes for v2 header Change-Id: I2d01c346e743556b04c43db9885132ee1107c059 Signed-off-by: E. Scott Daniels Fix warning message Change-Id: Iafa54e4701b00ba7b7e2ee88b531c66482f40d46 Signed-off-by: E. Scott Daniels Remvoe test/debug/RBF things Change-Id: I40feef30b9622fc42299ab50b5badc558796e910 Signed-off-by: E. Scott Daniels Add tag based versioning to cmake file Change-Id: I2a37c4cd793ff298af37898371025d1b48dd9990 Signed-off-by: E. Scott Daniels --- CMakeLists.txt | 69 +++++++++++++++++---- src/common/include/rmr.h | 3 +- src/common/include/rmr_agnostic.h | 57 +++++++++++++++-- src/nanomsg/include/rmr_private.h | 3 + src/nanomsg/src/rmr.c | 2 +- src/nanomsg/src/sr_static.c | 24 +++++--- src/nng/include/rmr_nng_private.h | 3 + src/nng/src/rmr_nng.c | 18 ++---- src/nng/src/sr_nng_static.c | 125 +++++++++++++++++++++++++++----------- 9 files changed, 230 insertions(+), 74 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 9371ce3..c34f17a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -22,9 +22,9 @@ project( rmr LANGUAGES C ) cmake_minimum_required( VERSION 3.5 ) -set( major_version "1" ) +set( major_version "0" ) # automatically populated from git tag later set( minor_version "0" ) -set( patch_level "16" ) +set( patch_level "0" ) set( install_root "${CMAKE_INSTALL_PREFIX}" ) set( install_lib "lib" ) @@ -43,17 +43,42 @@ if( NOT BUILD_LIB ) set( BUILD_LIB lib ) endif() -# ---------------- set version info (not perfect, but better than nothing) ---- +# ---------------- extract some things from git ------------------------------ + +# commit id for the version string execute_process( COMMAND bash -c "git rev-parse --short HEAD|awk '{printf\"%s\", $0}'" OUTPUT_VARIABLE git_id ) +# version information for library names and version string +execute_process( + COMMAND bash -c "git describe --tags --abbrev=0 HEAD 2>/dev/null | awk -v tag=0.0.4095 ' { tag=$1 } END{ print tag suffix }'|sed 's/\\./;/g' " + OUTPUT_VARIABLE mmp_version_str + ERROR_QUIET OUTPUT_STRIP_TRAILING_WHITESPACE +) + +# extra indicator to show that the build was based on modified file(s) and not the true commit +# (no hope of reproducing the exact library for debugging). Used only for the internal version +# string. +execute_process( + COMMAND bash -c "git diff --shortstat|awk -v fmt=%s -v r=-rotten '{ s=r } END { printf( fmt, s ) }'" + OUTPUT_VARIABLE spoiled_str +) + +set( mmp_version ${mmp_version_str} ) +list( GET mmp_version 0 major_version ) +list( GET mmp_version 1 minor_version ) +list( GET mmp_version 2 patch_level ) + +message( "+++ building ${major_version}.${minor_version}.${patch_level}${spoiled_str}" ) + +# define constants used in the version string add_definitions( -DGIT_ID=${git_id} -DMAJOR_VER=${major_version} -DMINOR_VER=${minor_version} - -DPATCH_VER=${patch_level} + -DPATCH_VER=${patch_level}${spoiled_str} ) @@ -153,13 +178,22 @@ add_subdirectory( doc ) # this will auto skip if {X}fm is not available add_library( rmr_shared SHARED "$;$" ) add_library( rmr_static STATIC "$;$" ) -# both libraries to be named with librmr prefix -set_target_properties( rmr_shared PROPERTIES OUTPUT_NAME "rmr" ) -set_target_properties( rmr_static PROPERTIES OUTPUT_NAME "rmr" ) +# both libraries to be named with librmr prefix and given version numbers with sym links +set_target_properties( rmr_shared + PROPERTIES + OUTPUT_NAME "rmr" + SOVERSION ${major_version} + VERSION ${major_version}.${minor_version}.${patch_level} ) + +set_target_properties( rmr_static + PROPERTIES + OUTPUT_NAME "rmr" + SOVERSION ${major_version} + VERSION ${major_version}.${minor_version}.${patch_level} ) # NNG based library (librmr_nng ) -# library is built by pulling objects from nng and common subdirs +# library is built by pulling objects from nng and common subdirs. # add_library( rmr_nng_shared SHARED "$;$" ) add_library( rmr_nng_static STATIC "$;$" ) @@ -173,10 +207,18 @@ if( need_ext ) endif() -# both libraries to be named with librmr_nng prefix -# -set_target_properties( rmr_nng_shared PROPERTIES OUTPUT_NAME "rmr_nng" ) -set_target_properties( rmr_nng_static PROPERTIES OUTPUT_NAME "rmr_nng" ) +# both libraries to be named with librmr_nng prefix and given version numbers with sym links +set_target_properties( rmr_nng_shared + PROPERTIES + OUTPUT_NAME "rmr_nng" + SOVERSION ${major_version} + VERSION ${major_version}.${minor_version}.${patch_level} ) + +set_target_properties( rmr_nng_static + PROPERTIES + OUTPUT_NAME "rmr_nng" + SOVERSION ${major_version} + VERSION ${major_version}.${minor_version}.${patch_level} ) # if( APPLE ) @@ -191,6 +233,7 @@ endif() # the moment, there are no header files specific to either nano or nng, so to the public # header directive is moot, but needed if some day there is one. # +#install( TARGETS rmr_nng_shared;rmr_nng_static;rmr_shared;rmr_static;rmr_nng_shared_mm EXPORT LibraryConfig install( TARGETS rmr_nng_shared;rmr_nng_static;rmr_shared;rmr_static EXPORT LibraryConfig ARCHIVE DESTINATION ${install_lib} LIBRARY DESTINATION ${install_lib} @@ -231,7 +274,7 @@ IF( EXISTS "${CMAKE_ROOT}/Modules/CPack.cmake" ) set( CPACK_PACKAGE_VERSION_MAJOR "${major_version}" ) set( CPACK_PACKAGE_VERSION_MINOR "${minor_version}" ) set( CPACK_PACKAGE_VERSION_PATCH "${patch_level}" ) - set( CPACK_PACKAGE_FILE_NAME "${CMAKE_PROJECT_NAME}_${major_version}.${minor_version}.${CPACK_PACKAGE_VERSION_PATCH}" ) + set( CPACK_PACKAGE_FILE_NAME "${CMAKE_PROJECT_NAME}_${major_version}.${minor_version}.${CPACK_PACKAGE_VERSION_PATCH}${spoiled_str}" ) set( CPACK_SOURCE_PACKAGE_FILE_NAME "vric${CMAKE_PROJECT_NAME}_${major_version}.${minor_version}.${CPACK_PACKAGE_VERSION_PATCH}" ) # we build and ship the libraries, so there is NO dependency diff --git a/src/common/include/rmr.h b/src/common/include/rmr.h index 2f58eaf..9ef563f 100644 --- a/src/common/include/rmr.h +++ b/src/common/include/rmr.h @@ -38,7 +38,7 @@ extern "C" { #define RMR_MAX_XID 32 // space in header reserved for user xaction id #define RMR_MAX_SID 32 // spece in header reserved for sender id #define RMR_MAX_MEID 32 // spece in header reserved for managed element id -#define RMR_MAX_SRC 16 // max length of hostname +#define RMR_MAX_SRC 64 // max length of hostname (which could be IPv6 addr with [...]:port so more than the 39 bytes of a plain addr #define RMR_MAX_RCV_BYTES 4096 // max bytes we support in a receive message // various flags for function calls @@ -130,6 +130,7 @@ extern int rmr_send_to( void* vctx, int time ); // DEPRECATED -- replaced with // --- uta compatability defs if needed user should define UTA_COMPAT ---------------------------------- #ifdef UTA_COMPAT +#pragma message( "use of UTA_COMPAT is deprecated and soon to be removed" ) #define UTA_MAX_XID RMR_MAX_XID #define UTA_MAX_SID RMR_MAX_SID diff --git a/src/common/include/rmr_agnostic.h b/src/common/include/rmr_agnostic.h index 687330b..fc45429 100644 --- a/src/common/include/rmr_agnostic.h +++ b/src/common/include/rmr_agnostic.h @@ -44,7 +44,7 @@ typedef struct uta_ctx uta_ctx_t; #define QUOTE_DEF(a) QUOTE(a) // allow a #define value to be quoted (e.g. QUOTE(MAJOR_VERSION) ) -#define RMR_MSG_VER 1 // potental to treat messages differently if from backlevel version +#define RMR_MSG_VER 2 // message version this code was designed to handle // environment variable names we'll suss out #define ENV_BIND_IF "RMR_BIND_IF" // the interface to bind to for both normal comma and RTG (0.0.0.0 if missing) @@ -73,12 +73,43 @@ typedef struct uta_ctx uta_ctx_t; #define DEF_RTG_PORT "tcp:4561" // default port that we accept rtg connections on #define DEF_COMM_PORT "tcp:4560" // default port we use for normal communications +// -- header length/offset macros which ensure network conversion ---- +#define RMR_HDR_LEN(h) (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2)+htonl(((uta_mhdr_t *)h)->len3)) // convert from net byte order +#define PAYLOAD_OFFSET(h) (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2)+htonl(((uta_mhdr_t *)h)->len3)) +#define TRACE_OFFSET(h) (ntohl(((uta_mhdr_t *)h)->len0)) +#define DATA1_OFFSET(h) (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)) +#define DATA2_OFFSET(h) (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl((uta_mhdr_t *)h)->len2) +#define RMR_TR_LEN(h) (ntohl(((uta_mhdr_t *)h)->len1)) +#define RMR_D1_LEN(h) (ntohl(((uta_mhdr_t *)h)->len2)) +#define RMR_D2_LEN(h) (ntohl(((uta_mhdr_t *)h)->len3)) + +#define SET_HDR_LEN(h) (((uta_mhdr_t *)h)->len0=htonl((int32_t)sizeof(uta_mhdr_t))) // convert to network byte order on insert +#define SET_HDR_TR_LEN(h,l) (((uta_mhdr_t *)h)->len1=htonl((int32_t)l)) +#define SET_HDR_D1_LEN(h,l) (((uta_mhdr_t *)h)->len2=htonl((int32_t)l)) +#define SET_HDR_D2_LEN(h,l) (((uta_mhdr_t *)h)->len3=htonl((int32_t)l)) + + +#define V1_PAYLOAD_OFFSET(h) (sizeof(uta_v1mhdr_t)) + + // v2 header flags +#define HFL_HAS_TRACE 0x01 // Trace data is populated +#define HFL_SUBID 0x02 // subscription ID is populated + /* Message header; interpreted by the other side, but never seen by the user application. DANGER: Add new fields AT THE END of the struct. Adding them any where else will break any code that is currently running. + + The transport layer buffer allocated will be divided this way: + | RMr header | Trace data | data1 | data2 | User paylaod | + + Len 0 is the length of the RMr header + Len 1 is the length of the trace data + Len 2 and 3 are lengths of data1 and data2 and are unused at the moment + + To point at the payload, we take the address of the header and add all 4 lengths. */ typedef struct { int32_t mtype; // message type ("long" network integer) @@ -86,11 +117,31 @@ typedef struct { int32_t rmr_ver; // our internal message version number unsigned char xid[RMR_MAX_XID]; // space for user transaction id or somesuch unsigned char sid[RMR_MAX_SID]; // sender ID for return to sender needs - unsigned char src[RMR_MAX_SRC]; // name of the sender (source) + unsigned char src[RMR_MAX_SRC]; // name:port of the sender (source) unsigned char meid[RMR_MAX_MEID]; // managed element id. struct timespec ts; // timestamp ??? + + // V2 extension + int32_t flags; // HFL_* constants + int32_t len0; // length of the RMr header data + int32_t len1; // length of the tracing data + int32_t len2; // length of data 1 (d1) + int32_t len3; // length of data 2 (d2) + } uta_mhdr_t; + +typedef struct { // old (inflexible) v1 header + int32_t mtype; // message type ("long" network integer) + int32_t plen; // payload length + int32_t rmr_ver; // our internal message version number + unsigned char xid[RMR_MAX_XID]; // space for user transaction id or somesuch + unsigned char sid[RMR_MAX_SID]; // sender ID for return to sender needs + unsigned char src[16]; // name of the sender (source) (old size was 16) + unsigned char meid[RMR_MAX_MEID]; // managed element id. + struct timespec ts; // timestamp ??? +} uta_v1mhdr_t; + /* Round robin group. */ @@ -138,8 +189,6 @@ typedef struct { int naddrs; // num actually used } if_addrs_t; - - // --------------- ring things ------------------------------------------------- typedef struct ring { uint16_t head; // index of the head of the ring (insert point) diff --git a/src/nanomsg/include/rmr_private.h b/src/nanomsg/include/rmr_private.h index 3b5d3d3..9c00675 100644 --- a/src/nanomsg/include/rmr_private.h +++ b/src/nanomsg/include/rmr_private.h @@ -59,6 +59,9 @@ struct uta_ctx { int flags; // CTXFL_ constants int nrtele; // number of elements in the routing table int nn_sock; // our general listen socket + int trace_data_len; // len of tracing data that sits just past header (0 means none) + int d1_len; // lengths for additional post header, pre payload data areas + int d2_len; route_table_t* rtable; // the active route table route_table_t* old_rtable; // the previously used rt, sits here to allow for draining route_table_t* new_rtable; // route table under construction diff --git a/src/nanomsg/src/rmr.c b/src/nanomsg/src/rmr.c index 3fe4247..2900117 100644 --- a/src/nanomsg/src/rmr.c +++ b/src/nanomsg/src/rmr.c @@ -154,7 +154,7 @@ extern int rmr_payload_size( rmr_mbuf_t* msg ) { } errno = 0; - return msg->alloc_len - sizeof( uta_mhdr_t ); // figure size should we not have a msg buffer + return msg->alloc_len - RMR_HDR_LEN( msg->header ); // transport buffer less header and other data bits } /* diff --git a/src/nanomsg/src/sr_static.c b/src/nanomsg/src/sr_static.c index cddb662..44e00ea 100644 --- a/src/nanomsg/src/sr_static.c +++ b/src/nanomsg/src/sr_static.c @@ -69,6 +69,7 @@ */ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state ) { int mlen; + uta_mhdr_t* hdr; mlen = sizeof( uta_mhdr_t ); // figure size should we not have a msg buffer mlen += (size > 0 ? size : ctx->max_plen); // add user requested size or size set during init @@ -90,10 +91,16 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s exit( 1 ); } - ((uta_mhdr_t *) msg->header)->rmr_ver = RMR_MSG_VER; // version info should we need to recognised old style messages someday + hdr = (uta_mhdr_t *) msg->header; + hdr->rmr_ver = htonl( RMR_MSG_VER ); // current version + SET_HDR_LEN( hdr ); + SET_HDR_TR_LEN( hdr, ctx->trace_data_len ); + //SET_HDR_D1_LEN( hdr, ctx->d1_len ); // moot until we actually need these data areas + //SET_HDR_D2_LEN( hdr, ctx->d1_len ); + msg->len = 0; // length of data in the payload msg->alloc_len = mlen; // length of allocated payload - msg->payload = msg->header + sizeof( uta_mhdr_t ); // point past header to payload (single buffer allocation above) + msg->payload = msg->header + PAYLOAD_OFFSET( hdr ); // point at the payload in transport msg->xaction = ((uta_mhdr_t *)msg->header)->xid; // point at transaction id in header area msg->state = state; // fill in caller's state (likely the state of the last operation) msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message @@ -128,14 +135,15 @@ static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg ) { exit( 1 ); } + memcpy( nm->header, old_msg->header, RMR_HDR_LEN( old_msg->header ) ); // copy complete header, trace and other data + nm->mtype = old_msg->mtype; nm->len = old_msg->len; // length of data in the payload nm->alloc_len = mlen; // length of allocated payload - nm->payload = nm->header + sizeof( uta_mhdr_t ); // point past header to payload (single buffer allocation above) + nm->payload = nm->header + PAYLOAD_OFFSET( nm->header ); nm->xaction = ((uta_mhdr_t *)nm->header)->xid; // point at transaction id in header area nm->state = old_msg->state; // fill in caller's state (likely the state of the last operation) nm->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message - memcpy( ((uta_mhdr_t *)nm->header)->src, ((uta_mhdr_t *)old_msg->header)->src, RMR_MAX_SID ); memcpy( nm->payload, old_msg->payload, old_msg->len ); return nm; @@ -166,14 +174,14 @@ static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) { if( msg->state > (int) sizeof( uta_mhdr_t ) ) { // we need more than just a header here hdr = (uta_mhdr_t *) msg->header; msg->len = ntohl( hdr->plen ); // length of data in the payload (likely < payload size) - if( msg->len > msg->state - sizeof( uta_mhdr_t ) ) { - fprintf( stderr, "[WARN] rmr_rcv indicated payload length < rcvd payload: expected %d got %ld\n", - msg->len, msg->state - sizeof( uta_mhdr_t ) ); + if( msg->len > msg->state - RMR_HDR_LEN( hdr ) ) { + msg->state = RMR_ERR_TRUNC; + msg->len = msg->state - RMR_HDR_LEN( hdr ); } msg->mtype = ntohl( hdr->mtype ); // capture and convert from network order to local order msg->state = RMR_OK; msg->flags |= MFL_ADDSRC; // turn on so if user app tries to send this buffer we reset src - msg->payload = msg->header + sizeof( uta_mhdr_t ); + msg->payload = msg->header + PAYLOAD_OFFSET( msg->header ); msg->xaction = &hdr->xid[0]; // provide user with ref to fixed space xaction id if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_msg: got something: type=%d state=%d len=%d diff=%ld\n", msg->mtype, msg->state, msg->len, msg->payload - (unsigned char *) msg->header ); diff --git a/src/nng/include/rmr_nng_private.h b/src/nng/include/rmr_nng_private.h index 83db500..29be7d3 100644 --- a/src/nng/include/rmr_nng_private.h +++ b/src/nng/include/rmr_nng_private.h @@ -70,6 +70,9 @@ struct uta_ctx { int flags; // CTXFL_ constants int nrtele; // number of elements in the routing table int send_retries; // number of retries send_msg() should attempt if eagain/timeout indicated by nng + int trace_data_len; // number of bytes to allocate in header for trace data + int d1_len; // extra header data 1 length (future) + int d2_len; // extra header data 2 length (future) nng_socket nn_sock; // our general listen socket route_table_t* rtable; // the active route table route_table_t* old_rtable; // the previously used rt, sits here to allow for draining diff --git a/src/nng/src/rmr_nng.c b/src/nng/src/rmr_nng.c index 9a9a043..af8ce6f 100644 --- a/src/nng/src/rmr_nng.c +++ b/src/nng/src/rmr_nng.c @@ -104,7 +104,7 @@ extern int rmr_payload_size( rmr_mbuf_t* msg ) { } errno = 0; - return msg->alloc_len - sizeof( uta_mhdr_t ); // figure size should we not have a msg buffer + return msg->alloc_len - RMR_HDR_LEN( msg->header ); // allocated transport size less the header and other data bits } /* @@ -575,8 +575,8 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { int state; if( ! announced ) { - fprintf( stderr, "[INFO] ric message routing library on NNG (%s %s.%s.%s built: %s)\n", - QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ ); + fprintf( stderr, "[INFO] ric message routing library on NNG mv=%d (%s %s.%s.%s built: %s)\n", + RMR_MSG_VER, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ ); announced = 1; } @@ -596,17 +596,11 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { ctx->send_retries = 1; // default is not to sleep at all; RMr will retry about 10K times before returning ctx->mring = uta_mk_ring( 128 ); // message ring to hold asynch msgs received while waiting for call response - ctx->max_plen = RMR_MAX_RCV_BYTES + sizeof( uta_mhdr_t ); // default max buffer size + ctx->max_plen = RMR_MAX_RCV_BYTES; // max user payload lengh if( max_msg_size > 0 ) { - if( max_msg_size <= ctx->max_plen ) { // user defined len can be smaller - ctx->max_plen = max_msg_size; - } else { - fprintf( stderr, "[WRN] rmr_init: attempt to set max payload len > than allowed maximum; capped at %d bytes\n", ctx->max_plen ); - } + ctx->max_plen = max_msg_size; } - ctx->max_mlen = ctx->max_plen + sizeof( uta_mhdr_t ); - // we're using a listener to get rtg updates, so we do NOT need this. //uta_lookup_rtg( ctx ); // attempt to fill in rtg info; rtc will handle missing values/errors @@ -719,7 +713,7 @@ extern int rmr_get_rcvfd( void* vctx ) { } if( (state = nng_getopt_int( ctx->nn_sock, NNG_OPT_RECVFD, &fd )) != 0 ) { - fprintf( stderr, ">>> cannot get recv fd: %s\n", nng_strerror( state ) ); + fprintf( stderr, "[WRN] rmr cannot get recv fd: %s\n", nng_strerror( state ) ); return -1; } diff --git a/src/nng/src/sr_nng_static.c b/src/nng/src/sr_nng_static.c index cfea829..369c68e 100644 --- a/src/nng/src/sr_nng_static.c +++ b/src/nng/src/sr_nng_static.c @@ -107,10 +107,10 @@ static inline int xlate_nng_state( int state, int def_state ) { nng messages. */ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state ) { - size_t mlen; + size_t mlen; // size of the transport buffer that we'll allocate uta_mhdr_t* hdr; // convenience pointer - mlen = sizeof( uta_mhdr_t ); // figure size should we not have a msg buffer + mlen = sizeof( uta_mhdr_t ) + ctx->trace_data_len + ctx->d1_len + ctx->d2_len; // start with header and trace/data lengths mlen += (size > 0 ? size : ctx->max_plen); // add user requested size or size set during init if( msg == NULL ) { @@ -131,12 +131,17 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s } msg->header = nng_msg_body( msg->tp_buf ); + memset( msg->header, 0, sizeof( uta_mhdr_t ) ); // ensure no junk in the header area if( (hdr = (uta_mhdr_t *) msg->header) != NULL ) { - hdr->rmr_ver = RMR_MSG_VER; // version info should we need to recognised old style messages someday + hdr->rmr_ver = htonl( RMR_MSG_VER ); // set current version + SET_HDR_LEN( hdr ); // ensure these are converted to net byte order + SET_HDR_TR_LEN( hdr, ctx->trace_data_len ); + //SET_HDR_D1_LEN( hdr, ctx->d1_len ); // no need until we start using them + //SET_HDR_D2_LEN( hdr, ctx->d2_len ); } msg->len = 0; // length of data in the payload - msg->alloc_len = mlen; // length of allocated payload - msg->payload = msg->header + sizeof( uta_mhdr_t ); // point past header to payload (single buffer allocation above) + msg->alloc_len = mlen; // length of allocated transport buffer + msg->payload = msg->header + PAYLOAD_OFFSET( hdr ); // past header, trace and other data bits msg->xaction = ((uta_mhdr_t *)msg->header)->xid; // point at transaction id in header area msg->state = state; // fill in caller's state (likely the state of the last operation) msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message @@ -185,21 +190,59 @@ static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state ) { The alen parm is the assumed allocated length; assumed because it's a value likely to have come from nng receive and the actual alloc len might be larger, but we can only assume this is the total usable space. + + This function returns the message with an error state set if it detects that the + received message might have been truncated. Check is done here as the calculation + is somewhat based on header version. */ static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen ) { - uta_mhdr_t* hdr; + uta_mhdr_t* hdr; // current header + uta_v1mhdr_t* v1hdr; // version 1 header + int ver; + int hlen; // header len to use for a truncation check msg->header = nng_msg_body( msg->tp_buf ); // header is the start of the transport buffer + v1hdr = (uta_v1mhdr_t *) msg->header; // v1 will always allow us to suss out the version - hdr = (uta_mhdr_t *) msg->header; - hdr->rmr_ver = RMR_MSG_VER; // version info should we need to recognised old style messages someday - msg->len = ntohl( hdr->plen ); // length sender says is in the payload (received length could be larger) - msg->alloc_len = alen; // length of whole tp buffer (including header) - msg->payload = msg->header + sizeof( uta_mhdr_t ); // point past header to payload (single buffer allocation above) - msg->xaction = ((uta_mhdr_t *)msg->header)->xid; // point at transaction id in header area - msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message - msg->mtype = ntohl( hdr->mtype ); // capture and convert from network order to local order - msg->state = RMR_OK; + if( v1hdr->rmr_ver == 1 ) { // bug in verion 1 didn't encode the version in network byte order + ver = 1; + v1hdr->rmr_ver = htonl( 1 ); // save it correctly in case we clone the message + } else { + ver = ntohl( v1hdr->rmr_ver ); + } + + switch( ver ) { + case 1: + msg->len = ntohl( v1hdr->plen ); // length sender says is in the payload (received length could be larger) + msg->alloc_len = alen; // length of whole tp buffer (including header, trace and data bits) + msg->payload = msg->header + sizeof( uta_v1mhdr_t ); // point past header to payload (single buffer allocation above) + + msg->xaction = &v1hdr->xid[0]; // point at transaction id in header area + msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message + msg->mtype = ntohl( v1hdr->mtype ); // capture and convert from network order to local order + msg->state = RMR_OK; + hlen = sizeof( uta_v1mhdr_t ); + break; + + default: // current version always lands here + hdr = (uta_mhdr_t *) msg->header; + msg->len = ntohl( hdr->plen ); // length sender says is in the payload (received length could be larger) + msg->alloc_len = alen; // length of whole tp buffer (including header, trace and data bits) + + msg->payload = msg->header + PAYLOAD_OFFSET( hdr ); // past header, trace and other data bits + msg->xaction = &hdr->xid[0]; // point at transaction id in header area + msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message + msg->mtype = ntohl( hdr->mtype ); // capture and convert from network order to local order + hlen = RMR_HDR_LEN( hdr ); // len to use for truncated check later + break; + } + + if( msg->len > (msg->alloc_len - hlen ) ) { // more than we should have had room for; error + msg->state = RMR_ERR_TRUNC; + msg->len = msg->alloc_len - hlen; // adjust len down so user app doesn't overrun + } else { + msg->state = RMR_OK; + } } /* @@ -209,6 +252,8 @@ static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg ) { rmr_mbuf_t* nm; // new message buffer size_t mlen; int state; + uta_mhdr_t* hdr; + uta_v1mhdr_t* v1hdr; nm = (rmr_mbuf_t *) malloc( sizeof *nm ); if( nm == NULL ) { @@ -223,16 +268,29 @@ static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg ) { exit( 1 ); } - nm->header = nng_msg_body( nm->tp_buf ); + nm->header = nng_msg_body( nm->tp_buf ); // set and copy the header from old message + v1hdr = (uta_v1mhdr_t *) old_msg->header; // v1 will work to dig header out of any version + switch( ntohl( v1hdr->rmr_ver ) ) { + case 1: + memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) ); // copy complete header + nm->payload = (void *) v1hdr + sizeof( *v1hdr ); + break; + + default: // current message always caught here + hdr = nm->header; + memcpy( hdr, old_msg->header, RMR_HDR_LEN( old_msg->header ) ); // copy complete header, trace and other data + nm->payload = nm->header + PAYLOAD_OFFSET( hdr ); // point at the payload + break; + } + + // --- these are all version agnostic ----------------------------------- nm->mtype = old_msg->mtype; nm->len = old_msg->len; // length of data in the payload nm->alloc_len = mlen; // length of allocated payload - nm->payload = nm->header + sizeof( uta_mhdr_t ); // point past header to payload (single buffer allocation above) - nm->xaction = ((uta_mhdr_t *)nm->header)->xid; // point at transaction id in header area + + nm->xaction = hdr->xid; // reference xaction nm->state = old_msg->state; // fill in caller's state (likely the state of the last operation) nm->flags = old_msg->flags | MFL_ZEROCOPY; // this is a zerocopy sendable message - - memcpy( ((uta_mhdr_t *)nm->header)->src, ((uta_mhdr_t *)old_msg->header)->src, RMR_MAX_SID ); memcpy( nm->payload, old_msg->payload, old_msg->len ); return nm; @@ -276,16 +334,14 @@ static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) { msg->tp_buf = NULL; } else { - //msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK ); // will abort on failure, no need to check msg = alloc_mbuf( ctx, RMR_OK ); // msg without a transport buffer } + msg->alloc_len = 0; msg->len = 0; msg->payload = NULL; msg->xaction = NULL; - //rsize = msg->alloc_len; // set to max, and we'll get len back here too - //msg->state = nng_recv( ctx->nn_sock, msg->header, &rsize, NO_FLAGS ); // total space (header + payload len) allocated msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS ); // blocks hard until received if( (msg->state = xlate_nng_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) { return msg; @@ -297,20 +353,22 @@ static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) { } rsize = nng_msg_len( msg->tp_buf ); - if( rsize >= sizeof( uta_mhdr_t ) ) { // we need at least a full header here - - ref_tpbuf( msg, rsize ); // point payload, header etc to the just received tp buffer + if( rsize >= sizeof( uta_v1mhdr_t ) ) { // we need at least a full type 1 (smallest) header here + ref_tpbuf( msg, rsize ); // point payload, header etc to the data and set trunc error if needed hdr = (uta_mhdr_t *) msg->header; - msg->flags |= MFL_ADDSRC; // turn on so if user app tries to send this buffer we reset src - if( msg->len > (msg->alloc_len - sizeof( uta_mhdr_t )) ) { // way more than we should have had room for; error - msg->state = RMR_ERR_TRUNC; - } + msg->flags |= MFL_ADDSRC; // turn on so if user app tries to send this buffer we reset src + if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_msg: got something: type=%d state=%d len=%d diff=%ld\n", msg->mtype, msg->state, msg->len, msg->payload - (unsigned char *) msg->header ); } else { - msg->len = 0; msg->state = RMR_ERR_EMPTY; + msg->len = 0; + msg->alloc_len = rsize; + msg->payload = NULL; + msg->xaction = NULL; + msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message + msg->mtype = -1; } return msg; @@ -386,8 +444,6 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock errno = 0; msg->state = RMR_OK; if( msg->flags & MFL_ZEROCOPY ) { // faster sending with zcopy buffer - //nng_flags |= NNG_FLAG_ALLOC; // indicate a zc buffer that nng is expected to free - do { if( (state = nng_sendmsg( nn_sock, (nng_msg *) msg->tp_buf, nng_flags )) != 0 ) { // must check and retry some if transient failure msg->state = state; @@ -409,6 +465,7 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock } } while( state && retries > 0 ); } else { + // future: this should not happen as all buffers we deal with are zc buffers; might make sense to remove the test and else msg->state = RMR_ERR_SENDFAILED; errno = ENOTSUP; return msg; @@ -434,8 +491,6 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock msg->state = RMR_ERR_RETRY; // errno will have nano reason } else { msg->state = xlate_nng_state( msg->state, RMR_ERR_SENDFAILED ); // xlate to our state and set errno - //errno = -msg->state; - //msg->state = RMR_ERR_SENDFAILED; // errno will have nano reason } if( DEBUG ) fprintf( stderr, "[DBUG] send failed: %d %s\n", (int) msg->state, strerror( msg->state ) ); -- 2.16.6