}
errno = 0;
- return msg->alloc_len - sizeof( uta_mhdr_t ); // figure size should we not have a msg buffer
+ return msg->alloc_len - RMR_HDR_LEN( msg->header ); // allocated transport size less the header and other data bits
}
/*
return NULL;
}
- m = alloc_zcmsg( ctx, NULL, size, 0 );
+ m = alloc_zcmsg( ctx, NULL, size, 0, DEF_TR_LEN ); // alloc with default trace data
return m;
}
+
+/*
+ Allocates a send message as a zerocopy message allowing the underlying message protocol
+ to send the buffer without copy. In addition, a trace data field of tr_size will be
+ added and the supplied data coppied to the buffer before returning the message to
+ the caller.
+*/
+extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned const char* data ) {
+ uta_ctx_t* ctx;
+ rmr_mbuf_t* m;
+ int state;
+
+ if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+ return NULL;
+ }
+
+ m = alloc_zcmsg( ctx, NULL, size, 0, tr_size ); // alloc with specific tr size
+ if( m != NULL ) {
+ state = rmr_set_trace( m, data, tr_size ); // roll their data in
+ if( state != tr_size ) {
+ m->state = RMR_ERR_INITFAILED;
+ }
+ }
+
+ return m;
+}
+
+/*
+ This provides an external path to the realloc static function as it's called by an
+ outward facing mbuf api function. Used to reallocate a message with a different
+ trace data size.
+*/
+extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
+ return realloc_msg( msg, new_tr_size );
+}
+
+
/*
Return the message to the available pool, or free it outright.
*/
if( old_msg ) {
msg = old_msg;
} else {
- msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK ); // will abort on failure, no need to check
+ msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN ); // will abort on failure, no need to check
}
if( ms_to < 0 ) {
int state;
if( ! announced ) {
- fprintf( stderr, "[INFO] ric message routing library on NNG (%s %s.%s.%s built: %s)\n",
- QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
+ fprintf( stderr, "[INFO] ric message routing library on NNG mv=%d (%s %s.%s.%s built: %s)\n",
+ RMR_MSG_VER, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
announced = 1;
}
ctx->send_retries = 1; // default is not to sleep at all; RMr will retry about 10K times before returning
ctx->mring = uta_mk_ring( 128 ); // message ring to hold asynch msgs received while waiting for call response
- ctx->max_plen = RMR_MAX_RCV_BYTES + sizeof( uta_mhdr_t ); // default max buffer size
+ ctx->max_plen = RMR_MAX_RCV_BYTES; // max user payload lengh
if( max_msg_size > 0 ) {
- if( max_msg_size <= ctx->max_plen ) { // user defined len can be smaller
- ctx->max_plen = max_msg_size;
- } else {
- fprintf( stderr, "[WRN] rmr_init: attempt to set max payload len > than allowed maximum; capped at %d bytes\n", ctx->max_plen );
- }
+ ctx->max_plen = max_msg_size;
}
- ctx->max_mlen = ctx->max_plen + sizeof( uta_mhdr_t );
-
// we're using a listener to get rtg updates, so we do NOT need this.
//uta_lookup_rtg( ctx ); // attempt to fill in rtg info; rtc will handle missing values/errors
return init( uproto_port, max_msg_size, flags & UFL_MASK ); // ensure any internal flags are off
}
+/*
+ This sets the default trace length which will be added to any message buffers
+ allocated. It can be set at any time, and if rmr_set_trace() is given a
+ trace len that is different than the default allcoated in a message, the message
+ will be resized.
+
+ Returns 0 on failure and 1 on success. If failure, then errno will be set.
+*/
+extern int rmr_init_trace( void* vctx, int tr_len ) {
+ uta_ctx_t* ctx;
+
+ errno = 0;
+ if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+ errno = EINVAL;
+ return 0;
+ }
+
+ ctx->trace_data_len = tr_len;
+ return 1;
+}
+
/*
Return true if routing table is initialised etc. and app can send/receive.
*/
}
if( (state = nng_getopt_int( ctx->nn_sock, NNG_OPT_RECVFD, &fd )) != 0 ) {
- fprintf( stderr, ">>> cannot get recv fd: %s\n", nng_strerror( state ) );
+ fprintf( stderr, "[WRN] rmr cannot get recv fd: %s\n", nng_strerror( state ) );
return -1;
}