API and build change and fixe summaries. Doc correctsions
and/or changes are not mentioned here; see the commit messages.
+2019 September 3; version 1.6.0
+ Fix bug in the rmr_rts_msg() function. If a return to sender message
+ failed, the source IP address was not correctly adjusted and could
+ cause the message to be "reflected" back to the sender on a retry.
+
+ Added the ability to set the source "ID" via an environment var
+ (RMR_SRC_ID). When present in the environment, the string will
+ be placed in to the message header as the source and thus be used
+ by an application calling rmr_rts_smg() to return a response to
+ the sender. If this environment variable is not present, the host
+ name (original behaviour) is used.
+
2019 August 26; version 1.4.0
New message types were added.
uta_ctx_t* ctx;
int state;
char* hold_src; // we need the original source if send fails
+ char* hold_ip; // also must hold original ip
int sock_ok = 0; // true if we found a valid endpoint socket
if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
}
((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG; // must ensure call flag is off
- if( HDR_VERSION( msg->header ) > 2 ) { // new version uses sender's ip address for rts
- sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock ); // default to IP based rts
- }
+
+ sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock ); // src is always used first for rts
if( ! sock_ok ) {
- sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock ); // IP not in rt, try name
+ if( HDR_VERSION( msg->header ) > 2 ) { // with ver2 the ip is there, try if src name not known
+ sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock );
+ }
if( ! sock_ok ) {
msg->state = RMR_ERR_NOENDPT;
return msg; // preallocated msg can be reused since not given back to nn
msg->state = RMR_OK; // ensure it is clear before send
hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src ); // the dest where we're returning the message to
+ hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip ); // both the src host and src ip
strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC ); // must overlay the source to be ours
msg = send_msg( ctx, msg, nn_sock, -1 );
if( msg ) {
strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC ); // always return original source so rts can be called again
+ strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC ); // always return original source so rts can be called again
msg->flags |= MFL_ADDSRC; // if msg given to send() it must add source
}
free( hold_src );
+ free( hold_ip );
return msg;
}
char* proto_port;
char wbuf[1024]; // work buffer
char* tok; // pointer at token in a buffer
+ char* tok2;
int state;
if( ! announced ) {
port = proto_port; // assume something like "1234" was passed
}
- if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
- fprintf( stderr, "[CRI] rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
- return NULL;
- }
- if( (tok = strchr( wbuf, '.' )) != NULL ) {
- *tok = 0; // we don't keep domain portion
+ if( (tok = getenv( ENV_SRC_ID )) != NULL ) { // env var overrides what we dig from system
+ tok = strdup( tok ); // something we can destroy
+ if( *tok == '[' ) { // we allow an ipv6 address here
+ tok2 = strchr( tok, ']' ) + 1; // we will chop the port (...]:port) if given
+ } else {
+ tok2 = strchr( tok, ':' ); // find :port if there so we can chop
+ }
+ if( tok2 && *tok2 ) { // if it's not the end of string marker
+ *tok2 = 0; // make it so
+ }
+
+ snprintf( wbuf, RMR_MAX_SRC, "%s", tok );
+ free( tok );
+ } else {
+ if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
+ fprintf( stderr, "[CRI] rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
+ return NULL;
+ }
+ if( (tok = strchr( wbuf, '.' )) != NULL ) {
+ *tok = 0; // we don't keep domain portion
+ }
}
+
ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC );
if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) { // our registered name is host:port
fprintf( stderr, "[CRI] rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port );
}
chute = &ctx->chutes[0]; // chute 0 used only for its semaphore
-
+
if( max_wait > 0 ) {
clock_gettime( CLOCK_REALTIME, &ts );
seconds = 1; // use as flag later to invoked timed wait
}
- errno = 0;
- state = 0;
- while( chute->mbuf == NULL && ! errno ) {
+ errno = EINTR;
+ state = -1;
+ while( state < 0 && errno == EINTR ) {
if( seconds ) {
state = sem_timedwait( &chute->barrier, &ts ); // wait for msg or timeout
} else {
state = sem_wait( &chute->barrier );
}
-
- if( state < 0 && errno == EINTR ) { // interrupted go back and wait; all other errors cause exit
- errno = 0;
- }
}
if( state < 0 ) {
mbuf = ombuf; // return caller's buffer if they passed one in
} else {
+ errno = 0; // interrupted call state could be left; clear
if( DEBUG ) fprintf( stderr, "[DBUG] mt_rcv extracting from normal ring\n" );
if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) { // pop if queued
- if( mbuf ) {
- mbuf->state = RMR_OK;
-
- if( ombuf ) {
- rmr_free_msg( ombuf ); // we cannot reuse as mbufs are queued on the ring
- }
- } else {
- mbuf = ombuf; // no buffer, return user's if there
+ mbuf->state = RMR_OK;
+
+ if( ombuf ) {
+ rmr_free_msg( ombuf ); // we cannot reuse as mbufs are queued on the ring
}
+ } else {
+ errno = ETIMEDOUT;
+ mbuf = ombuf; // no buffer, return user's if there
}
}
return 1;
}
+ setenv( "RMR_SRC_ID", "somehost", 1 ); // context should have this as source
if( (rmc2 = rmr_init( ":6789", 1024, FL_NOTHREAD )) == NULL ) { // init without starting a thread
errors += fail_if_nil( rmc, "rmr_init returned a nil pointer for non-threaded init " );
}
+ fprintf( stderr, "<INFO> with RMR_SRC_ID env set, source name in context: (%s)\n", ((uta_ctx_t *) rmc2)->my_name );
+ v = strcmp( ((uta_ctx_t *) rmc2)->my_name, "somehost:6789" );
+ errors += fail_not_equal( v, 0, "source name not set from environment variable (see previous info)" );
free_ctx( rmc2 ); // coverage
-
+
+ unsetenv( "RMR_SRC_ID" ); // context should NOT have our artificial name
if( (rmc2 = rmr_init( NULL, 1024, FL_NOTHREAD )) == NULL ) { // drive default port selector code
errors += fail_if_nil( rmc, "rmr_init returned a nil pointer when driving for default port " );
}
+ fprintf( stderr, "<INFO> after unset of RMR_SRC_ID, source name in context: (%s)\n", ((uta_ctx_t *) rmc2)->my_name );
+ v = strcmp( ((uta_ctx_t *) rmc2)->my_name, "somehost:6789" );
+ errors += fail_if_equal( v, 0, "source name smells when removed from environment (see previous info)" );
+ free_ctx( rmc2 ); // attempt to reduce leak check errors
v = rmr_ready( rmc ); // unknown return; not checking at the moment
em_set_mtc_msgs( 0 ); // turn off
em_set_rcvdelay( 0 ); // full speed receive rate
((uta_ctx_t *)rmc)->shutdown = 1; // force the mt-reciver attached to the context to stop
+
+ em_set_rcvdelay( 0 ); // let the receive loop spin w/o receives so we drive warning code about queue full
+ sleep( 5 );
+ em_set_rcvdelay( 1 ); // restore slow receive pace for any later tests
#endif