The change to fix a bug2 78/878/2 1.6.0
authorE. Scott Daniels <daniels@research.att.com>
Tue, 3 Sep 2019 14:52:41 +0000 (10:52 -0400)
committerE. Scott Daniels <daniels@research.att.com>
Tue, 3 Sep 2019 17:15:08 +0000 (13:15 -0400)
Fix return to sender retry bug; add env rts id

change 1: fix rts retry bug
The source IP address was not being restored properly after
a return to sender failure and subsequent retries were being
"reflected" rather than being sent to the message source.

Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Change-Id: I065e4b6b17896ee0de87e62916f72d2fc3067940

Support env var to define return to sender name

The host name in all environments cannot always be trusted to
be used as the message source from a "return to sender"
point of view.  This change allows the RMR_SRC_ID environment
variable to supply the "hostname" that is placed in a message
as the source and thus used when an application needs to
respond to the sender.

Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Change-Id: Ie55c3147d6fc423c71315a43d23c88abc7e3c8d2

Tweak unit tests for better coverage

A warning was added if the multi-threaded receive process
drops messages. This tweak ensures it it tested during
unit test.

Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Change-Id: I380d58177cbc73eb1a10bbc52286922db7ead0b7

Bump version in CMake

Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Change-Id: I2d4b19a5cfd1e2c9ecd196b0af2532788414e565

CHANGES
CMakeLists.txt
src/rmr/common/include/rmr_agnostic.h
src/rmr/nng/src/mt_call_nng_static.c
src/rmr/nng/src/rmr_nng.c
src/rmr/nng/src/sr_nng_static.c
test/rmr_nng_api_static_test.c

diff --git a/CHANGES b/CHANGES
index d2a3f1d..f74e182 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -2,6 +2,18 @@
 API and build change  and fixe summaries. Doc correctsions
 and/or changes are not mentioned here; see the commit messages.
 
+2019 September 3; version 1.6.0
+       Fix bug in the rmr_rts_msg() function. If a return to sender message
+       failed, the source IP address was not correctly adjusted and could
+       cause the message to be "reflected" back to the sender on a retry.
+
+       Added the ability to set the source "ID" via an environment var
+       (RMR_SRC_ID).  When present in the environment, the string will
+       be placed in to the message header as the source and thus be used
+       by an application calling rmr_rts_smg() to return a response to
+       the sender.  If this environment variable is not present, the host
+       name (original behaviour) is used.
+
 2019 August 26; version 1.4.0 
        New message types were added.
 
index e7dc408..bb91e10 100644 (file)
@@ -35,7 +35,7 @@ project( rmr LANGUAGES C )
 cmake_minimum_required( VERSION 3.5 )
 
 set( major_version "1" )               # should be automatically populated from git tag later, but until CI process sets a tag we use this
-set( minor_version "5" )
+set( minor_version "6" )
 set( patch_level "0" )
 
 set( install_root "${CMAKE_INSTALL_PREFIX}" )
index 17a5158..e83a951 100644 (file)
@@ -54,6 +54,7 @@ typedef struct uta_ctx  uta_ctx_t;
 #define ENV_VERBOSE_FILE "RMR_VCTL_FILE"       // file where vlevel may be managed for some (non-time critical) functions
 #define ENV_NAME_ONLY "RMR_SRC_NAMEONLY"       // src in message is name only
 #define ENV_WARNINGS   "RMR_WARNINGS"          // if == 1 then we write some, non-performance impacting, warnings
+#define ENV_SRC_ID             "RMR_SRC_ID"            // forces this string (adding :port, max 63 ch) into the source field; host name used if not set
 
 #define NO_FLAGS       0                               // no flags to pass to a function
 
