feat(API): Add trace data functions 64/64/2
authorE. Scott Daniels <daniels@research.att.com>
Thu, 18 Apr 2019 14:01:16 +0000 (14:01 +0000)
committerE. Scott Daniels <daniels@research.att.com>
Fri, 19 Apr 2019 18:31:04 +0000 (18:31 +0000)
The trace data in the RMr header can now be set and
retrieved by the user programme with the new functions.

Change-Id: Ie00381d3671f906c703ca7d9048cf4a1a6d6194d
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
 Finish trace functions for nng

Change-Id: Ie7dda5d13d0d53e57347655cbb27d0fc13173d28
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
 Switch to address macro from offset

Change-Id: I560d696a7e5e743437b14d2737a3dde8d12c2aa9
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Set version manually

The CI process _should_ set a tag when the code is merged
and we will pull that tag and use it as the maj.min.patch
version.  However, the CI process is not doing this, so
we are forced to mantain the version number in the CMake
file for now.  This commit sets that version to 1.0.17.

Change-Id: I577efbd64bc0711244a1dbc1ae27eb9581b6d7d6
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
 Add trace functions for nanomsg

Change-Id: If640b7ee8a4996d8c324bbdd2cb4a85f68a3cc73
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
19 files changed:
CMakeLists.txt
src/common/include/rmr.h
src/common/include/rmr_agnostic.h
src/common/src/mbuf_api.c
src/nanomsg/include/rmr_private.h
src/nanomsg/src/rmr.c
src/nanomsg/src/sr_static.c
src/nng/include/rmr_nng_private.h
src/nng/src/rmr_nng.c
src/nng/src/sr_nng_static.c
test/mbuf_api_static_test.c
test/mbuf_api_test.c
test/rmr_nng_api_static_test.c
test/rmr_nng_test.c
test/rt_static_test.c
test/sr_nng_static_test.c
test/symtab_test.c
test/test_support.c
test/unit_test.ksh

index c34f17a..6de24c5 100644 (file)
@@ -22,9 +22,9 @@ project( rmr LANGUAGES C )
 cmake_minimum_required( VERSION 3.5 )
 
 
-set( major_version "0" )               # automatically populated from git tag later
+set( major_version "1" )               # should be automatically populated from git tag later, but until CI process sets a tag we use this
 set( minor_version "0" )
-set( patch_level "0" )
+set( patch_level "17" )
 
 set( install_root "${CMAKE_INSTALL_PREFIX}" )
 set( install_lib "lib" )
@@ -66,10 +66,11 @@ execute_process(
        OUTPUT_VARIABLE spoiled_str
 )
 
-set( mmp_version ${mmp_version_str} )
-list( GET mmp_version 0 major_version )
-list( GET mmp_version 1 minor_version )
-list( GET mmp_version 2 patch_level )
+# uncomment these lines once CI starts adding a tag on merge
+#set( mmp_version ${mmp_version_str} )
+#list( GET mmp_version 0 major_version )
+#list( GET mmp_version 1 minor_version )
+#list( GET mmp_version 2 patch_level )
 
 message( "+++ building ${major_version}.${minor_version}.${patch_level}${spoiled_str}" )
 
index 9ef563f..15c8400 100644 (file)
@@ -63,6 +63,7 @@ extern "C" {
 #define RMR_ERR_TIMEOUT                12              // message processing call timed out
 #define RMR_ERR_UNSET          13              // the message hasn't been populated with a transport buffer
 #define        RMR_ERR_TRUNC           14              // received message likely truncated
+#define RMR_ERR_INITFAILED     15              // initialisation of something (probably message) failed
 
 #define RMR_WH_CONNECTED(a) (a>=0)     // for now whid is integer; it could be pointer at some future date
 
@@ -96,6 +97,7 @@ extern rmr_mbuf_t* rmr_alloc_msg( void* vctx, int size );
 extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg );
 extern void rmr_close( void* vctx );
 extern void* rmr_init( char* proto_port, int max_msg_size, int flags );
+extern int rmr_init_trace( void* vctx, int size );
 extern int rmr_payload_size( rmr_mbuf_t* msg );
 extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg );
 extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to );
@@ -107,6 +109,7 @@ extern int rmr_set_rtimeout( void* vctx, int time );
 extern int rmr_set_stimeout( void* vctx, int time );
 extern int rmr_get_rcvfd( void* vctx );                                                                // only supported with nng
 extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to );
+extern rmr_mbuf_t*  rmr_tralloc_msg( void* context, int msize, int trsize, unsigned const char* data );
 extern rmr_whid_t rmr_wh_open( void* vctx, char const* target );
 extern rmr_mbuf_t* rmr_wh_send_msg( void* vctx, rmr_whid_t whid, rmr_mbuf_t* msg );
 extern void rmr_wh_close( void* vctx, int whid );
@@ -118,11 +121,15 @@ extern void rmr_bytes2payload( rmr_mbuf_t* mbuf, unsigned char const* src, int l
 extern int rmr_bytes2xact( rmr_mbuf_t* mbuf, unsigned char const* src, int len );
 extern void rmr_free_msg( rmr_mbuf_t* mbuf );
 extern unsigned char*  rmr_get_meid( rmr_mbuf_t* mbuf, unsigned char* dest );
+extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* mbuf, int new_tr_size );
 extern int rmr_str2meid( rmr_mbuf_t* mbuf, unsigned char const* str );
 extern void rmr_str2payload( rmr_mbuf_t* mbuf, unsigned char const* str );
 extern void rmr_str2payload( rmr_mbuf_t* mbuf, unsigned char const* str );
 extern int rmr_str2xact( rmr_mbuf_t* mbuf, unsigned char const* str );
 
+extern int rmr_get_trlen( rmr_mbuf_t* msg );
+extern int rmr_get_trace( rmr_mbuf_t* msg, unsigned char* dest, int size );
+extern int rmr_set_trace( rmr_mbuf_t* msg, unsigned const char* data, int size );
 
 extern int rmr_rcv_to( void* vctx, int time );         // DEPRECATED -- replaced with set_rtimeout
 extern int rmr_send_to( void* vctx, int time );                // DEPRECATED -- replaced with set_stimeout
index fc45429..c28ccb9 100644 (file)
@@ -72,17 +72,24 @@ typedef struct uta_ctx  uta_ctx_t;
 //#define DEF_RTG_MSGID        ""                              // default to pick up all messages from rtg
 #define DEF_RTG_PORT   "tcp:4561"              // default port that we accept rtg connections on
 #define DEF_COMM_PORT  "tcp:4560"              // default port we use for normal communications
+#define DEF_TR_LEN             -1                              // use default trace data len from context
 
