The caller must check for this and handle.
*/
extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
- int nn_sock; // endpoint socket for send
+ int nn_sock = -1; // endpoint socket for send
uta_ctx_t* ctx;
int state;
uta_mhdr_t* hdr;
char* hold_src; // we need the original source if send fails
+ char* hold_ip;
if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
errno = EINVAL; // if msg is null, this is their clue
return msg;
}
- nn_sock = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src ); // socket of specific endpoint
- if( nn_sock < 0 ) {
- msg->state = RMR_ERR_NOENDPT;
- return msg; // preallocated msg can be reused since not given back to nn
+ if( HDR_VERSION( msg->header ) > 2 ) { // new version uses sender's ip address for rts
+ nn_sock = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip ); // socket of specific endpoint
+ }
+ if( nn_sock < 0 ) {
+ nn_sock = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src ); // socket of specific endpoint
+ if( nn_sock < 0 ) {
+ msg->state = RMR_ERR_NOENDPT;
+ return msg; // preallocated msg can be reused since not given back to nn
+ }
}
hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src ); // the dest where we're returning the message to
- strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID ); // must overlay the source to be ours
+ hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip );
+ strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC ); // must overlay the source to be ours
+ strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
+
msg = send_msg( ctx, msg, nn_sock );
if( msg ) {
- strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SID ); // always return original source so rts can be called again
+ strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC ); // always return original source so rts can be called again
+ strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC );
msg->flags |= MFL_ADDSRC; // if msg given to send() it must add source
}
free( hold_src );
+ free( hold_ip );
return msg;
}
if( max_msg_size <= ctx->max_plen ) { // user defined len can be smaller
ctx->max_plen = max_msg_size;
} else {
- fprintf( stderr, "[WARN] rmr_init: attempt to set max payload len > than allowed maximum; capped at %d bytes\n", ctx->max_plen );
+ fprintf( stderr, "[WRN] rmr_init: attempt to set max payload len > than allowed maximum; capped at %d bytes\n", ctx->max_plen );
}
}
ctx->nn_sock = nn_socket( AF_SP, NN_PULL ); // our 'listen' socket should allow multiple senders to connect
if( ctx->nn_sock < 0 ) {
- fprintf( stderr, "[CRIT] rmr_init: unable to initialise nanomsg listen socket: %d\n", errno );
+ fprintf( stderr, "[CRI] rmr_init: unable to initialise nanomsg listen socket: %d\n", errno );
free_ctx( ctx );
return NULL;
}
}
if( (gethostname( wbuf, sizeof( wbuf ) )) < 0 ) {
- fprintf( stderr, "[CRIT] rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
+ fprintf( stderr, "[CRI] rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
return NULL;
}
if( (tok = strchr( wbuf, '.' )) != NULL ) {
*tok = 0; // we don't keep domain portion
}
- ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SID );
- if( snprintf( ctx->my_name, RMR_MAX_SID, "%s:%s", wbuf, port ) >= RMR_MAX_SID ) { // our registered name is host:port
- fprintf( stderr, "[CRIT] rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SID, wbuf, port );
+ ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC );
+ if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) { // our registered name is host:port
+ fprintf( stderr, "[CRI] rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port );
return NULL;
}
}
snprintf( bind_info, sizeof( bind_info ), "%s://%s:%s", proto, interface, port );
if( nn_bind( ctx->nn_sock, bind_info ) < 0) { // bind and automatically accept client sessions
- fprintf( stderr, "[CRIT] rmr_init: unable to bind nanomsg listen socket for %s: %s\n", bind_info, strerror( errno ) );
+ fprintf( stderr, "[CRI] rmr_init: unable to bind nanomsg listen socket for %s: %s\n", bind_info, strerror( errno ) );
nn_close( ctx->nn_sock );
free_ctx( ctx );
return NULL;
}
+ if( (tok = getenv( ENV_NAME_ONLY )) != NULL ) {
+ if( atoi( tok ) > 0 ) {
+ flags |= RMRFL_NAME_ONLY; // don't allow IP addreess to go out in messages
+ }
+ }
+
+ if( flags & RMRFL_NAME_ONLY ) {
+ ctx->my_ip = strdup( ctx->my_name ); // user application or env var has specified that IP address is NOT sent out, use name
+ if( DEBUG ) fprintf( stderr, "[DBUG] name only mode is set; not sending IP address as source\n" );
+ } else {
+ ctx->ip_list = mk_ip_list( port ); // suss out all IP addresses we can find on the box, and bang on our port for RT comparisons
+ ctx->my_ip = get_default_ip( ctx->ip_list ); // and (guess) at what should be the default to put into messages as src
+ if( ctx->my_ip == NULL ) {
+ strcpy( ctx->my_ip, ctx->my_name ); // revert to name if we cant suss out ip address
+ fprintf( stderr, "[WRN] rmr_init: default ip address could not be sussed out, using name as source\n" );
+ } else {
+ if( DEBUG ) fprintf( stderr, "[DBUG] default ip address: %s\n", ctx->my_ip );
+ }
+ }
+
if( ! (flags & FL_NOTHREAD) ) { // skip if internal context that does not need rout table thread
if( pthread_create( &ctx->rtc_th, NULL, rtc, (void *) ctx ) ) { // kick the rt collector thread
- fprintf( stderr, "[WARN] rmr_init: unable to start route table collector thread: %s", strerror( errno ) );
+ fprintf( stderr, "[WRN] rmr_init: unable to start route table collector thread: %s", strerror( errno ) );
}
}