cmake_minimum_required( VERSION 3.5 )
-set( major_version "1" )
+set( major_version "0" ) # automatically populated from git tag later
set( minor_version "0" )
-set( patch_level "16" )
+set( patch_level "0" )
set( install_root "${CMAKE_INSTALL_PREFIX}" )
set( install_lib "lib" )
set( BUILD_LIB lib )
endif()
-# ---------------- set version info (not perfect, but better than nothing) ----
+# ---------------- extract some things from git ------------------------------
+
+# commit id for the version string
execute_process(
COMMAND bash -c "git rev-parse --short HEAD|awk '{printf\"%s\", $0}'"
OUTPUT_VARIABLE git_id
)
+# version information for library names and version string
+execute_process(
+ COMMAND bash -c "git describe --tags --abbrev=0 HEAD 2>/dev/null | awk -v tag=0.0.4095 ' { tag=$1 } END{ print tag suffix }'|sed 's/\\./;/g' "
+ OUTPUT_VARIABLE mmp_version_str
+ ERROR_QUIET OUTPUT_STRIP_TRAILING_WHITESPACE
+)
+
+# extra indicator to show that the build was based on modified file(s) and not the true commit
+# (no hope of reproducing the exact library for debugging). Used only for the internal version
+# string.
+execute_process(
+ COMMAND bash -c "git diff --shortstat|awk -v fmt=%s -v r=-rotten '{ s=r } END { printf( fmt, s ) }'"
+ OUTPUT_VARIABLE spoiled_str
+)
+
+set( mmp_version ${mmp_version_str} )
+list( GET mmp_version 0 major_version )
+list( GET mmp_version 1 minor_version )
+list( GET mmp_version 2 patch_level )
+
+message( "+++ building ${major_version}.${minor_version}.${patch_level}${spoiled_str}" )
+
+# define constants used in the version string
add_definitions(
-DGIT_ID=${git_id}
-DMAJOR_VER=${major_version}
-DMINOR_VER=${minor_version}
- -DPATCH_VER=${patch_level}
+ -DPATCH_VER=${patch_level}${spoiled_str}
)
add_library( rmr_shared SHARED "$<TARGET_OBJECTS:nano_objects>;$<TARGET_OBJECTS:common_objects>" )
add_library( rmr_static STATIC "$<TARGET_OBJECTS:nano_objects>;$<TARGET_OBJECTS:common_objects>" )
-# both libraries to be named with librmr prefix
-set_target_properties( rmr_shared PROPERTIES OUTPUT_NAME "rmr" )
-set_target_properties( rmr_static PROPERTIES OUTPUT_NAME "rmr" )
+# both libraries to be named with librmr prefix and given version numbers with sym links
+set_target_properties( rmr_shared
+ PROPERTIES
+ OUTPUT_NAME "rmr"
+ SOVERSION ${major_version}
+ VERSION ${major_version}.${minor_version}.${patch_level} )
+
+set_target_properties( rmr_static
+ PROPERTIES
+ OUTPUT_NAME "rmr"
+ SOVERSION ${major_version}
+ VERSION ${major_version}.${minor_version}.${patch_level} )
# NNG based library (librmr_nng )
-# library is built by pulling objects from nng and common subdirs
+# library is built by pulling objects from nng and common subdirs.
#
add_library( rmr_nng_shared SHARED "$<TARGET_OBJECTS:nng_objects>;$<TARGET_OBJECTS:common_objects>" )
add_library( rmr_nng_static STATIC "$<TARGET_OBJECTS:nng_objects>;$<TARGET_OBJECTS:common_objects>" )
endif()
-# both libraries to be named with librmr_nng prefix
-#
-set_target_properties( rmr_nng_shared PROPERTIES OUTPUT_NAME "rmr_nng" )
-set_target_properties( rmr_nng_static PROPERTIES OUTPUT_NAME "rmr_nng" )
+# both libraries to be named with librmr_nng prefix and given version numbers with sym links
+set_target_properties( rmr_nng_shared
+ PROPERTIES
+ OUTPUT_NAME "rmr_nng"
+ SOVERSION ${major_version}
+ VERSION ${major_version}.${minor_version}.${patch_level} )
+
+set_target_properties( rmr_nng_static
+ PROPERTIES
+ OUTPUT_NAME "rmr_nng"
+ SOVERSION ${major_version}
+ VERSION ${major_version}.${minor_version}.${patch_level} )
#
if( APPLE )
# the moment, there are no header files specific to either nano or nng, so to the public
# header directive is moot, but needed if some day there is one.
#
+#install( TARGETS rmr_nng_shared;rmr_nng_static;rmr_shared;rmr_static;rmr_nng_shared_mm EXPORT LibraryConfig
install( TARGETS rmr_nng_shared;rmr_nng_static;rmr_shared;rmr_static EXPORT LibraryConfig
ARCHIVE DESTINATION ${install_lib}
LIBRARY DESTINATION ${install_lib}
set( CPACK_PACKAGE_VERSION_MAJOR "${major_version}" )
set( CPACK_PACKAGE_VERSION_MINOR "${minor_version}" )
set( CPACK_PACKAGE_VERSION_PATCH "${patch_level}" )
- set( CPACK_PACKAGE_FILE_NAME "${CMAKE_PROJECT_NAME}_${major_version}.${minor_version}.${CPACK_PACKAGE_VERSION_PATCH}" )
+ set( CPACK_PACKAGE_FILE_NAME "${CMAKE_PROJECT_NAME}_${major_version}.${minor_version}.${CPACK_PACKAGE_VERSION_PATCH}${spoiled_str}" )
set( CPACK_SOURCE_PACKAGE_FILE_NAME "vric${CMAKE_PROJECT_NAME}_${major_version}.${minor_version}.${CPACK_PACKAGE_VERSION_PATCH}" )
# we build and ship the libraries, so there is NO dependency
#define RMR_MAX_XID 32 // space in header reserved for user xaction id
#define RMR_MAX_SID 32 // spece in header reserved for sender id
#define RMR_MAX_MEID 32 // spece in header reserved for managed element id
-#define RMR_MAX_SRC 16 // max length of hostname
+#define RMR_MAX_SRC 64 // max length of hostname (which could be IPv6 addr with [...]:port so more than the 39 bytes of a plain addr
#define RMR_MAX_RCV_BYTES 4096 // max bytes we support in a receive message
// various flags for function calls
// --- uta compatability defs if needed user should define UTA_COMPAT ----------------------------------
#ifdef UTA_COMPAT
+#pragma message( "use of UTA_COMPAT is deprecated and soon to be removed" )
#define UTA_MAX_XID RMR_MAX_XID
#define UTA_MAX_SID RMR_MAX_SID
#define QUOTE_DEF(a) QUOTE(a) // allow a #define value to be quoted (e.g. QUOTE(MAJOR_VERSION) )
-#define RMR_MSG_VER 1 // potental to treat messages differently if from backlevel version
+#define RMR_MSG_VER 2 // message version this code was designed to handle
// environment variable names we'll suss out
#define ENV_BIND_IF "RMR_BIND_IF" // the interface to bind to for both normal comma and RTG (0.0.0.0 if missing)
#define DEF_RTG_PORT "tcp:4561" // default port that we accept rtg connections on
#define DEF_COMM_PORT "tcp:4560" // default port we use for normal communications
+// -- header length/offset macros which ensure network conversion ----
+#define RMR_HDR_LEN(h) (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2)+htonl(((uta_mhdr_t *)h)->len3)) // convert from net byte order
+#define PAYLOAD_OFFSET(h) (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2)+htonl(((uta_mhdr_t *)h)->len3))
+#define TRACE_OFFSET(h) (ntohl(((uta_mhdr_t *)h)->len0))
+#define DATA1_OFFSET(h) (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1))
+#define DATA2_OFFSET(h) (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl((uta_mhdr_t *)h)->len2)
+#define RMR_TR_LEN(h) (ntohl(((uta_mhdr_t *)h)->len1))
+#define RMR_D1_LEN(h) (ntohl(((uta_mhdr_t *)h)->len2))
+#define RMR_D2_LEN(h) (ntohl(((uta_mhdr_t *)h)->len3))
+
+#define SET_HDR_LEN(h) (((uta_mhdr_t *)h)->len0=htonl((int32_t)sizeof(uta_mhdr_t))) // convert to network byte order on insert
+#define SET_HDR_TR_LEN(h,l) (((uta_mhdr_t *)h)->len1=htonl((int32_t)l))
+#define SET_HDR_D1_LEN(h,l) (((uta_mhdr_t *)h)->len2=htonl((int32_t)l))
+#define SET_HDR_D2_LEN(h,l) (((uta_mhdr_t *)h)->len3=htonl((int32_t)l))
+
+
+#define V1_PAYLOAD_OFFSET(h) (sizeof(uta_v1mhdr_t))
+
+ // v2 header flags
+#define HFL_HAS_TRACE 0x01 // Trace data is populated
+#define HFL_SUBID 0x02 // subscription ID is populated
+
/*
Message header; interpreted by the other side, but never seen by
the user application.
DANGER: Add new fields AT THE END of the struct. Adding them any where else
will break any code that is currently running.
+
+ The transport layer buffer allocated will be divided this way:
+ | RMr header | Trace data | data1 | data2 | User paylaod |
+
+ Len 0 is the length of the RMr header
+ Len 1 is the length of the trace data
+ Len 2 and 3 are lengths of data1 and data2 and are unused at the moment
+
+ To point at the payload, we take the address of the header and add all 4 lengths.
*/
typedef struct {
int32_t mtype; // message type ("long" network integer)
int32_t rmr_ver; // our internal message version number
unsigned char xid[RMR_MAX_XID]; // space for user transaction id or somesuch
unsigned char sid[RMR_MAX_SID]; // sender ID for return to sender needs
- unsigned char src[RMR_MAX_SRC]; // name of the sender (source)
+ unsigned char src[RMR_MAX_SRC]; // name:port of the sender (source)
unsigned char meid[RMR_MAX_MEID]; // managed element id.
struct timespec ts; // timestamp ???
+
+ // V2 extension
+ int32_t flags; // HFL_* constants
+ int32_t len0; // length of the RMr header data
+ int32_t len1; // length of the tracing data
+ int32_t len2; // length of data 1 (d1)
+ int32_t len3; // length of data 2 (d2)
+
} uta_mhdr_t;
+
+typedef struct { // old (inflexible) v1 header
+ int32_t mtype; // message type ("long" network integer)
+ int32_t plen; // payload length
+ int32_t rmr_ver; // our internal message version number
+ unsigned char xid[RMR_MAX_XID]; // space for user transaction id or somesuch
+ unsigned char sid[RMR_MAX_SID]; // sender ID for return to sender needs
+ unsigned char src[16]; // name of the sender (source) (old size was 16)
+ unsigned char meid[RMR_MAX_MEID]; // managed element id.
+ struct timespec ts; // timestamp ???
+} uta_v1mhdr_t;
+
/*
Round robin group.
*/
int naddrs; // num actually used
} if_addrs_t;
-
-
// --------------- ring things -------------------------------------------------
typedef struct ring {
uint16_t head; // index of the head of the ring (insert point)
int flags; // CTXFL_ constants
int nrtele; // number of elements in the routing table
int nn_sock; // our general listen socket
+ int trace_data_len; // len of tracing data that sits just past header (0 means none)
+ int d1_len; // lengths for additional post header, pre payload data areas
+ int d2_len;
route_table_t* rtable; // the active route table
route_table_t* old_rtable; // the previously used rt, sits here to allow for draining
route_table_t* new_rtable; // route table under construction
}
errno = 0;
- return msg->alloc_len - sizeof( uta_mhdr_t ); // figure size should we not have a msg buffer
+ return msg->alloc_len - RMR_HDR_LEN( msg->header ); // transport buffer less header and other data bits
}
/*
*/
static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state ) {
int mlen;
+ uta_mhdr_t* hdr;
mlen = sizeof( uta_mhdr_t ); // figure size should we not have a msg buffer
mlen += (size > 0 ? size : ctx->max_plen); // add user requested size or size set during init
exit( 1 );
}
- ((uta_mhdr_t *) msg->header)->rmr_ver = RMR_MSG_VER; // version info should we need to recognised old style messages someday
+ hdr = (uta_mhdr_t *) msg->header;
+ hdr->rmr_ver = htonl( RMR_MSG_VER ); // current version
+ SET_HDR_LEN( hdr );
+ SET_HDR_TR_LEN( hdr, ctx->trace_data_len );
+ //SET_HDR_D1_LEN( hdr, ctx->d1_len ); // moot until we actually need these data areas
+ //SET_HDR_D2_LEN( hdr, ctx->d1_len );
+
msg->len = 0; // length of data in the payload
msg->alloc_len = mlen; // length of allocated payload
- msg->payload = msg->header + sizeof( uta_mhdr_t ); // point past header to payload (single buffer allocation above)
+ msg->payload = msg->header + PAYLOAD_OFFSET( hdr ); // point at the payload in transport
msg->xaction = ((uta_mhdr_t *)msg->header)->xid; // point at transaction id in header area
msg->state = state; // fill in caller's state (likely the state of the last operation)
msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
exit( 1 );
}
+ memcpy( nm->header, old_msg->header, RMR_HDR_LEN( old_msg->header ) ); // copy complete header, trace and other data
+
nm->mtype = old_msg->mtype;
nm->len = old_msg->len; // length of data in the payload
nm->alloc_len = mlen; // length of allocated payload
- nm->payload = nm->header + sizeof( uta_mhdr_t ); // point past header to payload (single buffer allocation above)
+ nm->payload = nm->header + PAYLOAD_OFFSET( nm->header );
nm->xaction = ((uta_mhdr_t *)nm->header)->xid; // point at transaction id in header area
nm->state = old_msg->state; // fill in caller's state (likely the state of the last operation)
nm->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
- memcpy( ((uta_mhdr_t *)nm->header)->src, ((uta_mhdr_t *)old_msg->header)->src, RMR_MAX_SID );
memcpy( nm->payload, old_msg->payload, old_msg->len );
return nm;
if( msg->state > (int) sizeof( uta_mhdr_t ) ) { // we need more than just a header here
hdr = (uta_mhdr_t *) msg->header;
msg->len = ntohl( hdr->plen ); // length of data in the payload (likely < payload size)
- if( msg->len > msg->state - sizeof( uta_mhdr_t ) ) {
- fprintf( stderr, "[WARN] rmr_rcv indicated payload length < rcvd payload: expected %d got %ld\n",
- msg->len, msg->state - sizeof( uta_mhdr_t ) );
+ if( msg->len > msg->state - RMR_HDR_LEN( hdr ) ) {
+ msg->state = RMR_ERR_TRUNC;
+ msg->len = msg->state - RMR_HDR_LEN( hdr );
}
msg->mtype = ntohl( hdr->mtype ); // capture and convert from network order to local order
msg->state = RMR_OK;
msg->flags |= MFL_ADDSRC; // turn on so if user app tries to send this buffer we reset src
- msg->payload = msg->header + sizeof( uta_mhdr_t );
+ msg->payload = msg->header + PAYLOAD_OFFSET( msg->header );
msg->xaction = &hdr->xid[0]; // provide user with ref to fixed space xaction id
if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_msg: got something: type=%d state=%d len=%d diff=%ld\n",
msg->mtype, msg->state, msg->len, msg->payload - (unsigned char *) msg->header );
int flags; // CTXFL_ constants
int nrtele; // number of elements in the routing table
int send_retries; // number of retries send_msg() should attempt if eagain/timeout indicated by nng
+ int trace_data_len; // number of bytes to allocate in header for trace data
+ int d1_len; // extra header data 1 length (future)
+ int d2_len; // extra header data 2 length (future)
nng_socket nn_sock; // our general listen socket
route_table_t* rtable; // the active route table
route_table_t* old_rtable; // the previously used rt, sits here to allow for draining
}
errno = 0;
- return msg->alloc_len - sizeof( uta_mhdr_t ); // figure size should we not have a msg buffer
+ return msg->alloc_len - RMR_HDR_LEN( msg->header ); // allocated transport size less the header and other data bits
}
/*
int state;
if( ! announced ) {
- fprintf( stderr, "[INFO] ric message routing library on NNG (%s %s.%s.%s built: %s)\n",
- QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
+ fprintf( stderr, "[INFO] ric message routing library on NNG mv=%d (%s %s.%s.%s built: %s)\n",
+ RMR_MSG_VER, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
announced = 1;
}
ctx->send_retries = 1; // default is not to sleep at all; RMr will retry about 10K times before returning
ctx->mring = uta_mk_ring( 128 ); // message ring to hold asynch msgs received while waiting for call response
- ctx->max_plen = RMR_MAX_RCV_BYTES + sizeof( uta_mhdr_t ); // default max buffer size
+ ctx->max_plen = RMR_MAX_RCV_BYTES; // max user payload lengh
if( max_msg_size > 0 ) {
- if( max_msg_size <= ctx->max_plen ) { // user defined len can be smaller
- ctx->max_plen = max_msg_size;
- } else {
- fprintf( stderr, "[WRN] rmr_init: attempt to set max payload len > than allowed maximum; capped at %d bytes\n", ctx->max_plen );
- }
+ ctx->max_plen = max_msg_size;
}
- ctx->max_mlen = ctx->max_plen + sizeof( uta_mhdr_t );
-
// we're using a listener to get rtg updates, so we do NOT need this.
//uta_lookup_rtg( ctx ); // attempt to fill in rtg info; rtc will handle missing values/errors
}
if( (state = nng_getopt_int( ctx->nn_sock, NNG_OPT_RECVFD, &fd )) != 0 ) {
- fprintf( stderr, ">>> cannot get recv fd: %s\n", nng_strerror( state ) );
+ fprintf( stderr, "[WRN] rmr cannot get recv fd: %s\n", nng_strerror( state ) );
return -1;
}
nng messages.
*/
static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state ) {
- size_t mlen;
+ size_t mlen; // size of the transport buffer that we'll allocate
uta_mhdr_t* hdr; // convenience pointer
- mlen = sizeof( uta_mhdr_t ); // figure size should we not have a msg buffer
+ mlen = sizeof( uta_mhdr_t ) + ctx->trace_data_len + ctx->d1_len + ctx->d2_len; // start with header and trace/data lengths
mlen += (size > 0 ? size : ctx->max_plen); // add user requested size or size set during init
if( msg == NULL ) {
}
msg->header = nng_msg_body( msg->tp_buf );
+ memset( msg->header, 0, sizeof( uta_mhdr_t ) ); // ensure no junk in the header area
if( (hdr = (uta_mhdr_t *) msg->header) != NULL ) {
- hdr->rmr_ver = RMR_MSG_VER; // version info should we need to recognised old style messages someday
+ hdr->rmr_ver = htonl( RMR_MSG_VER ); // set current version
+ SET_HDR_LEN( hdr ); // ensure these are converted to net byte order
+ SET_HDR_TR_LEN( hdr, ctx->trace_data_len );
+ //SET_HDR_D1_LEN( hdr, ctx->d1_len ); // no need until we start using them
+ //SET_HDR_D2_LEN( hdr, ctx->d2_len );
}
msg->len = 0; // length of data in the payload
- msg->alloc_len = mlen; // length of allocated payload
- msg->payload = msg->header + sizeof( uta_mhdr_t ); // point past header to payload (single buffer allocation above)
+ msg->alloc_len = mlen; // length of allocated transport buffer
+ msg->payload = msg->header + PAYLOAD_OFFSET( hdr ); // past header, trace and other data bits
msg->xaction = ((uta_mhdr_t *)msg->header)->xid; // point at transaction id in header area
msg->state = state; // fill in caller's state (likely the state of the last operation)
msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
The alen parm is the assumed allocated length; assumed because it's a value likely
to have come from nng receive and the actual alloc len might be larger, but we
can only assume this is the total usable space.
+
+ This function returns the message with an error state set if it detects that the
+ received message might have been truncated. Check is done here as the calculation
+ is somewhat based on header version.
*/
static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen ) {
- uta_mhdr_t* hdr;
+ uta_mhdr_t* hdr; // current header
+ uta_v1mhdr_t* v1hdr; // version 1 header
+ int ver;
+ int hlen; // header len to use for a truncation check
msg->header = nng_msg_body( msg->tp_buf ); // header is the start of the transport buffer
+ v1hdr = (uta_v1mhdr_t *) msg->header; // v1 will always allow us to suss out the version
- hdr = (uta_mhdr_t *) msg->header;
- hdr->rmr_ver = RMR_MSG_VER; // version info should we need to recognised old style messages someday
- msg->len = ntohl( hdr->plen ); // length sender says is in the payload (received length could be larger)
- msg->alloc_len = alen; // length of whole tp buffer (including header)
- msg->payload = msg->header + sizeof( uta_mhdr_t ); // point past header to payload (single buffer allocation above)
- msg->xaction = ((uta_mhdr_t *)msg->header)->xid; // point at transaction id in header area
- msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
- msg->mtype = ntohl( hdr->mtype ); // capture and convert from network order to local order
- msg->state = RMR_OK;
+ if( v1hdr->rmr_ver == 1 ) { // bug in verion 1 didn't encode the version in network byte order
+ ver = 1;
+ v1hdr->rmr_ver = htonl( 1 ); // save it correctly in case we clone the message
+ } else {
+ ver = ntohl( v1hdr->rmr_ver );
+ }
+
+ switch( ver ) {
+ case 1:
+ msg->len = ntohl( v1hdr->plen ); // length sender says is in the payload (received length could be larger)
+ msg->alloc_len = alen; // length of whole tp buffer (including header, trace and data bits)
+ msg->payload = msg->header + sizeof( uta_v1mhdr_t ); // point past header to payload (single buffer allocation above)
+
+ msg->xaction = &v1hdr->xid[0]; // point at transaction id in header area
+ msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
+ msg->mtype = ntohl( v1hdr->mtype ); // capture and convert from network order to local order
+ msg->state = RMR_OK;
+ hlen = sizeof( uta_v1mhdr_t );
+ break;
+
+ default: // current version always lands here
+ hdr = (uta_mhdr_t *) msg->header;
+ msg->len = ntohl( hdr->plen ); // length sender says is in the payload (received length could be larger)
+ msg->alloc_len = alen; // length of whole tp buffer (including header, trace and data bits)
+
+ msg->payload = msg->header + PAYLOAD_OFFSET( hdr ); // past header, trace and other data bits
+ msg->xaction = &hdr->xid[0]; // point at transaction id in header area
+ msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
+ msg->mtype = ntohl( hdr->mtype ); // capture and convert from network order to local order
+ hlen = RMR_HDR_LEN( hdr ); // len to use for truncated check later
+ break;
+ }
+
+ if( msg->len > (msg->alloc_len - hlen ) ) { // more than we should have had room for; error
+ msg->state = RMR_ERR_TRUNC;
+ msg->len = msg->alloc_len - hlen; // adjust len down so user app doesn't overrun
+ } else {
+ msg->state = RMR_OK;
+ }
}
/*
rmr_mbuf_t* nm; // new message buffer
size_t mlen;
int state;
+ uta_mhdr_t* hdr;
+ uta_v1mhdr_t* v1hdr;
nm = (rmr_mbuf_t *) malloc( sizeof *nm );
if( nm == NULL ) {
exit( 1 );
}
- nm->header = nng_msg_body( nm->tp_buf );
+ nm->header = nng_msg_body( nm->tp_buf ); // set and copy the header from old message
+ v1hdr = (uta_v1mhdr_t *) old_msg->header; // v1 will work to dig header out of any version
+ switch( ntohl( v1hdr->rmr_ver ) ) {
+ case 1:
+ memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) ); // copy complete header
+ nm->payload = (void *) v1hdr + sizeof( *v1hdr );
+ break;
+
+ default: // current message always caught here
+ hdr = nm->header;
+ memcpy( hdr, old_msg->header, RMR_HDR_LEN( old_msg->header ) ); // copy complete header, trace and other data
+ nm->payload = nm->header + PAYLOAD_OFFSET( hdr ); // point at the payload
+ break;
+ }
+
+ // --- these are all version agnostic -----------------------------------
nm->mtype = old_msg->mtype;
nm->len = old_msg->len; // length of data in the payload
nm->alloc_len = mlen; // length of allocated payload
- nm->payload = nm->header + sizeof( uta_mhdr_t ); // point past header to payload (single buffer allocation above)
- nm->xaction = ((uta_mhdr_t *)nm->header)->xid; // point at transaction id in header area
+
+ nm->xaction = hdr->xid; // reference xaction
nm->state = old_msg->state; // fill in caller's state (likely the state of the last operation)
nm->flags = old_msg->flags | MFL_ZEROCOPY; // this is a zerocopy sendable message
-
- memcpy( ((uta_mhdr_t *)nm->header)->src, ((uta_mhdr_t *)old_msg->header)->src, RMR_MAX_SID );
memcpy( nm->payload, old_msg->payload, old_msg->len );
return nm;
msg->tp_buf = NULL;
} else {
- //msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK ); // will abort on failure, no need to check
msg = alloc_mbuf( ctx, RMR_OK ); // msg without a transport buffer
}
+ msg->alloc_len = 0;
msg->len = 0;
msg->payload = NULL;
msg->xaction = NULL;
- //rsize = msg->alloc_len; // set to max, and we'll get len back here too
- //msg->state = nng_recv( ctx->nn_sock, msg->header, &rsize, NO_FLAGS ); // total space (header + payload len) allocated
msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS ); // blocks hard until received
if( (msg->state = xlate_nng_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) {
return msg;
}
rsize = nng_msg_len( msg->tp_buf );
- if( rsize >= sizeof( uta_mhdr_t ) ) { // we need at least a full header here
-
- ref_tpbuf( msg, rsize ); // point payload, header etc to the just received tp buffer
+ if( rsize >= sizeof( uta_v1mhdr_t ) ) { // we need at least a full type 1 (smallest) header here
+ ref_tpbuf( msg, rsize ); // point payload, header etc to the data and set trunc error if needed
hdr = (uta_mhdr_t *) msg->header;
- msg->flags |= MFL_ADDSRC; // turn on so if user app tries to send this buffer we reset src
- if( msg->len > (msg->alloc_len - sizeof( uta_mhdr_t )) ) { // way more than we should have had room for; error
- msg->state = RMR_ERR_TRUNC;
- }
+ msg->flags |= MFL_ADDSRC; // turn on so if user app tries to send this buffer we reset src
+
if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_msg: got something: type=%d state=%d len=%d diff=%ld\n",
msg->mtype, msg->state, msg->len, msg->payload - (unsigned char *) msg->header );
} else {
- msg->len = 0;
msg->state = RMR_ERR_EMPTY;
+ msg->len = 0;
+ msg->alloc_len = rsize;
+ msg->payload = NULL;
+ msg->xaction = NULL;
+ msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
+ msg->mtype = -1;
}
return msg;
errno = 0;
msg->state = RMR_OK;
if( msg->flags & MFL_ZEROCOPY ) { // faster sending with zcopy buffer
- //nng_flags |= NNG_FLAG_ALLOC; // indicate a zc buffer that nng is expected to free
-
do {
if( (state = nng_sendmsg( nn_sock, (nng_msg *) msg->tp_buf, nng_flags )) != 0 ) { // must check and retry some if transient failure
msg->state = state;
}
} while( state && retries > 0 );
} else {
+ // future: this should not happen as all buffers we deal with are zc buffers; might make sense to remove the test and else
msg->state = RMR_ERR_SENDFAILED;
errno = ENOTSUP;
return msg;
msg->state = RMR_ERR_RETRY; // errno will have nano reason
} else {
msg->state = xlate_nng_state( msg->state, RMR_ERR_SENDFAILED ); // xlate to our state and set errno
- //errno = -msg->state;
- //msg->state = RMR_ERR_SENDFAILED; // errno will have nano reason
}
if( DEBUG ) fprintf( stderr, "[DBUG] send failed: %d %s\n", (int) msg->state, strerror( msg->state ) );