-// -- header length/offset macros which ensure network conversion ----
-#define RMR_HDR_LEN(h)         (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2)+htonl(((uta_mhdr_t *)h)->len3))   // convert from net byte order
-#define PAYLOAD_OFFSET(h)      (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2)+htonl(((uta_mhdr_t *)h)->len3))
-#define TRACE_OFFSET(h)                (ntohl(((uta_mhdr_t *)h)->len0))
-#define DATA1_OFFSET(h)                (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1))
-#define DATA2_OFFSET(h)                (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl((uta_mhdr_t *)h)->len2)
+// -- header length/offset macros must ensure network conversion ----
+#define RMR_HDR_LEN(h)         (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2)+htonl(((uta_mhdr_t *)h)->len3)) // ALL things, not just formal struct
 #define RMR_TR_LEN(h)          (ntohl(((uta_mhdr_t *)h)->len1))
 #define RMR_D1_LEN(h)          (ntohl(((uta_mhdr_t *)h)->len2))
 #define RMR_D2_LEN(h)          (ntohl(((uta_mhdr_t *)h)->len3))
 
+#define TRACE_OFFSET(h)                ((ntohl(((uta_mhdr_t *)h)->len0)))
+#define DATA1_OFFSET(h)                (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1))
+#define DATA2_OFFSET(h)                (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2))
+#define PAYLOAD_OFFSET(h)      (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2)+htonl(((uta_mhdr_t *)h)->len3))
+
+#define TRACE_ADDR(h)          (((void *)h)+ntohl(((uta_mhdr_t *)h)->len0))
+#define DATA1_ADDR(h)          (((void *)h)+ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1))
+#define DATA2_ADDR(h)          (((void *)h)+ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2))
+#define PAYLOAD_ADDR(h)                (((void *)h)+ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2)+htonl(((uta_mhdr_t *)h)->len3))
+
 #define SET_HDR_LEN(h)         (((uta_mhdr_t *)h)->len0=htonl((int32_t)sizeof(uta_mhdr_t)))            // convert to network byte order on insert
 #define SET_HDR_TR_LEN(h,l)    (((uta_mhdr_t *)h)->len1=htonl((int32_t)l))
 #define SET_HDR_D1_LEN(h,l)    (((uta_mhdr_t *)h)->len2=htonl((int32_t)l))
@@ -189,6 +196,7 @@ typedef struct {
        int             naddrs;                 // num actually used
 } if_addrs_t;
 
+
 // --------------- ring things  -------------------------------------------------
 typedef struct ring {
        uint16_t head;                          // index of the head of the ring (insert point)
@@ -230,6 +238,7 @@ static rtable_ent_t* uta_add_rte( route_table_t* rt, int mtype, int nrrgroups );
 static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name );
 static void read_static_rt( uta_ctx_t* ctx, int vlevel );
 static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel );
+static rmr_mbuf_t* realloc_msg( rmr_mbuf_t* msg, int size );
 static void* rtc( void* vctx );
 static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name );
 
index 930d74d..ef0c28f 100644 (file)
@@ -33,6 +33,7 @@
 #include <errno.h>
 #include <string.h>
 #include <unistd.h>
+#include <stdio.h>
 
 #include "rmr.h"                               // things the users see
 #include "rmr_agnostic.h"              // agnostic things (must be included before private)
@@ -215,3 +216,103 @@ extern unsigned char*  rmr_get_meid( rmr_mbuf_t* mbuf, unsigned char* dest ) {
 
        return dest;
 }
+
+// ------------------- trace related access functions --------------------------------------
+/*
+       The set_trace function will copy the supplied data for size bytes into the 
+       header.  If the header trace area is not large enough, a new one will be allocated
+       which will cause a payload copy based on the msg->len value (if 0 no payload
+       data is copied).
+
+       The return value is the number of bytes actually coppied. If 0 bytes are coppied
+       errno should indicate the reason. If 0 is returned and errno is 0, then size
+       passed was 0.  The state in the message is left UNCHANGED.
+*/
+extern int rmr_set_trace( rmr_mbuf_t* msg, unsigned const char* data, int size ) {
+       uta_mhdr_t*     hdr;
+       rmr_mbuf_t* nm;                 // new message if larger is needed
+       int             len;
+       void*   old_tp_buf;             // if we need to realloc, must hold old to free
+       void*   old_hdr;
+
+       if( msg == NULL ) {
+               errno = EINVAL;
+               return 0;
+       }
+
+       errno = 0;
+       if( size <= 0 ) {
+               return 0;
+       }
+       
+       hdr = (uta_mhdr_t *) msg->header;
+       len = RMR_TR_LEN( hdr );
+
+       if( len != size ) {                                                     // different sized trace data, must realloc the buffer
+               nm = rmr_realloc_msg( msg, size );              // realloc with changed trace size
+               old_tp_buf = msg->tp_buf;
+               old_hdr = msg->header;
+
+               msg->tp_buf = nm->tp_buf;                               // reference the reallocated buffer
+               msg->header = nm->header;
+               msg->id = NULL;                                                 // currently unused
+               msg->xaction = nm->xaction;
+               msg->payload = nm->payload;
+
+               nm->tp_buf = old_tp_buf;                                // set to free
+               nm->header = old_hdr;                                   // nano frees on hdr, so must set both
+               rmr_free_msg( nm );
+
+               hdr = (uta_mhdr_t *) msg->header;               // header WILL be different
+               len = RMR_TR_LEN( hdr );
+       }
+
+       memcpy( TRACE_ADDR( hdr ), data, size );
+       
+       return size;
+}
+
+
+/*
+       Copies the trace bytes from the message header into the buffer provided by 
+       the user. If the trace data in the header is less than size, then only
+       that number of bytes are copied, else exactly size bytes are copied. The
+       number actually copied is returned.
+*/
+extern int rmr_get_trace( rmr_mbuf_t* msg, unsigned char* dest, int size ) {
+       uta_mhdr_t*     hdr = NULL;
+       int n2copy = 0;
+
+       if( msg == NULL ) {
+               return 0;
+       }
+
+       if( size <= 0 || dest == NULL ) {
+               return 0;
+       }
+
+       hdr = msg->header;
+       if( (n2copy = size < RMR_TR_LEN( hdr ) ? size : RMR_TR_LEN( hdr )) <= 0  ) {
+               return 0;
+       }
+
+       memcpy( dest, TRACE_ADDR( hdr ), n2copy );
+
+       return n2copy;
+}
+
+/*
+       Returns the number of bytes currently allocated for trace data in the message
+       buffer.
+*/
+extern int rmr_get_trlen( rmr_mbuf_t* msg ) {
+       uta_mhdr_t*     hdr;
+
+       if( msg == NULL ) {
+               return 0;
+       }
+
+       hdr = msg->header;
+
+       return RMR_TR_LEN( hdr );
+}
index 9c00675..f70d576 100644 (file)
@@ -99,7 +99,7 @@ static int uta_epsock_byname( route_table_t* rt, char* ep_name );
 static int uta_epsock_rr( route_table_t *rt, int mtype, int group, int* more );
 
 // ------ msg ------------------------------------------------
-static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state );
+static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state, int tr_size );
 static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  );
 static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg );
 static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg );
index 2900117..1ed2cbc 100644 (file)
@@ -169,10 +169,44 @@ extern rmr_mbuf_t* rmr_alloc_msg( void* vctx, int size ) {
                return NULL;
        }
 
-       m = alloc_zcmsg( ctx, NULL, size, 0 );
+       m = alloc_zcmsg( ctx, NULL, size, 0, DEF_TR_LEN );
        return  m;
 }
 
