// --------------- public functions --------------------------------------------------------------------------
/*
- Set the receive timeout to time. If time >1000 we assume the time is milliseconds,
- else we assume seconds. Setting -1 is always block.
+ Set the receive timeout to time (ms). A value of 0 is the same as a non-blocking
+ receive and -1 is block for ever.
Returns the nn value (0 on success <0 on error).
*/
extern int rmr_set_rtimeout( void* vctx, int time ) {
return -1;
}
- if( time > 0 ) {
- if( time < 1000 ) {
- time = time * 1000; // assume seconds, nn wants ms
- }
- }
+ if( ctx->last_rto == time ) {
+ return 0;
+ }
+
+ ctx->last_rto = time;
return nn_setsockopt( ctx->nn_sock, NN_SOL_SOCKET, NN_RCVTIMEO, &time, sizeof( time ) );
}
return rmr_rcv_to( vctx, time );
}
-
/*
Set the send timeout to time. If time >1000 we assume the time is milliseconds,
else we assume seconds. Setting -1 is always block.
return NULL;
}
- m = alloc_zcmsg( ctx, NULL, size, 0 );
+ m = alloc_zcmsg( ctx, NULL, size, 0, DEF_TR_LEN );
return m;
}
+/*
+ Allocates a send message as a zerocopy message allowing the underlying message protocol
+ to send the buffer without copy. In addition, a trace data field of tr_size will be
+ added and the supplied data coppied to the buffer before returning the message to
+ the caller.
+*/
+extern rmr_mbuf_t* rmr_tralloc_msg( void* vctx, int size, int tr_size, unsigned const char* data ) {
+ uta_ctx_t* ctx;
+ rmr_mbuf_t* m;
+ int state;
+
+ if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+ return NULL;
+ }
+
+ m = alloc_zcmsg( ctx, NULL, size, 0, tr_size ); // alloc with specific tr size
+ if( m != NULL ) {
+ state = rmr_set_trace( m, data, tr_size ); // roll their data in
+ if( state != tr_size ) {
+ m->state = RMR_ERR_INITFAILED;
+ }
+ }
+
+ return m;
+}
+
+/*
+ Need an external path to the realloc static function as it's called by an
+ outward facing mbuf api function.
+*/
+extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
+ return realloc_msg( msg, new_tr_size );
+}
+
/*
Return the message to the available pool, or free it outright.
*/
nanomsg as it just sets the rcv timeout and calls rmr_rcv_msg().
*/
extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
- rmr_set_rtimeout( vctx, ms_to );
+ uta_ctx_t* ctx;
+
+ if( (ctx = (uta_ctx_t *) vctx) != NULL ) {
+ if( ctx->last_rto != ms_to ) { // avoid call overhead
+ rmr_set_rtimeout( vctx, ms_to );
+ }
+ }
+
return rmr_rcv_msg( vctx, old_msg );
}
ctx->mring = uta_mk_ring( 128 ); // message ring to hold asynch msgs received while waiting for call response
+ ctx->last_rto = -2; // last receive timeout that was set; invalid value to force first to set
ctx->max_plen = RMR_MAX_RCV_BYTES + sizeof( uta_mhdr_t ); // default max buffer size
if( max_msg_size > 0 ) {