index 325b313..71106f1 100644 (file)
 #include <semaphore.h>
 
 static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) {
+       static  int warned = 0;
        chute_t*        chute;
-       int                     state;
 
        if( ! uta_ring_insert( ctx->mring, mbuf ) ) {
                rmr_free_msg( mbuf );                                                           // drop if ring is full
+               if( !warned ) {
+                       fprintf( stderr, "[WARN] rmr_mt_receive: application is not receiving fast enough; messages dropping\n" );
+                       warned++;
+               }
+
+               return;
        }
 
        chute = &ctx->chutes[0];
-       chute->mbuf = mbuf;
-       state = sem_post( &chute->barrier );                                                            // tickle the ring monitor
+       sem_post( &chute->barrier );                                                            // tickle the ring monitor
 }
 
 /*
index 1b6e1f0..bc3fe10 100644 (file)
@@ -248,6 +248,7 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
        uta_ctx_t*      ctx;
        int                     state;
        char*           hold_src;                       // we need the original source if send fails
+       char*           hold_ip;                        // also must hold original ip
        int                     sock_ok = 0;            // true if we found a valid endpoint socket
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
@@ -268,11 +269,12 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
        }
 
        ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
-       if( HDR_VERSION( msg->header ) > 2 ) {                                                  // new version uses sender's ip address for rts
-               sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock );                      // default to IP based rts
-       }
+
+       sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock );                        // src is always used first for rts
        if( ! sock_ok ) {
-               sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock );                // IP  not in rt, try name
+               if( HDR_VERSION( msg->header ) > 2 ) {                                                  // with ver2 the ip is there, try if src name not known
+                       sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock );
+               }
                if( ! sock_ok ) {
                        msg->state = RMR_ERR_NOENDPT;
                        return msg;                                                                                                                             // preallocated msg can be reused since not given back to nn
@@ -281,14 +283,17 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
 
        msg->state = RMR_OK;                                                                                                                            // ensure it is clear before send
        hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src );                                         // the dest where we're returning the message to
+       hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip );                                        // both the src host and src ip
        strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );        // must overlay the source to be ours
        msg = send_msg( ctx, msg, nn_sock, -1 );
        if( msg ) {
                strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC );    // always return original source so rts can be called again
+               strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC );   // always return original source so rts can be called again
                msg->flags |= MFL_ADDSRC;                                                                                                               // if msg given to send() it must add source
        }
 
        free( hold_src );
+       free( hold_ip );
        return msg;
 }
 
@@ -590,6 +595,7 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        char*   proto_port;
        char    wbuf[1024];                                     // work buffer
        char*   tok;                                            // pointer at token in a buffer
+       char*   tok2;
        int             state;
 
        if( ! announced ) {
@@ -646,13 +652,29 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
                port = proto_port;                      // assume something like "1234" was passed
        }
 
-       if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
-               fprintf( stderr, "[CRI] rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
-               return NULL;
-       }
-       if( (tok = strchr( wbuf, '.' )) != NULL ) {
-               *tok = 0;                                                                       // we don't keep domain portion
+       if( (tok = getenv( ENV_SRC_ID )) != NULL ) {                                                    // env var overrides what we dig from system
+               tok = strdup( tok );                                    // something we can destroy
+               if( *tok == '[' ) {                                             // we allow an ipv6 address here
+                       tok2 = strchr( tok, ']' ) + 1;          // we will chop the port (...]:port) if given
+               } else {
+                       tok2 = strchr( tok, ':' );                      // find :port if there so we can chop
+               }
+               if( tok2  && *tok2 ) {                                  // if it's not the end of string marker
+                       *tok2 = 0;                                                      // make it so
+               }
+
+               snprintf( wbuf, RMR_MAX_SRC, "%s", tok );
+               free( tok );
+       } else {
+               if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
+                       fprintf( stderr, "[CRI] rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
+                       return NULL;
+               }
+               if( (tok = strchr( wbuf, '.' )) != NULL ) {
+                       *tok = 0;                                                                       // we don't keep domain portion
+               }
        }
+
        ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC );
        if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) {                       // our registered name is host:port
                fprintf( stderr, "[CRI] rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port );
@@ -860,7 +882,7 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
        }
 
        chute = &ctx->chutes[0];                                        // chute 0 used only for its semaphore
-       
+
        if( max_wait > 0 ) {
                clock_gettime( CLOCK_REALTIME, &ts );   
 
@@ -881,34 +903,30 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
                seconds = 1;                                                                                                    // use as flag later to invoked timed wait
        }
 
-       errno = 0;
-       state = 0;
-       while( chute->mbuf == NULL && ! errno ) {
+       errno = EINTR;
+       state = -1;
+       while( state < 0 && errno == EINTR ) {
                if( seconds ) {
                        state = sem_timedwait( &chute->barrier, &ts );                          // wait for msg or timeout
                } else {
                        state = sem_wait( &chute->barrier );
                }
-
-               if( state < 0 && errno == EINTR ) {                                                             // interrupted go back and wait; all other errors cause exit
-                       errno = 0;
-               }
        }
 
        if( state < 0 ) {
                mbuf = ombuf;                           // return caller's buffer if they passed one in
        } else {
+               errno = 0;                                              // interrupted call state could be left; clear
                if( DEBUG ) fprintf( stderr, "[DBUG] mt_rcv extracting from normal ring\n" );
                if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
-                       if( mbuf ) {
-                               mbuf->state = RMR_OK;
-
-                               if( ombuf ) {
-                                       rmr_free_msg( ombuf );                                  // we cannot reuse as mbufs are queued on the ring
-                               }
-                       } else {
-                               mbuf = ombuf;                           // no buffer, return user's if there
+                       mbuf->state = RMR_OK;
+
+                       if( ombuf ) {
+                               rmr_free_msg( ombuf );                                  // we cannot reuse as mbufs are queued on the ring
                        }
+               } else {
+                       errno = ETIMEDOUT;
+                       mbuf = ombuf;                           // no buffer, return user's if there
                }
        }
 
index c04690d..dfe3d04 100644 (file)
@@ -1,4 +1,4 @@
- // : vi ts=4 sw=4 noet 2
+// : vi ts=4 sw=4 noet 2
 /*
 ==================================================================================
        Copyright (c) 2019 Nokia
index ca2e6f2..f08c2f1 100644 (file)
@@ -122,16 +122,25 @@ static int rmr_api_test( ) {
                return 1;
        }
 
+       setenv( "RMR_SRC_ID", "somehost", 1 );                                                          // context should have this as source
        if( (rmc2 = rmr_init( ":6789", 1024, FL_NOTHREAD )) == NULL ) {         // init without starting a thread
                errors += fail_if_nil( rmc, "rmr_init returned a nil pointer for non-threaded init "  );
        }
 
+       fprintf( stderr, "<INFO> with RMR_SRC_ID env set, source name in context: (%s)\n", ((uta_ctx_t *) rmc2)->my_name );
+       v = strcmp( ((uta_ctx_t *) rmc2)->my_name, "somehost:6789" );
+       errors += fail_not_equal( v, 0, "source name not set from environment variable (see previous info)" );
        free_ctx( rmc2 );                       // coverage
-
+       
+       unsetenv( "RMR_SRC_ID" );                                                                                               // context should NOT have our artificial name 
        if( (rmc2 = rmr_init( NULL, 1024, FL_NOTHREAD )) == NULL ) {                    // drive default port selector code
                errors += fail_if_nil( rmc, "rmr_init returned a nil pointer when driving for default port "  );
        }
 
+       fprintf( stderr, "<INFO> after unset of RMR_SRC_ID, source name in context: (%s)\n", ((uta_ctx_t *) rmc2)->my_name );
+       v = strcmp( ((uta_ctx_t *) rmc2)->my_name, "somehost:6789" );
+       errors += fail_if_equal( v, 0, "source name smells when removed from environment (see previous info)" );
+       free_ctx( rmc2 );                       // attempt to reduce leak check errors
 
        v = rmr_ready( rmc );           // unknown return; not checking at the moment
 
@@ -440,6 +449,10 @@ static int rmr_api_test( ) {
        em_set_mtc_msgs( 0 );                                                   // turn off 
        em_set_rcvdelay( 0 );                                                   // full speed receive rate
        ((uta_ctx_t *)rmc)->shutdown = 1;                               // force the mt-reciver attached to the context to stop
+
+       em_set_rcvdelay( 0 );                                                   // let the receive loop spin w/o receives so we drive warning code about queue full
+       sleep( 5 );
+       em_set_rcvdelay( 1 );                                                   // restore slow receive pace for any later tests
 #endif