+/*
+       Allocates a send message as a zerocopy message allowing the underlying message protocol
+       to send the buffer without copy. In addition, a trace data field of tr_size will be
+       added and the supplied data coppied to the buffer before returning the message to
+       the caller.
+*/
+extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned const char* data ) {
+       uta_ctx_t*      ctx;
+       rmr_mbuf_t*     m;
+       int state;
+
+       if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+               return NULL;
+       }
+
+       m = alloc_zcmsg( ctx, NULL, size, 0, tr_size );                         // alloc with specific tr size
+       if( m != NULL ) {
+               state = rmr_set_trace( m, data, tr_size );                              // roll their data in
+               if( state != tr_size ) {
+                       m->state = RMR_ERR_INITFAILED;
+               }
+       }
+
+       return  m;
+}
+
+/*
+       Need an external path to the realloc static function as it's called by an
+       outward facing mbuf api function.
+*/
+extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
+       return realloc_msg( msg, new_tr_size );
+}
+
 /*
        Return the message to the available pool, or free it outright.
 */
index 44e00ea..0f59da2 100644 (file)
        a new message struct as well. Size is the size of the zc buffer to allocate (not
        including our header). If size is 0, then the buffer allocated is the size previously
        allocated (if msg is !nil) or the default size given at initialisation).
+
+
+       The trlo parm is the trace length override which will be used if not 0. If 0, then the
+       length in the context is used (default).
 */
-static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state ) {
+static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state, int trlo ) {
        int     mlen;
        uta_mhdr_t*     hdr;
+       int tr_len;                             // length to allocate for trace info
 
-       mlen = sizeof( uta_mhdr_t );                                            // figure size should we not have a msg buffer
-       mlen += (size > 0 ? size  : ctx->max_plen);                     // add user requested size or size set during init
+       tr_len = trlo > 0 ? trlo : ctx->trace_data_len;
+
+       mlen = sizeof( uta_mhdr_t ) + tr_len + ctx->d1_len + ctx->d2_len;       // start with header and trace/data lengths
+       mlen += (size > 0 ? size  : ctx->max_plen);                                                     // add user requested size or size set during init
 
        if( msg == NULL ) {
                msg = (rmr_mbuf_t *) malloc( sizeof *msg );
@@ -94,14 +101,14 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s
        hdr = (uta_mhdr_t *) msg->header;
        hdr->rmr_ver = htonl( RMR_MSG_VER );                                                            // current version
        SET_HDR_LEN( hdr );
-       SET_HDR_TR_LEN( hdr, ctx->trace_data_len );
+       SET_HDR_TR_LEN( hdr, tr_len );                                                  // set the actual length used
        //SET_HDR_D1_LEN( hdr, ctx->d1_len );                                   // moot until we actually need these data areas
        //SET_HDR_D2_LEN( hdr, ctx->d1_len );
 
        msg->len = 0;                                                                                   // length of data in the payload
        msg->alloc_len = mlen;                                                                  // length of allocated payload
-       msg->payload = msg->header + PAYLOAD_OFFSET( hdr );             // point at the payload in transport
-       msg->xaction = ((uta_mhdr_t *)msg->header)->xid;                                                // point at transaction id in header area
+       msg->payload = PAYLOAD_ADDR( hdr );                                             // point at the payload in transport
+       msg->xaction = ((uta_mhdr_t *)msg->header)->xid;                // point at transaction id in header area
        msg->state = state;                                                                             // fill in caller's state (likely the state of the last operation)
        msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
        strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID );
@@ -140,7 +147,7 @@ static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
        nm->mtype = old_msg->mtype;
        nm->len = old_msg->len;                                                                 // length of data in the payload
        nm->alloc_len = mlen;                                                                   // length of allocated payload
-       nm->payload = nm->header + PAYLOAD_OFFSET( nm->header );
+       nm->payload = PAYLOAD_ADDR( nm->header );                               // reference the payload
        nm->xaction = ((uta_mhdr_t *)nm->header)->xid;                  // point at transaction id in header area
        nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
        nm->flags |= MFL_ZEROCOPY;                                                              // this is a zerocopy sendable message
@@ -149,6 +156,69 @@ static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
        return nm;
 }
 
