From dfe7b622b128e7bfb4a5e1f7e0afdb84e6001d14 Mon Sep 17 00:00:00 2001 From: "E. Scott Daniels" Date: Tue, 3 Sep 2019 10:52:41 -0400 Subject: [PATCH] The change to fix a bug2 Fix return to sender retry bug; add env rts id change 1: fix rts retry bug The source IP address was not being restored properly after a return to sender failure and subsequent retries were being "reflected" rather than being sent to the message source. Signed-off-by: E. Scott Daniels Change-Id: I065e4b6b17896ee0de87e62916f72d2fc3067940 Support env var to define return to sender name The host name in all environments cannot always be trusted to be used as the message source from a "return to sender" point of view. This change allows the RMR_SRC_ID environment variable to supply the "hostname" that is placed in a message as the source and thus used when an application needs to respond to the sender. Signed-off-by: E. Scott Daniels Change-Id: Ie55c3147d6fc423c71315a43d23c88abc7e3c8d2 Tweak unit tests for better coverage A warning was added if the multi-threaded receive process drops messages. This tweak ensures it it tested during unit test. Signed-off-by: E. Scott Daniels Change-Id: I380d58177cbc73eb1a10bbc52286922db7ead0b7 Bump version in CMake Signed-off-by: E. Scott Daniels Change-Id: I2d4b19a5cfd1e2c9ecd196b0af2532788414e565 --- CHANGES | 12 ++++++ CMakeLists.txt | 2 +- src/rmr/common/include/rmr_agnostic.h | 1 + src/rmr/nng/src/mt_call_nng_static.c | 11 ++++-- src/rmr/nng/src/rmr_nng.c | 70 ++++++++++++++++++++++------------- src/rmr/nng/src/sr_nng_static.c | 2 +- test/rmr_nng_api_static_test.c | 15 +++++++- 7 files changed, 81 insertions(+), 32 deletions(-) diff --git a/CHANGES b/CHANGES index d2a3f1d..f74e182 100644 --- a/CHANGES +++ b/CHANGES @@ -2,6 +2,18 @@ API and build change and fixe summaries. Doc correctsions and/or changes are not mentioned here; see the commit messages. +2019 September 3; version 1.6.0 + Fix bug in the rmr_rts_msg() function. If a return to sender message + failed, the source IP address was not correctly adjusted and could + cause the message to be "reflected" back to the sender on a retry. + + Added the ability to set the source "ID" via an environment var + (RMR_SRC_ID). When present in the environment, the string will + be placed in to the message header as the source and thus be used + by an application calling rmr_rts_smg() to return a response to + the sender. If this environment variable is not present, the host + name (original behaviour) is used. + 2019 August 26; version 1.4.0 New message types were added. diff --git a/CMakeLists.txt b/CMakeLists.txt index e7dc408..bb91e10 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -35,7 +35,7 @@ project( rmr LANGUAGES C ) cmake_minimum_required( VERSION 3.5 ) set( major_version "1" ) # should be automatically populated from git tag later, but until CI process sets a tag we use this -set( minor_version "5" ) +set( minor_version "6" ) set( patch_level "0" ) set( install_root "${CMAKE_INSTALL_PREFIX}" ) diff --git a/src/rmr/common/include/rmr_agnostic.h b/src/rmr/common/include/rmr_agnostic.h index 17a5158..e83a951 100644 --- a/src/rmr/common/include/rmr_agnostic.h +++ b/src/rmr/common/include/rmr_agnostic.h @@ -54,6 +54,7 @@ typedef struct uta_ctx uta_ctx_t; #define ENV_VERBOSE_FILE "RMR_VCTL_FILE" // file where vlevel may be managed for some (non-time critical) functions #define ENV_NAME_ONLY "RMR_SRC_NAMEONLY" // src in message is name only #define ENV_WARNINGS "RMR_WARNINGS" // if == 1 then we write some, non-performance impacting, warnings +#define ENV_SRC_ID "RMR_SRC_ID" // forces this string (adding :port, max 63 ch) into the source field; host name used if not set #define NO_FLAGS 0 // no flags to pass to a function diff --git a/src/rmr/nng/src/mt_call_nng_static.c b/src/rmr/nng/src/mt_call_nng_static.c index 325b313..71106f1 100644 --- a/src/rmr/nng/src/mt_call_nng_static.c +++ b/src/rmr/nng/src/mt_call_nng_static.c @@ -32,16 +32,21 @@ #include static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) { + static int warned = 0; chute_t* chute; - int state; if( ! uta_ring_insert( ctx->mring, mbuf ) ) { rmr_free_msg( mbuf ); // drop if ring is full + if( !warned ) { + fprintf( stderr, "[WARN] rmr_mt_receive: application is not receiving fast enough; messages dropping\n" ); + warned++; + } + + return; } chute = &ctx->chutes[0]; - chute->mbuf = mbuf; - state = sem_post( &chute->barrier ); // tickle the ring monitor + sem_post( &chute->barrier ); // tickle the ring monitor } /* diff --git a/src/rmr/nng/src/rmr_nng.c b/src/rmr/nng/src/rmr_nng.c index 1b6e1f0..bc3fe10 100644 --- a/src/rmr/nng/src/rmr_nng.c +++ b/src/rmr/nng/src/rmr_nng.c @@ -248,6 +248,7 @@ extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) { uta_ctx_t* ctx; int state; char* hold_src; // we need the original source if send fails + char* hold_ip; // also must hold original ip int sock_ok = 0; // true if we found a valid endpoint socket if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast @@ -268,11 +269,12 @@ extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) { } ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG; // must ensure call flag is off - if( HDR_VERSION( msg->header ) > 2 ) { // new version uses sender's ip address for rts - sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock ); // default to IP based rts - } + + sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock ); // src is always used first for rts if( ! sock_ok ) { - sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock ); // IP not in rt, try name + if( HDR_VERSION( msg->header ) > 2 ) { // with ver2 the ip is there, try if src name not known + sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock ); + } if( ! sock_ok ) { msg->state = RMR_ERR_NOENDPT; return msg; // preallocated msg can be reused since not given back to nn @@ -281,14 +283,17 @@ extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) { msg->state = RMR_OK; // ensure it is clear before send hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src ); // the dest where we're returning the message to + hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip ); // both the src host and src ip strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC ); // must overlay the source to be ours msg = send_msg( ctx, msg, nn_sock, -1 ); if( msg ) { strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC ); // always return original source so rts can be called again + strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC ); // always return original source so rts can be called again msg->flags |= MFL_ADDSRC; // if msg given to send() it must add source } free( hold_src ); + free( hold_ip ); return msg; } @@ -590,6 +595,7 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { char* proto_port; char wbuf[1024]; // work buffer char* tok; // pointer at token in a buffer + char* tok2; int state; if( ! announced ) { @@ -646,13 +652,29 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { port = proto_port; // assume something like "1234" was passed } - if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) { - fprintf( stderr, "[CRI] rmr_init: cannot determine localhost name: %s\n", strerror( errno ) ); - return NULL; - } - if( (tok = strchr( wbuf, '.' )) != NULL ) { - *tok = 0; // we don't keep domain portion + if( (tok = getenv( ENV_SRC_ID )) != NULL ) { // env var overrides what we dig from system + tok = strdup( tok ); // something we can destroy + if( *tok == '[' ) { // we allow an ipv6 address here + tok2 = strchr( tok, ']' ) + 1; // we will chop the port (...]:port) if given + } else { + tok2 = strchr( tok, ':' ); // find :port if there so we can chop + } + if( tok2 && *tok2 ) { // if it's not the end of string marker + *tok2 = 0; // make it so + } + + snprintf( wbuf, RMR_MAX_SRC, "%s", tok ); + free( tok ); + } else { + if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) { + fprintf( stderr, "[CRI] rmr_init: cannot determine localhost name: %s\n", strerror( errno ) ); + return NULL; + } + if( (tok = strchr( wbuf, '.' )) != NULL ) { + *tok = 0; // we don't keep domain portion + } } + ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC ); if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) { // our registered name is host:port fprintf( stderr, "[CRI] rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port ); @@ -860,7 +882,7 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) { } chute = &ctx->chutes[0]; // chute 0 used only for its semaphore - + if( max_wait > 0 ) { clock_gettime( CLOCK_REALTIME, &ts ); @@ -881,34 +903,30 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) { seconds = 1; // use as flag later to invoked timed wait } - errno = 0; - state = 0; - while( chute->mbuf == NULL && ! errno ) { + errno = EINTR; + state = -1; + while( state < 0 && errno == EINTR ) { if( seconds ) { state = sem_timedwait( &chute->barrier, &ts ); // wait for msg or timeout } else { state = sem_wait( &chute->barrier ); } - - if( state < 0 && errno == EINTR ) { // interrupted go back and wait; all other errors cause exit - errno = 0; - } } if( state < 0 ) { mbuf = ombuf; // return caller's buffer if they passed one in } else { + errno = 0; // interrupted call state could be left; clear if( DEBUG ) fprintf( stderr, "[DBUG] mt_rcv extracting from normal ring\n" ); if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) { // pop if queued - if( mbuf ) { - mbuf->state = RMR_OK; - - if( ombuf ) { - rmr_free_msg( ombuf ); // we cannot reuse as mbufs are queued on the ring - } - } else { - mbuf = ombuf; // no buffer, return user's if there + mbuf->state = RMR_OK; + + if( ombuf ) { + rmr_free_msg( ombuf ); // we cannot reuse as mbufs are queued on the ring } + } else { + errno = ETIMEDOUT; + mbuf = ombuf; // no buffer, return user's if there } } diff --git a/src/rmr/nng/src/sr_nng_static.c b/src/rmr/nng/src/sr_nng_static.c index c04690d..dfe3d04 100644 --- a/src/rmr/nng/src/sr_nng_static.c +++ b/src/rmr/nng/src/sr_nng_static.c @@ -1,4 +1,4 @@ - // : vi ts=4 sw=4 noet 2 +// : vi ts=4 sw=4 noet 2 /* ================================================================================== Copyright (c) 2019 Nokia diff --git a/test/rmr_nng_api_static_test.c b/test/rmr_nng_api_static_test.c index ca2e6f2..f08c2f1 100644 --- a/test/rmr_nng_api_static_test.c +++ b/test/rmr_nng_api_static_test.c @@ -122,16 +122,25 @@ static int rmr_api_test( ) { return 1; } + setenv( "RMR_SRC_ID", "somehost", 1 ); // context should have this as source if( (rmc2 = rmr_init( ":6789", 1024, FL_NOTHREAD )) == NULL ) { // init without starting a thread errors += fail_if_nil( rmc, "rmr_init returned a nil pointer for non-threaded init " ); } + fprintf( stderr, " with RMR_SRC_ID env set, source name in context: (%s)\n", ((uta_ctx_t *) rmc2)->my_name ); + v = strcmp( ((uta_ctx_t *) rmc2)->my_name, "somehost:6789" ); + errors += fail_not_equal( v, 0, "source name not set from environment variable (see previous info)" ); free_ctx( rmc2 ); // coverage - + + unsetenv( "RMR_SRC_ID" ); // context should NOT have our artificial name if( (rmc2 = rmr_init( NULL, 1024, FL_NOTHREAD )) == NULL ) { // drive default port selector code errors += fail_if_nil( rmc, "rmr_init returned a nil pointer when driving for default port " ); } + fprintf( stderr, " after unset of RMR_SRC_ID, source name in context: (%s)\n", ((uta_ctx_t *) rmc2)->my_name ); + v = strcmp( ((uta_ctx_t *) rmc2)->my_name, "somehost:6789" ); + errors += fail_if_equal( v, 0, "source name smells when removed from environment (see previous info)" ); + free_ctx( rmc2 ); // attempt to reduce leak check errors v = rmr_ready( rmc ); // unknown return; not checking at the moment @@ -440,6 +449,10 @@ static int rmr_api_test( ) { em_set_mtc_msgs( 0 ); // turn off em_set_rcvdelay( 0 ); // full speed receive rate ((uta_ctx_t *)rmc)->shutdown = 1; // force the mt-reciver attached to the context to stop + + em_set_rcvdelay( 0 ); // let the receive loop spin w/o receives so we drive warning code about queue full + sleep( 5 ); + em_set_rcvdelay( 1 ); // restore slow receive pace for any later tests #endif -- 2.16.6