feat(API): Add trace data functions
[ric-plt/lib/rmr.git] / src / nng / src / rmr_nng.c
index 9a9a043..a3fcd89 100644 (file)
@@ -104,7 +104,7 @@ extern int rmr_payload_size( rmr_mbuf_t* msg ) {
        }
 
        errno = 0;
-       return msg->alloc_len - sizeof( uta_mhdr_t );                                           // figure size should we not have a msg buffer
+       return msg->alloc_len - RMR_HDR_LEN( msg->header );                             // allocated transport size less the header and other data bits
 }
 
 /*
@@ -119,10 +119,47 @@ extern rmr_mbuf_t* rmr_alloc_msg( void* vctx, int size ) {
                return NULL;
        }
 
-       m = alloc_zcmsg( ctx, NULL, size, 0 );
+       m = alloc_zcmsg( ctx, NULL, size, 0, DEF_TR_LEN );                              // alloc with default trace data
        return  m;
 }
 
+
+/*
+       Allocates a send message as a zerocopy message allowing the underlying message protocol
+       to send the buffer without copy. In addition, a trace data field of tr_size will be
+       added and the supplied data coppied to the buffer before returning the message to
+       the caller.
+*/
+extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned const char* data ) {
+       uta_ctx_t*      ctx;
+       rmr_mbuf_t*     m;
+       int state;
+
+       if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+               return NULL;
+       }
+
+       m = alloc_zcmsg( ctx, NULL, size, 0, tr_size );                         // alloc with specific tr size
+       if( m != NULL ) {
+               state = rmr_set_trace( m, data, tr_size );                              // roll their data in
+               if( state != tr_size ) {
+                       m->state = RMR_ERR_INITFAILED;
+               }
+       }
+
+       return  m;
+}
+
+/*
+       This provides an external path to the realloc static function as it's called by an
+       outward facing mbuf api function. Used to reallocate a message with a different
+       trace data size.
+*/
+extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
+       return realloc_msg( msg, new_tr_size );
+}
+
+
 /*
        Return the message to the available pool, or free it outright.
 */
@@ -436,7 +473,7 @@ extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
        if( old_msg ) {
                msg = old_msg;
        } else {
-               msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK );                      // will abort on failure, no need to check
+               msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN );                  // will abort on failure, no need to check
        }
 
        if( ms_to < 0 ) {
@@ -575,8 +612,8 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        int             state;
 
        if( ! announced ) {
-               fprintf( stderr, "[INFO] ric message routing library on NNG (%s %s.%s.%s built: %s)\n", 
-                       QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
+               fprintf( stderr, "[INFO] ric message routing library on NNG mv=%d (%s %s.%s.%s built: %s)\n", 
+                       RMR_MSG_VER, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
                announced = 1;
        }
 
@@ -596,17 +633,11 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        ctx->send_retries = 1;                                                  // default is not to sleep at all; RMr will retry about 10K times before returning
        ctx->mring = uta_mk_ring( 128 );                                // message ring to hold asynch msgs received while waiting for call response
 
-       ctx->max_plen = RMR_MAX_RCV_BYTES + sizeof( uta_mhdr_t );               // default max buffer size
+       ctx->max_plen = RMR_MAX_RCV_BYTES;                              // max user payload lengh
        if( max_msg_size > 0 ) {
-               if( max_msg_size <= ctx->max_plen ) {                                           // user defined len can be smaller
-                       ctx->max_plen = max_msg_size;
-               } else {
-                       fprintf( stderr, "[WRN] rmr_init: attempt to set max payload len > than allowed maximum; capped at %d bytes\n", ctx->max_plen );
-               }
+               ctx->max_plen = max_msg_size;
        }
 
-       ctx->max_mlen = ctx->max_plen + sizeof( uta_mhdr_t );
-
        // we're using a listener to get rtg updates, so we do NOT need this.
        //uta_lookup_rtg( ctx );                                                        // attempt to fill in rtg info; rtc will handle missing values/errors
 
@@ -687,6 +718,27 @@ extern void* rmr_init( char* uproto_port, int max_msg_size, int flags ) {
        return init( uproto_port, max_msg_size, flags & UFL_MASK  );            // ensure any internal flags are off
 }
 
+/*
+       This sets the default trace length which will be added to any message buffers
+       allocated.  It can be set at any time, and if rmr_set_trace() is given a 
+       trace len that is different than the default allcoated in a message, the message
+       will be resized.
+
+       Returns 0 on failure and 1 on success. If failure, then errno will be set.
+*/
+extern int rmr_init_trace( void* vctx, int tr_len ) {
+       uta_ctx_t* ctx;
+
+       errno = 0;
+       if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+               errno = EINVAL;
+               return 0;
+       }
+
+       ctx->trace_data_len = tr_len;
+       return 1;
+}
+
 /*
        Return true if routing table is initialised etc. and app can send/receive.
 */
@@ -719,7 +771,7 @@ extern int rmr_get_rcvfd( void* vctx ) {
        }
 
        if( (state = nng_getopt_int( ctx->nn_sock, NNG_OPT_RECVFD, &fd )) != 0 ) {
-               fprintf( stderr, ">>> cannot get recv fd: %s\n", nng_strerror( state ) );
+               fprintf( stderr, "[WRN] rmr cannot get recv fd: %s\n", nng_strerror( state ) );
                return -1;
        }