fix(send): Add second key lookup if sub-id is set 44/144/2
authorE. Scott Daniels <daniels@research.att.com>
Mon, 13 May 2019 17:28:39 +0000 (17:28 +0000)
committerE. Scott Daniels <daniels@research.att.com>
Mon, 13 May 2019 18:39:10 +0000 (18:39 +0000)
When finding an endpoint based on mtype+sub_id a second lookup
using just mtype is now performed if the mtype/sub_id combination
did not map to an endpoint.  This allows for "auto correction"
if the user application does not reset (clear) the sub_id when a
mtype should not be sent with one.

Fixed broken unit test for rmr_call()

Change-Id: I5056e3d0a477d5a4272031ad8b2046e3ffca7c8a
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
CMakeLists.txt
src/nanomsg/src/rmr.c
src/nng/src/rmr_nng.c
test/app_test/run_rr_test.ksh
test/rmr_nng_api_static_test.c

index 21ea46a..51639d2 100644 (file)
@@ -23,7 +23,7 @@ cmake_minimum_required( VERSION 3.5 )
 
 set( major_version "1" )               # should be automatically populated from git tag later, but until CI process sets a tag we use this
 set( minor_version "0" )
-set( patch_level "21" )
+set( patch_level "22" )
 
 set( install_root "${CMAKE_INSTALL_PREFIX}" )
 set( install_lib "lib" )
index 392feb7..e61664b 100644 (file)
@@ -244,6 +244,7 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
        rmr_mbuf_t*     clone_m;                // cloned message for an nth send
        uint64_t key;                           // lookup key is now subid and mtype
        int max_rt = 1000;
+       int     altk_ok = 0;                    // ok to retry with alt key when true
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
                errno = EINVAL;                                                                                         // if msg is null, this is their clue
@@ -266,18 +267,29 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
        group = 0;                                                                                              // always start with group 0
 
        key = build_rt_key( msg->sub_id, msg->mtype );                  // what we need to find the route table entry
+       if( msg->sub_id != UNSET_SUBID ) {                                              // if sub id set, allow retry with just mtype if no endpoint when sub-id used
+               altk_ok = 1;
+       }
+
        while( send_again ) {
                max_rt = 1000;
                nn_sock = uta_epsock_rr( ctx->rtable, key, group, &send_again );                // round robin select endpoint; again set if mult groups
-               if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d socket=%d len=%d\n",
-                               msg->mtype, send_again, group, nn_sock, msg->len );
-               group++;
+               if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d socket=%d len=%d ak_ok=%d\n",
+                               msg->mtype, send_again, group, nn_sock, msg->len, altk_ok );
 
                if( nn_sock < 0 ) {
+                       if( altk_ok ) {                                                                                 // ok to retry with alternate key
+                               key = build_rt_key( UNSET_SUBID, msg->mtype );          // build key with just mtype and retry
+                               send_again = 1;
+                               altk_ok = 0;            
+                               continue;
+                       }
+
                        msg->state = RMR_ERR_NOENDPT;
                        errno = ENXIO;                                                                                  // must ensure it's not eagain
                        return msg;                                                                                             // caller can resend (maybe) or free
                }
+               group++;
 
                if( send_again ) {
                        clone_m = clone_msg( msg );                                                             // must make a copy as once we send this message is not available
index ba7c852..6240d52 100644 (file)
@@ -191,6 +191,11 @@ extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
        more than one group of endpoints defined, then the message will be sent
        in round robin fashion to one endpoint in each group.
 
+       An endpoint will be looked up in the route table using the message type and
+       the subscription id. If the subscription id is "UNSET_SUBID", then only the
+       message type is used.  If the initial lookup, with a subid, fails, then a
+       second lookup using just the mtype is tried.
+
        CAUTION: this is a non-blocking send.  If the message cannot be sent, then
                it will return with an error and errno set to eagain. If the send is
                a limited fanout, then the returned status is the status of the last
@@ -205,6 +210,7 @@ extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
        rmr_mbuf_t*     clone_m;                // cloned message for an nth send
        int sock_ok;                            // got a valid socket from round robin select
        uint64_t key;                           // mtype or sub-id/mtype sym table key
+       int     altk_ok = 0;                    // set true if we can lookup on alternate key if mt/sid lookup fails
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
                errno = EINVAL;                                                                                         // if msg is null, this is their clue
@@ -231,18 +237,29 @@ extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
        group = 0;                                                                                              // always start with group 0
 
        key = build_rt_key( msg->sub_id, msg->mtype );                  // route table key to find the entry
+       if( msg->sub_id != UNSET_SUBID ) {                                              
+               altk_ok = 1;                                                                            // if caller's sub-id doesn't hit with mtype, allow mtype only key for retry
+       }
        while( send_again ) {
                sock_ok = uta_epsock_rr( ctx->rtable, key, group, &send_again, &nn_sock );              // round robin sel epoint; again set if mult groups
-               if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d len=%d sock_ok=%d\n",
-                               msg->mtype, send_again, group, msg->len, sock_ok );
-               group++;
+               if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d len=%d sock_ok=%d ak_ok=%d\n",
+                               msg->mtype, send_again, group, msg->len, sock_ok, altk_ok );
 
                if( ! sock_ok ) {
+                       if( altk_ok ) {                                                                                 // we can try with the alternate (no sub-id) key
+                               altk_ok = 0;
+                               key = build_rt_key( UNSET_SUBID, msg->mtype );          // build with just the mtype and try again
+                               send_again = 1;                                                                         // ensure we don't exit the while
+                               continue;
+                       }
+
                        msg->state = RMR_ERR_NOENDPT;
                        errno = ENXIO;                                                                                  // must ensure it's not eagain
                        return msg;                                                                                             // caller can resend (maybe) or free
                }
 
