X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;ds=sidebyside;f=src%2Fnng%2Fsrc%2Frmr_nng.c;h=a3fcd8977b7c7628c42b9669f0b08c3c20b6b9c8;hb=ae8e63b75e4c5e754c1f3c2d4a600100a05225a0;hp=9a9a0430c7061d34b63f4625e7e9c8b796812b2f;hpb=fd9cc7a5b3355146388ebdf4d558cb284c66c5f1;p=ric-plt%2Flib%2Frmr.git diff --git a/src/nng/src/rmr_nng.c b/src/nng/src/rmr_nng.c index 9a9a043..a3fcd89 100644 --- a/src/nng/src/rmr_nng.c +++ b/src/nng/src/rmr_nng.c @@ -104,7 +104,7 @@ extern int rmr_payload_size( rmr_mbuf_t* msg ) { } errno = 0; - return msg->alloc_len - sizeof( uta_mhdr_t ); // figure size should we not have a msg buffer + return msg->alloc_len - RMR_HDR_LEN( msg->header ); // allocated transport size less the header and other data bits } /* @@ -119,10 +119,47 @@ extern rmr_mbuf_t* rmr_alloc_msg( void* vctx, int size ) { return NULL; } - m = alloc_zcmsg( ctx, NULL, size, 0 ); + m = alloc_zcmsg( ctx, NULL, size, 0, DEF_TR_LEN ); // alloc with default trace data return m; } + +/* + Allocates a send message as a zerocopy message allowing the underlying message protocol + to send the buffer without copy. In addition, a trace data field of tr_size will be + added and the supplied data coppied to the buffer before returning the message to + the caller. +*/ +extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned const char* data ) { + uta_ctx_t* ctx; + rmr_mbuf_t* m; + int state; + + if( (ctx = (uta_ctx_t *) vctx) == NULL ) { + return NULL; + } + + m = alloc_zcmsg( ctx, NULL, size, 0, tr_size ); // alloc with specific tr size + if( m != NULL ) { + state = rmr_set_trace( m, data, tr_size ); // roll their data in + if( state != tr_size ) { + m->state = RMR_ERR_INITFAILED; + } + } + + return m; +} + +/* + This provides an external path to the realloc static function as it's called by an + outward facing mbuf api function. Used to reallocate a message with a different + trace data size. +*/ +extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) { + return realloc_msg( msg, new_tr_size ); +} + + /* Return the message to the available pool, or free it outright. */ @@ -436,7 +473,7 @@ extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) { if( old_msg ) { msg = old_msg; } else { - msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK ); // will abort on failure, no need to check + msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN ); // will abort on failure, no need to check } if( ms_to < 0 ) { @@ -575,8 +612,8 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { int state; if( ! announced ) { - fprintf( stderr, "[INFO] ric message routing library on NNG (%s %s.%s.%s built: %s)\n", - QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ ); + fprintf( stderr, "[INFO] ric message routing library on NNG mv=%d (%s %s.%s.%s built: %s)\n", + RMR_MSG_VER, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ ); announced = 1; } @@ -596,17 +633,11 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { ctx->send_retries = 1; // default is not to sleep at all; RMr will retry about 10K times before returning ctx->mring = uta_mk_ring( 128 ); // message ring to hold asynch msgs received while waiting for call response - ctx->max_plen = RMR_MAX_RCV_BYTES + sizeof( uta_mhdr_t ); // default max buffer size + ctx->max_plen = RMR_MAX_RCV_BYTES; // max user payload lengh if( max_msg_size > 0 ) { - if( max_msg_size <= ctx->max_plen ) { // user defined len can be smaller - ctx->max_plen = max_msg_size; - } else { - fprintf( stderr, "[WRN] rmr_init: attempt to set max payload len > than allowed maximum; capped at %d bytes\n", ctx->max_plen ); - } + ctx->max_plen = max_msg_size; } - ctx->max_mlen = ctx->max_plen + sizeof( uta_mhdr_t ); - // we're using a listener to get rtg updates, so we do NOT need this. //uta_lookup_rtg( ctx ); // attempt to fill in rtg info; rtc will handle missing values/errors @@ -687,6 +718,27 @@ extern void* rmr_init( char* uproto_port, int max_msg_size, int flags ) { return init( uproto_port, max_msg_size, flags & UFL_MASK ); // ensure any internal flags are off } +/* + This sets the default trace length which will be added to any message buffers + allocated. It can be set at any time, and if rmr_set_trace() is given a + trace len that is different than the default allcoated in a message, the message + will be resized. + + Returns 0 on failure and 1 on success. If failure, then errno will be set. +*/ +extern int rmr_init_trace( void* vctx, int tr_len ) { + uta_ctx_t* ctx; + + errno = 0; + if( (ctx = (uta_ctx_t *) vctx) == NULL ) { + errno = EINVAL; + return 0; + } + + ctx->trace_data_len = tr_len; + return 1; +} + /* Return true if routing table is initialised etc. and app can send/receive. */ @@ -719,7 +771,7 @@ extern int rmr_get_rcvfd( void* vctx ) { } if( (state = nng_getopt_int( ctx->nn_sock, NNG_OPT_RECVFD, &fd )) != 0 ) { - fprintf( stderr, ">>> cannot get recv fd: %s\n", nng_strerror( state ) ); + fprintf( stderr, "[WRN] rmr cannot get recv fd: %s\n", nng_strerror( state ) ); return -1; }