// :vi sw=4 ts=4 noet:
/*
==================================================================================
- Copyright (c) 2019 Nokia
+ Copyright (c) 2019 Nokia
Copyright (c) 2018-2019 AT&T Intellectual Property.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
+ http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
the older nanomsg messaging transport mehhanism.
To "hide" internal functions the choice was made to implement them
- all as static functions. This means that we include nearly
+ all as static functions. This means that we include nearly
all of our modules here as 90% of the library is not visible to
- the outside world.
+ the outside world.
Author: E. Scott Daniels
Date: 28 November 2018
// --------------- public functions --------------------------------------------------------------------------
/*
- Set the receive timeout to time. If time >1000 we assume the time is milliseconds,
- else we assume seconds. Setting -1 is always block.
+ Set the receive timeout to time (ms). A value of 0 is the same as a non-blocking
+ receive and -1 is block for ever.
Returns the nn value (0 on success <0 on error).
*/
extern int rmr_set_rtimeout( void* vctx, int time ) {
return -1;
}
- if( time > 0 ) {
- if( time < 1000 ) {
- time = time * 1000; // assume seconds, nn wants ms
- }
- }
+ if( ctx->last_rto == time ) {
+ return 0;
+ }
+
+ ctx->last_rto = time;
return nn_setsockopt( ctx->nn_sock, NN_SOL_SOCKET, NN_RCVTIMEO, &time, sizeof( time ) );
}
return rmr_rcv_to( vctx, time );
}
-
/*
Set the send timeout to time. If time >1000 we assume the time is milliseconds,
else we assume seconds. Setting -1 is always block.
}
if( time > 0 ) {
- if( time < 1000 ) {
+ if( time < 1000 ) {
time = time * 1000; // assume seconds, nn wants ms
}
- }
+ }
return nn_setsockopt( ctx->nn_sock, NN_SOL_SOCKET, NN_SNDTIMEO, &time, sizeof( time ) );
}
Returns the size of the payload (bytes) that the msg buffer references.
Len in a message is the number of bytes which were received, or should
be transmitted, however, it is possible that the mbuf was allocated
- with a larger payload space than the payload length indicates; this
+ with a larger payload space than the payload length indicates; this
function returns the absolute maximum space that the user has available
in the payload. On error (bad msg buffer) -1 is returned and errno should
indicate the rason.
free( mbuf->header );
}
}
-
+
free( mbuf );
}
/*
- Accept a message and send it to an endpoint based on message type.
+ Accept a message and send it to an endpoint based on message type.
Allocates a new message buffer for the next send. If a message type has
more than one group of endpoints defined, then the message will be sent
- in round robin fashion to one endpoint in each group.
+ in round robin fashion to one endpoint in each group.
CAUTION: this is a non-blocking send. If the message cannot be sent, then
it will return with an error and errno set to eagain. If the send is
int group; // selected group to get socket for
int send_again; // true if the message must be sent again
rmr_mbuf_t* clone_m; // cloned message for an nth send
+ uint64_t key; // lookup key is now subid and mtype
+ int max_rt = 1000;
+ int altk_ok = 0; // ok to retry with alt key when true
if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
errno = EINVAL; // if msg is null, this is their clue
if( msg != NULL ) {
msg->state = RMR_ERR_BADARG;
errno = EINVAL; // must ensure it's not eagain
- }
+ }
return msg;
}
send_again = 1; // force loop entry
group = 0; // always start with group 0
+ key = build_rt_key( msg->sub_id, msg->mtype ); // what we need to find the route table entry
+ if( msg->sub_id != UNSET_SUBID ) { // if sub id set, allow retry with just mtype if no endpoint when sub-id used
+ altk_ok = 1;
+ }
+
while( send_again ) {
- nn_sock = uta_epsock_rr( ctx->rtable, msg->mtype, group, &send_again ); // round robin select endpoint; again set if mult groups
- if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d socket=%d len=%d\n",
- msg->mtype, send_again, group, nn_sock, msg->len );
- group++;
+ max_rt = 1000;
+ nn_sock = uta_epsock_rr( ctx->rtable, key, group, &send_again ); // round robin select endpoint; again set if mult groups
+ if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d socket=%d len=%d ak_ok=%d\n",
+ msg->mtype, send_again, group, nn_sock, msg->len, altk_ok );
if( nn_sock < 0 ) {
+ if( altk_ok ) { // ok to retry with alternate key
+ key = build_rt_key( UNSET_SUBID, msg->mtype ); // build key with just mtype and retry
+ send_again = 1;
+ altk_ok = 0;
+ continue;
+ }
+
msg->state = RMR_ERR_NOENDPT;
errno = ENXIO; // must ensure it's not eagain
return msg; // caller can resend (maybe) or free
}
+ group++;
if( send_again ) {
clone_m = clone_msg( msg ); // must make a copy as once we send this message is not available
- if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
+ if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d sub_id=%d len=%d\n", msg->mtype, msg->sub_id, msg->len );
msg->flags |= MFL_NOALLOC; // send should not allocate a new buffer
msg = send_msg( ctx, msg, nn_sock ); // do the hard work, msg should be nil on success
- /*
- if( msg ) {
- // error do we need to count successes/errors, how to report some success, esp if last fails?
- }
- */
+ while( max_rt > 0 && msg && msg->state == RMR_ERR_RETRY ) {
+ msg = send_msg( ctx, msg, nn_sock );
+ max_rt--;
+ }
msg = clone_m; // clone will be the next to send
} else {
msg = send_msg( ctx, msg, nn_sock ); // send the last, and allocate a new buffer; drops the clone if it was
+ while( max_rt > 0 && msg && msg->state == RMR_ERR_RETRY ) {
+ msg = send_msg( ctx, msg, nn_sock );
+ max_rt--;
+ }
}
}
}
/*
- Return to sender allows a message to be sent back to the endpoint where it originated.
+ Return to sender allows a message to be sent back to the endpoint where it originated.
The source information in the message is used to select the socket on which to write
- the message rather than using the message type and round-robin selection. This
+ the message rather than using the message type and round-robin selection. This
should return a message buffer with the state of the send operation set. On success
(state is RMR_OK, the caller may use the buffer for another receive operation), and on
error it can be passed back to this function to retry the send if desired. On error,
errno = EINVAL; // if msg is null, this is their clue
if( msg != NULL ) {
msg->state = RMR_ERR_BADARG;
- }
+ }
return msg;
}
Normally, a message struct pointer is returned and msg->state must be checked for RMR_OK
to ensure that no error was encountered. If the state is UTA_BADARG, then the message
- may be resent (likely the context pointer was nil). If the message is sent, but no
+ may be resent (likely the context pointer was nil). If the message is sent, but no
response is received, a nil message is returned with errno set to indicate the likley
issue:
ETIMEDOUT -- too many messages were queued before reciving the expected response
if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
if( msg != NULL ) {
msg->state = RMR_ERR_BADARG;
- }
+ }
return msg;
}
if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
if( old_msg != NULL ) {
old_msg->state = RMR_ERR_BADARG;
- }
+ }
errno = EINVAL;
return old_msg;
}
}
/*
- Receive with a timeout. This is a convenience function when sitting on top of
- nanomsg as it just sets the rcv timeout and calls rmr_rcv_msg().
+ Receive with a timeout. This is a convenience function when sitting on top of
+ nanomsg as it just sets the rcv timeout and calls rmr_rcv_msg().
*/
extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
- rmr_set_rtimeout( vctx, ms_to );
+ uta_ctx_t* ctx;
+
+ if( (ctx = (uta_ctx_t *) vctx) != NULL ) {
+ if( ctx->last_rto != ms_to ) { // avoid call overhead
+ rmr_set_rtimeout( vctx, ms_to );
+ }
+ }
+
return rmr_rcv_msg( vctx, old_msg );
}
/*
This blocks until the message with the 'expect' ID is received. Messages which are received
- before the expected message are queued onto the message ring. The function will return
+ before the expected message are queued onto the message ring. The function will return
a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
expected message is received. If the queued message ring fills a nil pointer is returned
and errno is set to ENOBUFS.
- Generally this will be invoked only by the call() function as it waits for a response, but
+ Generally this will be invoked only by the call() function as it waits for a response, but
it is exposed to the user application as three is no reason not to.
*/
extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect, int allow2queue ) {
uta_ctx_t* ctx;
int queued = 0; // number we pushed into the ring
int exp_len = 0; // length of expected ID
-
+
if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
if( msg != NULL ) {
msg->state = RMR_ERR_BADARG;
- }
+ }
errno = EINVAL;
return msg;
}
char* proto = "tcp"; // pointer into the proto/port string user supplied
char* port;
char* proto_port;
- char wbuf[1024]; // work buffer
+ char wbuf[1024]; // work buffer
char* tok; // pointer at token in a buffer
int state;
char* interface = NULL; // interface to bind to pulled from RMR_BIND_IF if set
- fprintf( stderr, "[INFO] ric message routing library on nanomsg (%s %s.%s.%s built: %s)\n",
+ fprintf( stderr, "[INFO] ric message routing library on nanomsg (%s %s.%s.%s built: %s)\n",
QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
errno = 0;
ctx->mring = uta_mk_ring( 128 ); // message ring to hold asynch msgs received while waiting for call response
+ ctx->last_rto = -2; // last receive timeout that was set; invalid value to force first to set
ctx->max_plen = RMR_MAX_RCV_BYTES + sizeof( uta_mhdr_t ); // default max buffer size
if( max_msg_size > 0 ) {
uta_lookup_rtg( ctx ); // attempt to fill in rtg info; rtc will handle missing values/errors
- ctx->nn_sock = nn_socket( AF_SP, NN_PULL ); // our 'listen' socket should allow multiple senders to connect
+ ctx->nn_sock = nn_socket( AF_SP, NN_PULL ); // our 'listen' socket should allow multiple senders to connect
if( ctx->nn_sock < 0 ) {
fprintf( stderr, "[CRIT] rmr_init: unable to initialise nanomsg listen socket: %d\n", errno );
free_ctx( ctx );
interface = "0.0.0.0";
}
snprintf( bind_info, sizeof( bind_info ), "%s://%s:%s", proto, interface, port );
- if( nn_bind( ctx->nn_sock, bind_info ) < 0) { // bind and automatically accept client sessions
+ if( nn_bind( ctx->nn_sock, bind_info ) < 0) { // bind and automatically accept client sessions
fprintf( stderr, "[CRIT] rmr_init: unable to bind nanomsg listen socket for %s: %s\n", bind_info, strerror( errno ) );
nn_close( ctx->nn_sock );
free_ctx( ctx );
return (void *) ctx;
}
+/*
+ This sets the default trace length which will be added to any message buffers
+ allocated. It can be set at any time, and if rmr_set_trace() is given a
+ trace len that is different than the default allcoated in a message, the message
+ will be resized.
+
+ Returns 0 on failure and 1 on success. If failure, then errno will be set.
+*/
+extern int rmr_init_trace( void* vctx, int tr_len ) {
+ uta_ctx_t* ctx;
+
+ errno = 0;
+ if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+ errno = EINVAL;
+ return 0;
+ }
+
+ ctx->trace_data_len = tr_len;
+ return 1;
+}
/*
Publicly facing initialisation function. Wrapper for the init() funcion above
- as it needs to ensure internal flags are masked off before calling the
+ as it needs to ensure internal flags are masked off before calling the
real workhorse.
*/
extern void* rmr_init( char* uproto_port, int max_msg_size, int flags ) {
}
/*
- Provides a non-fatal (compile) interface for the nng only function.
+ Provides a non-fatal (compile) interface for the nng only function.
Not supported on top of nano, so this always returns -1.
*/
extern int rmr_get_rcvfd( void* vctx ) {
if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
return;
}
-
+
nn_close( ctx->nn_sock );
}