+static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  ) {
+       rmr_mbuf_t* nm;                 // new message buffer
+       size_t  mlen;
+       int state;
+       uta_mhdr_t* hdr;
+       uta_v1mhdr_t* v1hdr;
+       int     tr_old_len;                     // tr size in new buffer
+
+
+       nm = (rmr_mbuf_t *) malloc( sizeof *nm );
+       if( nm == NULL ) {
+               fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" );
+               exit( 1 );
+       }
+       memset( nm, 0, sizeof( *nm ) );
+
+       hdr = old_msg->header;
+       tr_old_len = RMR_TR_LEN( hdr );                         // bytes in old header for trace
+
+       mlen = old_msg->alloc_len + (tr_len - tr_old_len);                                                      // new length with trace adjustment
+       if( DEBUG ) fprintf( stderr, "tr_realloc old size=%d new size=%d new tr_len=%d\n", (int) old_msg->alloc_len, (int) mlen, (int) tr_len );
+       if( (nm->header = (uta_mhdr_t *) nn_allocmsg( mlen, 0 )) == NULL ) {                            // this will be released on send, so DO NOT free
+               fprintf( stderr, "[CRIT] rmr_realloc: cannot get memory for zero copy buffer: %d\n", errno );
+               exit( 1 );
+       }
+
+       nm->tp_buf = nm->header;                                                                // in nano both are the same
+       v1hdr = (uta_v1mhdr_t *) old_msg->header;                               // v1 will work to dig header out of any version
+       switch( ntohl( v1hdr->rmr_ver ) ) {
+               case 1:
+                       memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
+                       nm->payload = (void *) v1hdr + sizeof( *v1hdr );
+                       break;
+
+               default:                                                                                        // current message always caught  here
+                       hdr = nm->header;
+                       memcpy( hdr, old_msg->header, sizeof( uta_mhdr_t ) );           // ONLY copy the header portion; trace and data offsets might have changed
+                       if( RMR_D1_LEN( hdr )  ) {
+                               memcpy( DATA1_ADDR( hdr ), DATA1_ADDR( old_msg->header ), RMR_D1_LEN( hdr ) );          // copy data1 and data2 if necessary
+                       
+                       }
+                       if( RMR_D2_LEN( hdr )  ) {
+                               memcpy( DATA2_ADDR( hdr ), DATA2_ADDR( old_msg->header ), RMR_D2_LEN( hdr ) );          // copy data1 and data2 if necessary
+                       }
+
+                       SET_HDR_TR_LEN( hdr, tr_len );                                                                          // len MUST be set before pointing payload
+                       nm->payload = PAYLOAD_ADDR( hdr );                                                                      // reference user payload
+                       break;
+       }
+               
+       // --- these are all version agnostic -----------------------------------
+       nm->mtype = old_msg->mtype;
+       nm->len = old_msg->len;                                                                 // length of data in the payload
+       nm->alloc_len = mlen;                                                                   // length of allocated payload
+
+       nm->xaction = hdr->xid;                                                                 // reference xaction
+       nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
+       nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
+       memcpy( nm->payload, old_msg->payload, old_msg->len );
+
+       return nm;
+}
+
 /*
        This is the receive work horse used by the outer layer receive functions.
        It waits for a message to be received on our listen socket. If old msg
@@ -167,7 +237,7 @@ static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
        if( old_msg ) {
                msg = old_msg;
        } else {
-               msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK );                      // will abort on failure, no need to check
+               msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN );                  // will abort on failure, no need to check
        }
 
        msg->state = nn_recv( ctx->nn_sock, msg->header, msg->alloc_len, NO_FLAGS );            // total space (header + payload len) allocated
@@ -181,7 +251,7 @@ static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
                msg->mtype = ntohl( hdr->mtype );                                                               // capture and convert from network order to local order
                msg->state = RMR_OK;
                msg->flags |= MFL_ADDSRC;                                                                               // turn on so if user app tries to send this buffer we reset src
-               msg->payload = msg->header + PAYLOAD_OFFSET( msg->header );
+               msg->payload = PAYLOAD_ADDR( msg->header );
                msg->xaction = &hdr->xid[0];                                                    // provide user with ref to fixed space xaction id
                if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_msg: got something: type=%d state=%d len=%d diff=%ld\n", 
                                msg->mtype, msg->state, msg->len,  msg->payload - (unsigned char *) msg->header );
@@ -208,7 +278,7 @@ static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
        if( old_msg ) {
                msg = old_msg;
        } else {
-               msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK );                      // will abort on failure, no need to check
+               msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN );                  // will abort on failure, no need to check
        }
 
        msg->state = nn_recv( ctx->nn_sock, msg->header, msg->alloc_len, NO_FLAGS );            // read and state will be length
@@ -243,6 +313,7 @@ static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock ) {
        int state;
        uta_mhdr_t*     hdr;
+       int     tr_len;                                 // length from the message being sent (must snarf before send to use after send)
 
        // future: ensure that application did not overrun the XID buffer; last byte must be 0
 
@@ -254,6 +325,7 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock ) {
                strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID );                                        // must overlay the source to be ours
        }
 
+       tr_len = RMR_TR_LEN( hdr );
        if( msg->flags & MFL_ZEROCOPY ) {                                                                       // faster sending with zcopy buffer
                if( (state = nn_send( nn_sock, &msg->header, NN_MSG, NN_DONTWAIT )) < 0 ) {
                        msg->state = state;
@@ -267,9 +339,9 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock ) {
        }
 
        // future:  if nano sends bytes, but less than mlen, then what to do?
-       if( msg->state >= 0 ) {                                                         // successful send
-               if( !(msg->flags & MFL_NOALLOC) ) {                             // if noalloc is set, then caller doesn't want a new buffer
-                       return alloc_zcmsg( ctx, msg, 0, RMR_OK );      // preallocate a zero-copy buffer and return msg
+       if( msg->state >= 0 ) {                                                                         // successful send
+               if( !(msg->flags & MFL_NOALLOC) ) {                                             // if noalloc is set, then caller doesn't want a new buffer
+                       return alloc_zcmsg( ctx, msg, 0, RMR_OK, tr_len );      // preallocate a zero-copy buffer and return msg (with same trace len as sent buffer)
                } else {
                        rmr_free_msg( msg );                                            // not wanting a meessage back, trash this one
                        return NULL;
index 29be7d3..a1bba74 100644 (file)
@@ -109,12 +109,13 @@ static inline int xlate_nng_state( int state, int def_state );
 
 
 // --- msg ---------------------------------------
-static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state );
+static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state, int trlo );
 static rmr_mbuf_t* alloc_mbuf( uta_ctx_t* ctx, int state );
 static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen ) ;
 static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  );
 static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg );
 static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg );
+static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  );
 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock, int retries );
 static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg );
 
index af8ce6f..a3fcd89 100644 (file)
@@ -119,10 +119,47 @@ extern rmr_mbuf_t* rmr_alloc_msg( void* vctx, int size ) {
                return NULL;
        }
 
-       m = alloc_zcmsg( ctx, NULL, size, 0 );
+       m = alloc_zcmsg( ctx, NULL, size, 0, DEF_TR_LEN );                              // alloc with default trace data
+       return  m;
+}
+
+
+/*
+       Allocates a send message as a zerocopy message allowing the underlying message protocol
+       to send the buffer without copy. In addition, a trace data field of tr_size will be
+       added and the supplied data coppied to the buffer before returning the message to
+       the caller.
+*/
+extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned const char* data ) {
+       uta_ctx_t*      ctx;
+       rmr_mbuf_t*     m;
+       int state;
+
+       if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+               return NULL;
+       }
+
+       m = alloc_zcmsg( ctx, NULL, size, 0, tr_size );                         // alloc with specific tr size
+       if( m != NULL ) {
+               state = rmr_set_trace( m, data, tr_size );                              // roll their data in
+               if( state != tr_size ) {
+                       m->state = RMR_ERR_INITFAILED;
+               }
+       }
+
        return  m;
 }
 
+/*
+       This provides an external path to the realloc static function as it's called by an
+       outward facing mbuf api function. Used to reallocate a message with a different
+       trace data size.
+*/
+extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
+       return realloc_msg( msg, new_tr_size );
+}
+
+
 /*
        Return the message to the available pool, or free it outright.
 */
@@ -436,7 +473,7 @@ extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
        if( old_msg ) {
                msg = old_msg;
        } else {
-               msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK );                      // will abort on failure, no need to check
+               msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN );                  // will abort on failure, no need to check
        }
 
        if( ms_to < 0 ) {
@@ -681,6 +718,27 @@ extern void* rmr_init( char* uproto_port, int max_msg_size, int flags ) {
        return init( uproto_port, max_msg_size, flags & UFL_MASK  );            // ensure any internal flags are off
 }
 
+/*
+       This sets the default trace length which will be added to any message buffers
+       allocated.  It can be set at any time, and if rmr_set_trace() is given a 
+       trace len that is different than the default allcoated in a message, the message
+       will be resized.
+
+       Returns 0 on failure and 1 on success. If failure, then errno will be set.
+*/
+extern int rmr_init_trace( void* vctx, int tr_len ) {
+       uta_ctx_t* ctx;
+
+       errno = 0;
+       if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+               errno = EINVAL;
+               return 0;
+       }
+
+       ctx->trace_data_len = tr_len;
+       return 1;
+}
+
 /*
        Return true if routing table is initialised etc. and app can send/receive.
 */
index 369c68e..3592a7b 100644 (file)
@@ -37,7 +37,6 @@
 #include <nng/protocol/pipeline0/push.h>
 #include <nng/protocol/pipeline0/pull.h>
 
-
 /*
        Translates the nng state passed in to one of ours that is suitable to put
        into the message, and sets errno to something that might be useful.
@@ -102,16 +101,22 @@ static inline int xlate_nng_state( int state, int def_state ) {
        including our header). If size is 0, then the buffer allocated is the size previously
        allocated (if msg is !nil) or the default size given at initialisation).
 
+       The trlo (trace data lengh override) is used for trace length if >0. If <= 0, then
+       the context value is used.
+
        NOTE:  while accurate, the nng doc implies that both the msg buffer and data buffer
                are zero copy, however ONLY the message is zero copy. We now allocate and use
                nng messages.
 */
