X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Fnanomsg%2Fsrc%2Frmr.c;h=c9bb694d347f2ef17fb2181596b071c0414a467d;hb=ae8e63b75e4c5e754c1f3c2d4a600100a05225a0;hp=29001176b0c650a8d8b54c3628ffbf2ac2aa7e0e;hpb=68c5cf1104e89f5c43786a3e48f5c6a1e757f59f;p=ric-plt%2Flib%2Frmr.git diff --git a/src/nanomsg/src/rmr.c b/src/nanomsg/src/rmr.c index 2900117..c9bb694 100644 --- a/src/nanomsg/src/rmr.c +++ b/src/nanomsg/src/rmr.c @@ -80,8 +80,8 @@ static void free_ctx( uta_ctx_t* ctx ) { // --------------- public functions -------------------------------------------------------------------------- /* - Set the receive timeout to time. If time >1000 we assume the time is milliseconds, - else we assume seconds. Setting -1 is always block. + Set the receive timeout to time (ms). A value of 0 is the same as a non-blocking + receive and -1 is block for ever. Returns the nn value (0 on success <0 on error). */ extern int rmr_set_rtimeout( void* vctx, int time ) { @@ -92,11 +92,11 @@ extern int rmr_set_rtimeout( void* vctx, int time ) { return -1; } - if( time > 0 ) { - if( time < 1000 ) { - time = time * 1000; // assume seconds, nn wants ms - } - } + if( ctx->last_rto == time ) { + return 0; + } + + ctx->last_rto = time; return nn_setsockopt( ctx->nn_sock, NN_SOL_SOCKET, NN_RCVTIMEO, &time, sizeof( time ) ); } @@ -108,7 +108,6 @@ extern int rmr_rcv_to( void* vctx, int time ) { return rmr_rcv_to( vctx, time ); } - /* Set the send timeout to time. If time >1000 we assume the time is milliseconds, else we assume seconds. Setting -1 is always block. @@ -169,10 +168,44 @@ extern rmr_mbuf_t* rmr_alloc_msg( void* vctx, int size ) { return NULL; } - m = alloc_zcmsg( ctx, NULL, size, 0 ); + m = alloc_zcmsg( ctx, NULL, size, 0, DEF_TR_LEN ); return m; } +/* + Allocates a send message as a zerocopy message allowing the underlying message protocol + to send the buffer without copy. In addition, a trace data field of tr_size will be + added and the supplied data coppied to the buffer before returning the message to + the caller. +*/ +extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned const char* data ) { + uta_ctx_t* ctx; + rmr_mbuf_t* m; + int state; + + if( (ctx = (uta_ctx_t *) vctx) == NULL ) { + return NULL; + } + + m = alloc_zcmsg( ctx, NULL, size, 0, tr_size ); // alloc with specific tr size + if( m != NULL ) { + state = rmr_set_trace( m, data, tr_size ); // roll their data in + if( state != tr_size ) { + m->state = RMR_ERR_INITFAILED; + } + } + + return m; +} + +/* + Need an external path to the realloc static function as it's called by an + outward facing mbuf api function. +*/ +extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) { + return realloc_msg( msg, new_tr_size ); +} + /* Return the message to the available pool, or free it outright. */ @@ -414,7 +447,14 @@ extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) { nanomsg as it just sets the rcv timeout and calls rmr_rcv_msg(). */ extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) { - rmr_set_rtimeout( vctx, ms_to ); + uta_ctx_t* ctx; + + if( (ctx = (uta_ctx_t *) vctx) != NULL ) { + if( ctx->last_rto != ms_to ) { // avoid call overhead + rmr_set_rtimeout( vctx, ms_to ); + } + } + return rmr_rcv_msg( vctx, old_msg ); } @@ -520,6 +560,7 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { ctx->mring = uta_mk_ring( 128 ); // message ring to hold asynch msgs received while waiting for call response + ctx->last_rto = -2; // last receive timeout that was set; invalid value to force first to set ctx->max_plen = RMR_MAX_RCV_BYTES + sizeof( uta_mhdr_t ); // default max buffer size if( max_msg_size > 0 ) {