+               group++;
+
                if( send_again ) {
                        clone_m = clone_msg( msg );                                                             // must make a copy as once we send this message is not available
                        if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
index 3727cc2..4a3fd1c 100644 (file)
@@ -46,9 +46,9 @@ function run_sender {
        export RMR_RTG_SVC=8990
        if (( $nano_sender ))
        then
-               ./sender_nano $(( nmsg * nrcvrs )) $delay 1
+               ./sender_nano $(( nmsg * nrcvrs )) $delay $max_mtype
        else
-               ./sender $(( nmsg * nrcvrs ))  $delay 1
+               ./sender $(( nmsg * nrcvrs ))  $delay $max_mtype
        fi
        echo $? >/tmp/PID$$.src         # must communicate state back via file b/c asynch
 }
@@ -85,15 +85,15 @@ function set_rt {
                rte |0 | $endpoints  |0
                rte |1 | $endpoints  |10
                mse |2 | 20 | $endpoints                # new style mtype/subid entry
-               rte |3 | $endpoints  |0
-               rte |4 | $endpoints  |0
-               rte |5 | $endpoints  |0
-               rte |6 | $endpoints  |0
-               rte |7 | $endpoints  |0
-               rte |8 | $endpoints  |0
-               rte |9 | $endpoints  |0
-               rte |10 | $endpoints  |0
-               rte |11 | $endpoints  |0
+               rte |3 | $endpoints  | -1
+               rte |4 | $endpoints  | -1
+               rte |5 | $endpoints  | -1
+               rte |6 | $endpoints  | -1
+               rte |7 | $endpoints  | -1
+               rte |8 | $endpoints  | -1
+               rte |9 | $endpoints  | -1
+               rte |10 | $endpoints  | -1
+               rte |11 | $endpoints  | -1
                newrt |end
 endKat
 
@@ -114,6 +114,7 @@ nano_receiver=0
 wait=1
 rebuild=0
 verbose=0
+max_mtype=1                                    # causes all msgs to go with type 1; use -M to set up, but likely harder to validate
 nrcvrs=3                                       # this is sane, but -r allows it to be set up
 
 while [[ $1 == -* ]]
@@ -121,6 +122,7 @@ do
        case $1 in
                -B)     rebuild=1;;
                -d)     delay=$2; shift;;
+               -m) max_mtype=$2; shift;;
                -N)     nano_sender=1
                        nano_receiver=1
                        ;;
@@ -138,7 +140,6 @@ do
        shift
 done
 
-
 if (( verbose ))
 then
        echo "2" >.verbose
index e0dc4d8..2db1a20 100644 (file)
@@ -222,22 +222,23 @@ static int rmr_api_test( ) {
        errors += fail_if( errno != 0, "rmr_rts_msg did not reset errno "  );
 
 
-       snprintf( msg->xaction, 17, "%015d", 16 );              // dummy transaction id (emulation generates, this should arrive after a few calls to recv)
 
        msg->state = 0;
        msg = rmr_call( NULL, msg );
        errors += fail_if( msg->state == 0, "rmr_call did not set message state when given message with nil context "  );
 
+       snprintf( msg->xaction, 17, "%015d", 16 );              // dummy transaction id (emulation generates, this should arrive after a few calls to recv)
        msg->mtype = 0;
-       msg = rmr_call( rmc, msg );                                             // this call should return a message as we can anticipate a dummy message in
+       msg->sub_id = -1;
+       msg = rmr_call( rmc, msg );                                             // dummy nng/nano function will sequentually add xactions and should match or '16'
        errors += fail_if_nil( msg, "rmr_call returned a nil message on call expected to succeed "  );
        if( msg ) {
                errors += fail_not_equal( msg->state, RMR_OK, "rmr_call did not properly set state on successful return "  );
                errors += fail_not_equal( errno, 0, "rmr_call did not properly set errno (a) on successful return "  );
        }
 
-       snprintf( wbuf, 17, "%015d", 14 );                              // if we call receive we should find this in the first 15 tries
-       for( i = 0; i < 16; i++ ) {
+       snprintf( wbuf, 17, "%015d", 14 );                              // while waiting, the queued messages should have #14, so issue a few receives looking for it
+       for( i = 0; i < 16; i++ ) {                                             // it should be in the first 15
                msg = rmr_rcv_msg( rmc, msg );
                if( msg ) {
                        if( strcmp( wbuf, msg->xaction ) == 0 ) {               // found the queued message
@@ -254,15 +255,10 @@ static int rmr_api_test( ) {
        if( ! msg ) {
                msg = rmr_alloc_msg( rmc, 2048 );                               // something buggered above; get a new one
        }
-       msg = rmr_call( rmc, msg );                                                     // make a call that we never expect a response on
-       errors += fail_if_nil( msg, "rmr_call returned a non-nil message on call expected not to receive a response "  );
-       if( msg ) {
-               errors += fail_if_equal( msg->state, RMR_OK, "rmr_call did not properly set state on queued message receive "  );
-               errors += fail_if( errno == 0, "rmr_call did not properly set errno on queued message receivesuccessful "  );
-       }
-
-       msg = rmr_call( rmc, msg );                                             // this should "timeout" because the message xaction id won't ever appear again
-       errors += fail_if_nil( msg, "rmr_call returned a nil message on call expected to fail "  );
+       msg->mtype = 0;
+       msg->sub_id = -1;
+       msg = rmr_call( rmc, msg );                                                     // make a call that we never expect a response on (nil pointer back)
+       errors += fail_not_nil( msg, "rmr_call returned a nil message on call expected not to receive a response "  );
        errors += fail_if( errno == 0, "rmr_call did not set errno on failure "  );
 
        rmr_free_msg( NULL );                   // drive for coverage; nothing to check