-static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state ) {
+static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state, int trlo ) {
        size_t          mlen;                   // size of the transport buffer that we'll allocate
        uta_mhdr_t*     hdr;                    // convenience pointer
+       int                     tr_len;                 // trace data len (default or override)
 
-       mlen = sizeof( uta_mhdr_t ) + ctx->trace_data_len + ctx->d1_len + ctx->d2_len;  // start with header and trace/data lengths
-       mlen += (size > 0 ? size  : ctx->max_plen);                     // add user requested size or size set during init
+       tr_len = trlo > 0 ? trlo : ctx->trace_data_len;
+
+       mlen = sizeof( uta_mhdr_t ) + tr_len + ctx->d1_len + ctx->d2_len;       // start with header and trace/data lengths
+       mlen += (size > 0 ? size  : ctx->max_plen);                                                     // add user requested size or size set during init
 
        if( msg == NULL ) {
                msg = (rmr_mbuf_t *) malloc( sizeof *msg );
@@ -141,7 +146,7 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s
        }
        msg->len = 0;                                                                                   // length of data in the payload
        msg->alloc_len = mlen;                                                                  // length of allocated transport buffer
-       msg->payload = msg->header + PAYLOAD_OFFSET( hdr );             // past header, trace and other data bits
+       msg->payload = PAYLOAD_ADDR( hdr );                                             // point to payload (past all header junk)
        msg->xaction = ((uta_mhdr_t *)msg->header)->xid;                // point at transaction id in header area
        msg->state = state;                                                                             // fill in caller's state (likely the state of the last operation)
        msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
@@ -213,7 +218,7 @@ static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen )  {
 
        switch( ver ) {
                case 1:
-                       msg->len = ntohl( v1hdr->plen );                                                        // length sender says is in the payload (received length could be larger)
+                       msg->len = ntohl( v1hdr->plen );                                                // length sender says is in the payload (received length could be larger)
                        msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header, trace and data bits)
                        msg->payload = msg->header + sizeof( uta_v1mhdr_t );    // point past header to payload (single buffer allocation above)
 
@@ -229,7 +234,7 @@ static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen )  {
                        msg->len = ntohl( hdr->plen );                                                  // length sender says is in the payload (received length could be larger)
                        msg->alloc_len = alen;                                                                  // length of whole tp buffer (including header, trace and data bits)
 
-                       msg->payload = msg->header + PAYLOAD_OFFSET( hdr );             // past header, trace and other data bits
+                       msg->payload = PAYLOAD_ADDR( hdr );                                             // at user payload
                        msg->xaction = &hdr->xid[0];                                                    // point at transaction id in header area
                        msg->flags |= MFL_ZEROCOPY;                                                             // this is a zerocopy sendable message
                        msg->mtype = ntohl( hdr->mtype );                                               // capture and convert from network order to local order
@@ -278,8 +283,80 @@ static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg  ) {
 
                default:                                                                                        // current message always caught  here
                        hdr = nm->header;
-                       memcpy( hdr, old_msg->header, RMR_HDR_LEN( old_msg->header ) );         // copy complete header, trace and other data
-                       nm->payload = nm->header + PAYLOAD_OFFSET( hdr );               // point at the payload
+                       memcpy( hdr, old_msg->header, RMR_HDR_LEN( old_msg->header ) + RMR_TR_LEN( old_msg->header ) + RMR_D1_LEN( old_msg->header ) + RMR_D2_LEN( old_msg->header ));  // copy complete header, trace and other data
+                       nm->payload = PAYLOAD_ADDR( hdr );                              // at user payload
+                       break;
+       }
+               
+       // --- these are all version agnostic -----------------------------------
+       nm->mtype = old_msg->mtype;
+       nm->len = old_msg->len;                                                                 // length of data in the payload
+       nm->alloc_len = mlen;                                                                   // length of allocated payload
+
+       nm->xaction = hdr->xid;                                                                 // reference xaction
+       nm->state = old_msg->state;                                                             // fill in caller's state (likely the state of the last operation)
+       nm->flags = old_msg->flags | MFL_ZEROCOPY;                              // this is a zerocopy sendable message
+       memcpy( nm->payload, old_msg->payload, old_msg->len );
+
+       return nm;
+}
+
+/*
+       This will clone a message with a change to the trace area in the header such that
+       it will be tr_len passed in. The trace area in the cloned message will be uninitialised.
+       The orignal message will be left unchanged, and a pointer to the new message is returned.
+       It is not possible to realloc buffers and change the data sizes.
+*/
+static inline rmr_mbuf_t* realloc_msg( rmr_mbuf_t* old_msg, int tr_len  ) {
+       rmr_mbuf_t* nm;                 // new message buffer
+       size_t  mlen;
+       int state;
+       uta_mhdr_t* hdr;
+       uta_v1mhdr_t* v1hdr;
+       int     tr_old_len;                     // tr size in new buffer
+       int     coffset;                        // an offset to something in the header for copy
+
+       nm = (rmr_mbuf_t *) malloc( sizeof *nm );
+       if( nm == NULL ) {
+               fprintf( stderr, "[CRI] rmr_clone: cannot get memory for message buffer\n" );
+               exit( 1 );
+       }
+       memset( nm, 0, sizeof( *nm ) );
+
+       hdr = old_msg->header;
+       tr_old_len = RMR_TR_LEN( hdr );                         // bytes in old header for trace
+
+       mlen = old_msg->alloc_len + (tr_len - tr_old_len);                                                      // new length with trace adjustment
+       if( DEBUG ) fprintf( stderr, "[DBUG] tr_realloc old size=%d new size=%d new tr_len=%d\n", (int) old_msg->alloc_len, (int) mlen, (int) tr_len );
+       if( (state = nng_msg_alloc( (nng_msg **) &nm->tp_buf, mlen )) != 0 ) {
+               fprintf( stderr, "[CRI] rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
+               exit( 1 );
+       }
+
+       nm->header = nng_msg_body( nm->tp_buf );                                // set and copy the header from old message
+       v1hdr = (uta_v1mhdr_t *) old_msg->header;                               // v1 will work to dig header out of any version
+       switch( ntohl( v1hdr->rmr_ver ) ) {
+               case 1:
+                       memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) );             // copy complete header
+                       nm->payload = (void *) v1hdr + sizeof( *v1hdr );
+                       break;
+
+               default:                                                                                        // current message always caught  here
+                       hdr = nm->header;
+                       memcpy( hdr, old_msg->header, sizeof( uta_mhdr_t ) );           // ONLY copy the header portion; trace and data might have changed
+                       if( RMR_D1_LEN( hdr )  ) {
+                               coffset = DATA1_OFFSET( hdr );                                                                                          // offset to d1
+                               memcpy( hdr + coffset, old_msg->header + coffset, RMR_D1_LEN( hdr ) );          // copy data1 and data2 if necessary
+                       
+                       }
+                       if( RMR_D2_LEN( hdr )  ) {
+                               coffset = DATA2_OFFSET( hdr );                                                                                          // offset to d2
+                               memcpy( hdr + coffset, old_msg->header + coffset, RMR_D2_LEN( hdr ) );          // copy data2 and data2 if necessary
+                       }
+
+                       SET_HDR_TR_LEN( hdr, tr_len );                                                                          // MUST set before pointing payload
+                       nm->payload = PAYLOAD_ADDR( hdr );                                                                      // directly at the payload
+                       SET_HDR_TR_LEN( hdr, tr_len );                                                                          // do NOT copy old trace data, just set the new header
                        break;
        }
                
@@ -358,7 +435,6 @@ static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
                hdr = (uta_mhdr_t *) msg->header;
                msg->flags |= MFL_ADDSRC;                                       // turn on so if user app tries to send this buffer we reset src
 
-
                if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_msg: got something: type=%d state=%d len=%d diff=%ld\n", 
                                msg->mtype, msg->state, msg->len,  msg->payload - (unsigned char *) msg->header );
        } else {
@@ -391,7 +467,7 @@ static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
        if( old_msg ) {
                msg = old_msg;
        } else {
-               msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK );                      // will abort on failure, no need to check
+               msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN );                  // will abort on failure, no need to check
        }
 
        msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS );                  // blocks hard until received
@@ -430,12 +506,14 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock
        uta_mhdr_t*     hdr;
        int nng_flags = NNG_FLAG_NONBLOCK;              // if we need to set any nng flags (zc buffer) add it to this
        int spin_retries = 1000;                                // if eagain/timeout we'll spin this many times before giving up the CPU
+       int     tr_len;                                                         // trace len in sending message so we alloc new message with same trace size
 
        // future: ensure that application did not overrun the XID buffer; last byte must be 0
 
        hdr = (uta_mhdr_t *) msg->header;
        hdr->mtype = htonl( msg->mtype );                                                               // stash type/len in network byte order for transport
        hdr->plen = htonl( msg->len );
+       tr_len = RMR_TR_LEN( hdr );                                                                             // snarf trace len before sending as hdr is invalid after send
 
        if( msg->flags & MFL_ADDSRC ) {                                                                 // buffer was allocated as a receive buffer; must add our source
                strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID );                                        // must overlay the source to be ours
@@ -462,6 +540,7 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock
                                msg->state = RMR_OK;
                                msg->header = NULL;                                                                                     // nano frees; don't risk accessing later by mistake
                                msg->tp_buf = NULL;
+                               hdr = NULL;
                        }
                } while( state && retries > 0 );
        } else {
@@ -478,9 +557,9 @@ static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, nng_socket nn_sock
                */
        }
 
-       if( msg->state == RMR_OK ) {                                                            // successful send
-               if( !(msg->flags & MFL_NOALLOC) ) {                             // allocate another sendable zc buffer unless told otherwise
-                       return alloc_zcmsg( ctx, msg, 0, RMR_OK );      // preallocate a zero-copy buffer and return msg
+       if( msg->state == RMR_OK ) {                                                                    // successful send
+               if( !(msg->flags & MFL_NOALLOC) ) {                                                     // allocate another sendable zc buffer unless told otherwise
+                       return alloc_zcmsg( ctx, msg, 0, RMR_OK, tr_len );              // preallocate a zero-copy buffer and return msg
                } else {
                        rmr_free_msg( msg );                                            // not wanting a meessage back, trash this one
                        return NULL;
index 4c0cd5e..8598136 100644 (file)
@@ -42,6 +42,7 @@
 int mbuf_api_test( ) {
        unsigned char* c;
        int i;
+       int state;
        int errors = 0;
        rmr_mbuf_t*     mbuf;
        unsigned char src_buf[256];
@@ -57,6 +58,7 @@ int mbuf_api_test( ) {
        mbuf->header = mbuf->payload;
        mbuf->alloc_len = 1024;
 
+       // --- test payload field  access functions ---------------------------------------------------
        memset( src_buf, 0, sizeof( src_buf ) );
        rmr_bytes2payload( mbuf, NULL, strlen( src_buf) );                              // errno should be set on return
        errors += fail_if( errno == 0, "buf copy to payload with nil src returned good errno" );
@@ -76,6 +78,8 @@ int mbuf_api_test( ) {
        snprintf( src_buf, sizeof( src_buf ), "This is some text in the buffer" );
        rmr_str2payload( mbuf, src_buf );                                                       // this uses bytes2payload, so only one invocation needed
 
+
+       // --- test meid field  access functions ---------------------------------------------------
        errno = 0;
        i = rmr_bytes2meid( NULL, src_buf, RMR_MAX_MEID );
        errors += fail_if( errno == 0, "(errno) attempt to copy bytes to meid with nil message" );
@@ -121,6 +125,24 @@ int mbuf_api_test( ) {
        errors += fail_if( i == RMR_OK, "(rv) attempt to copy string to meid with large source buffer" );
 
 
+       snprintf( src_buf, sizeof( src_buf ), "test-meid" );
+       rmr_str2meid( mbuf, src_buf );
+
+       errno = 0;
+       c = rmr_get_meid( NULL, NULL );
+       errors += fail_if( c != NULL, "get meid with nil message buffer" );
+       errors += fail_if( errno == 0, "(errno bad) get meid with nil msg buffer" ); 
+       
+       c = rmr_get_meid( mbuf, NULL );                 // should allocate and return c
+       errors += fail_if( c == NULL, "get meid with nil dest pointer (did not allocate a buffer)" );
+       errors += fail_if( strcmp( c, "test-meid" ) != 0, "did not get expected meid from mbuffer" );
+
+       c = rmr_get_meid( mbuf, c );
+       errors += fail_if( c == NULL, "get meid with a dest pointer returned no pointer" );
+       errors += fail_if( strcmp( c, "test-meid" ) != 0, "did not get expected meid from mbuffer" );
+
+
+       // --- test transaction field  access functions ---------------------------------------------------
        errno = 0;
        i = rmr_bytes2xact( NULL, src_buf, RMR_MAX_XID );
        errors += fail_if( errno == 0, "(errno) attempt to copy bytes to xact with nil message" );
@@ -165,23 +187,67 @@ int mbuf_api_test( ) {
        errors += fail_if( errno == 0, "(errno) attempt to copy string to xact with large source buffer" );
        errors += fail_if( i == RMR_OK, "(rv) attempt to copy string to xact with large source buffer" );
 
-       
-       snprintf( src_buf, sizeof( src_buf ), "test-meid" );
-       rmr_str2meid( mbuf, src_buf );
 
-       errno = 0;
-       c = rmr_get_meid( NULL, NULL );
-       errors += fail_if( c != NULL, "get meid with nil message buffer" );
-       errors += fail_if( errno == 0, "(errno bad) get meid with nil msg buffer" ); 
+
+       // ------------ trace data tests ----------------------------------------------------------------
+       // CAUTION: to support standalone mbuf api tests, the underlying buffer reallocation functions are NOT used
+       //                      if this is driven by the mbuf_api_test.c driver
+
+       mbuf = test_mk_msg( 2048, 0 );          // initially no trace size to force realloc
+
+       state = TRACE_OFFSET( mbuf->header ) - PAYLOAD_OFFSET( mbuf->header );          // no trace data, payload and trace offset should be the same
+       errors += fail_not_equal( state, 0, "trace offset and payload offset do NOT match when trace data is absent" );
+
+       state = rmr_get_trlen( mbuf );
+       errors += fail_not_equal( state, 0, "initial trace len reported (a) does not match expected (b)" );
+
+       state = rmr_set_trace( NULL, src_buf, 100 );                            // coverage test on nil check
+       errors += fail_not_equal( state, 0, "set trace with nil msg didn't return expected 0 status" );
+
+       state = rmr_set_trace( mbuf, src_buf, 0 );                              // coverage test on length check
+       errors += fail_not_equal( state, 0, "set trace with 0 len didn't return expected 0 status" );
+
+       state = rmr_get_trace( NULL, src_buf, 100 );                            // coverage test on nil check
+       errors += fail_not_equal( state, 0, "get trace with nil msg didn't return expected 0 status" );
+
+       state = rmr_get_trace( mbuf, NULL, 100 );                                       // coverage test on nil check
+       errors += fail_not_equal( state, 0, "get trace with nil dest didn't return expected 0 status" );
+
+       state = rmr_get_trlen( NULL );                                                          // coverage test on nil check
+       errors += fail_not_equal( state, 0, "get trace length with nil msg didn't return expected 0 status" );
 
        
-       c = rmr_get_meid( mbuf, NULL );                 // should allocate and return c
-       errors += fail_if( c == NULL, "get meid with nil dest pointer (did not allocate a buffer)" );
-       errors += fail_if( strcmp( c, "test-meid" ) != 0, "did not get expected meid from mbuffer" );
+       src_buf[0] = 0;
+       state = rmr_set_trace( mbuf, "foo bar was here", 17 );          // should force a realloc
+       errors += fail_not_equal( state, 17, "bytes copied to trace (a) did not match expected size (b)" );
 
-       c = rmr_get_meid( mbuf, c );
-       errors += fail_if( c == NULL, "get meid with a dest pointer returned no pointer" );
-       errors += fail_if( strcmp( c, "test-meid" ) != 0, "did not get expected meid from mbuffer" );
+       state = rmr_get_trace( mbuf, src_buf, 17 );
+       errors += fail_not_equal( state, 17, "bytes retrieved from trace (a) did not match expected size (b)" );
+
+       state = rmr_get_trlen( mbuf );
+       errors += fail_not_equal( state, 17, "trace len reported (a) does not match expected (b)" );
+       state = strcmp( src_buf, "foo bar was here" );
+       errors+= fail_not_equal( state, 0, "compare of pulled trace info did not match" );
+
+       state = TRACE_OFFSET( mbuf->header ) - PAYLOAD_OFFSET( mbuf->header );          // when there is a trace area these should NOT be the same
+       errors += fail_if_equal( state, 0, "trace offset and payload offset match when trace data is present" );
+
+
+                                                                                       // second round of trace testing, allocating a message with a trace size that matches
+       mbuf = test_mk_msg( 2048, 17 );                 // trace size that matches what we'll stuff in, no realloc
+       state = rmr_get_trlen( mbuf );
+       errors += fail_not_equal( state, 17, "alloc with trace size: initial trace len reported (a) does not match expected (b)" );
+
+       src_buf[0] = 0;
+       state = rmr_set_trace( mbuf, "foo bar was here", 17 );          // should force a realloc
+       errors += fail_not_equal( state, 17, "bytes copied to trace (a) did not match expected size (b)" );
+
+       state = rmr_get_trace( mbuf, src_buf, 17 );
+       errors += fail_not_equal( state, 17, "bytes retrieved from trace (a) did not match expected size (b)" );
+       state = strcmp( src_buf, "foo bar was here" );
+       errors+= fail_not_equal( state, 0, "compare of pulled trace info did not match" );
+
+       i = rmr_get_trlen( mbuf );
        
 
        return errors > 0;                      // overall exit code bad if errors
index 858efdc..30427b1 100644 (file)
 /*
        Mnemonic:       mbuf_api_test.c
        Abstract:       Unit tests for the mbuf common API functions.
+                               To allow the mbuf functions to be tested without the bulk of the
+                               RMr mechanics, we dummy up a couple of functions that are in 
+                               rmr[_nng].c. 
+
        Author:         E. Scott Daniels
        Date:           2 April 2019
 */
 #include "test_support.c"                                              // our private library of test tools
 #include "mbuf_api_static_test.c"                              // test functions
 
+// --- dummies -------------------------------------------------------------------
+
+/*
+       This will leak, but we're not here to test free.
+*/
+extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
+       return;
+}
+
+/*
+       Minimal buffer realloc to allow api to be tested without coverage hit if
+       we actually pulled in the sr static set.
+
+       WARNING:  this is NOT a complete realloc.  We assume that we are testing
+                       just the trace length adjustment portion of the set_trace() 
+                       API and are not striving to test the real realloc function. That
+                       will be tested when the mbuf_api_static_test code is used by the
+                       more generic RMr test.  So, not all fields in the realloc'd buffer
+                       can be used.
+*/
+extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
+       rmr_mbuf_t*     new_msg;
+       uta_mhdr_t* hdr;
+
+       new_msg = (rmr_mbuf_t *) malloc( sizeof *new_msg );
+       new_msg->tp_buf = (void *) malloc( 2048 );
+       memset( new_msg->tp_buf, 0, 2048 );
+       hdr = (uta_mhdr_t*) new_msg->tp_buf;
+       SET_HDR_LEN( hdr );
+       SET_HDR_TR_LEN( hdr, new_tr_size );
+
+       new_msg->payload = new_msg->tp_buf + new_tr_size;
+       new_msg->header = new_msg->tp_buf;
+       new_msg->alloc_len = 2048;
+       new_msg->len = msg->len;
+       
+       return new_msg;
+}
+
+// --------------------------------------------------------------------------------
+
 int main( ) {
        int errors = 0;
 
index e9ecb72..3d09f28 100644 (file)
@@ -73,7 +73,7 @@ static void send_n_msgs( void* ctx, int n ) {
        }
 
        for( i = 0; i < n; i++ ) {
-fprintf( stderr, "mass send\n" );
+               //fprintf( stderr, "mass send\n" );
                msg->len = 100;
                msg->mtype = 1;
                msg->state = 999;
@@ -91,6 +91,7 @@ static int rmr_api_test( ) {
        int             v = 0;                                  // some value
        char    wbuf[128];
        int             i;
+       int             state;
 
        v = rmr_ready( NULL );
        errors += fail_if( v != 0, "rmr_ready returned true before initialisation" );
@@ -253,6 +254,22 @@ static int rmr_api_test( ) {
        }
        errors += fail_if( i >= 40, "torcv_msg never returned a timeout" );
 
+
+       // ---- trace things that are not a part of the mbuf_api functions and thus must be tested here
+       state = rmr_init_trace( NULL, 37 );                                             // coverage test nil context
+       errors += fail_not_equal( state, 0, "attempt to initialise trace with nil context returned non-zero state (a)" );
+       errors += fail_if_equal( errno, 0, "attempt to initialise trace with nil context did not set errno as expected" );
+
+       state = rmr_init_trace( rmc, 37 );
+       errors += fail_if_equal( state, 0, "attempt to set trace len in context was not successful" );
+       errors += fail_not_equal( errno, 0, "attempt to set trace len in context did not clear errno" );
+
+       msg = rmr_tralloc_msg( rmc, 1024, 17, "1904308620110417" );
+       errors += fail_if_nil( msg, "attempt to allocate message with trace data returned nil message" );
+       state = rmr_get_trace( msg, wbuf, 17 );
+       errors += fail_not_equal( state, 17, "len of trace data (a) returned after msg allocation was not expected size (b)" );
+       state = strcmp( wbuf, "1904308620110417" );
+       errors += fail_not_equal( state, 0, "trace data returned after tralloc was not correct" );
        
        em_send_failures = 1;
        send_n_msgs( rmc, 30 );                 // send 30 messages with emulation failures
@@ -262,10 +279,6 @@ static int rmr_api_test( ) {
        rmr_close( NULL );                      // drive for coverage
        rmr_close( rmc );                       // no return to check; drive for coverage
 
-//extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
-//extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
-
-
 
        if( ! errors ) {
                fprintf( stderr, "<INFO> all RMr API tests pass\n" );
index 1ebdb94..0b3068e 100644 (file)
@@ -71,6 +71,7 @@
 
 #include "../src/common/src/symtab.c"
 #include "../src/nng/src/rmr_nng.c"
+#include "../src/common/src/mbuf_api.c"
 
 static void gen_rt( uta_ctx_t* ctx );          // defined in sr_nng_static_test, but used by a few others (eliminate order requirement below)
 
@@ -84,6 +85,7 @@ static void gen_rt( uta_ctx_t* ctx );         // defined in sr_nng_static_test, but use
 #include "sr_nng_static_test.c"
 #include "wormhole_static_test.c"
 #include "rmr_nng_api_static_test.c"
+#include "mbuf_api_static_test.c"
 
 
 /*
@@ -120,6 +122,10 @@ int main() {
        errors += sr_nng_test();                                // test the send/receive static functions
        fprintf( stderr, "<INFO> error count: %d\n", errors );
 
+       fprintf( stderr, "<INFO> starting mbuf api tests\n" );
+       errors +=  mbuf_api_test( );
+       fprintf( stderr, "<INFO> error count: %d\n", errors );
+
        if( errors == 0 ) {
                fprintf( stderr, "<PASS> all tests were OK\n" );
        } else {
index f1e6981..26f81b7 100644 (file)
@@ -75,8 +75,9 @@ static int rt_test( ) {
        nng_socket nn_sock;             // this is a struct in nng, so difficult to validate
 
        setenv( "ENV_VERBOSE_FILE", ".ut_rmr_verbose", 1 );                     // allow for verbose code in rtc to be driven
-       i = open( ".rmr_verbose", O_CREAT, 0664 );
+       i = open( ".ut_rmr_verbose", O_RDWR | O_CREAT, 0644 );
        if( i >= 0 ) {
+               write( 1, "2\n", 2 );
                close( i );
        }
 
index 69090a1..9b936cc 100644 (file)
@@ -173,14 +173,16 @@ static int sr_nng_test() {
        errors += fail_if( state != 99, "xlate_nng_state did not return  default for unknown error" );
        errors += fail_if( errno == 0, "xlate_nng_state did not set errno (6)" );
 
-       // ---- drive rtc in a 'static' (not pthreaded) mode -----
-       setenv( "ENV_VERBOSE_FILE", ".ut_rmr_verbose", 1 );                     // allow for verbose code in rtc to be driven
-       i = open( ".rmr_verbose", O_CREAT, 0664 );
+       // ---- drive rtc in a 'static' (not pthreaded) mode to get some coverage; no 'results' to be verified -----
+       setenv( ENV_RTG_RAW, "1", 1 );                                                          // rtc should expect raw messages (mostly coverage here)
+       setenv( ENV_VERBOSE_FILE, ".ut_rmr_verbose", 1 );                       // allow for verbose code in rtc to be driven
+       i = open( ".ut_rmr_verbose", O_RDWR | O_CREAT, 0654 );
        if( i >= 0 ) {
-               write( i, "0\n", 2 );
+               write( i, "2\n", 2 );
                close( i );
        }
        ctx->shutdown = 1;                      // should force rtc to quit on first pass
+       rtc( NULL );                            // coverage test with nil pointer
        rtc( ctx );
        
 
index 76b2765..e2efa2b 100644 (file)
@@ -27,6 +27,8 @@
        Author:         E. Scott Daniels
 */
 
+#define NO_DUMMY_RMR 1                 // no dummy rmr functions; we don't pull in rmr.h or agnostic.h
+
 #include "../src/common/include/rmr_symtab.h"
 #include "../src/common/src/symtab.c"
 
index 4ed976c..49012c4 100644 (file)
@@ -138,4 +138,33 @@ static int fail_if_equal( int a, int b, char* what ) {
        return a != b ? GOOD : BAD;                     // user may override good/bad so do NOT return a==b directly!
 }
 
+
+#ifndef NO_DUMMY_RMR
+/*
+       Dummy message allocator for testing without sr_static functions
+*/
+static rmr_mbuf_t* test_mk_msg( int len, int tr_len ) {
+       rmr_mbuf_t*     new_msg;
+       uta_mhdr_t* hdr;
+       size_t  alen;
+
+       alen = sizeof( *hdr ) + tr_len + len;
+
+       new_msg = (rmr_mbuf_t *) malloc( sizeof *new_msg );
+       new_msg->tp_buf = (void *) malloc( alen );
+       memset( new_msg->tp_buf, 0, alen );
+
+       hdr = (uta_mhdr_t*) new_msg->tp_buf;
+       SET_HDR_LEN( hdr );
+       SET_HDR_TR_LEN( hdr, tr_len );
+
+       new_msg->header = new_msg->tp_buf;
+       new_msg->payload =  new_msg->header + PAYLOAD_OFFSET( hdr );
+       new_msg->alloc_len = alen;
+       new_msg->len = 0;
+       
+       return new_msg;
+}
+#endif
+
 #endif
index d876c0e..21848af 100755 (executable)
@@ -266,6 +266,7 @@ builder="make -B %s"                # default to plain ole make
 verbose=0
 show_all=1                                     # show all things -F sets to show failures only
 strict=0                                       # -s (strict) will set; when off, coverage state ignored in final pass/fail
+show_output=0                          # show output from each test execution (-S)
 
 while [[ $1 == "-"* ]]
 do
@@ -282,6 +283,7 @@ do
                -F)     show_all=0;;
 
                -s)     strict=1;;                                      # coverage counts toward pass/fail state
+               -S)     show_output=1;;                         # test output shown even on success
                -v)     (( verbose++ ));;
 
                -h)     usage; exit 0;;
@@ -354,6 +356,13 @@ do
                cat /tmp/PID$$.log
                (( ut_errors++ ))                               # cause failure even if not in strict mode
                continue                                                # skip coverage tests for this
+       else
+               if (( show_output ))
+               then
+                       printf "\n============= test programme output =======================\n"
+                       cat /tmp/PID$$.log
+                       printf "===========================================================\n"
+               fi
        fi
 
        (