fix(send): Add second key lookup if sub-id is set 44/144/2
authorE. Scott Daniels <daniels@research.att.com>
Mon, 13 May 2019 17:28:39 +0000 (17:28 +0000)
committerE. Scott Daniels <daniels@research.att.com>
Mon, 13 May 2019 18:39:10 +0000 (18:39 +0000)
When finding an endpoint based on mtype+sub_id a second lookup
using just mtype is now performed if the mtype/sub_id combination
did not map to an endpoint.  This allows for "auto correction"
if the user application does not reset (clear) the sub_id when a
mtype should not be sent with one.

Fixed broken unit test for rmr_call()

Change-Id: I5056e3d0a477d5a4272031ad8b2046e3ffca7c8a
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
CMakeLists.txt
src/nanomsg/src/rmr.c
src/nng/src/rmr_nng.c
test/app_test/run_rr_test.ksh
test/rmr_nng_api_static_test.c

index 21ea46a..51639d2 100644 (file)
@@ -23,7 +23,7 @@ cmake_minimum_required( VERSION 3.5 )
 
 set( major_version "1" )               # should be automatically populated from git tag later, but until CI process sets a tag we use this
 set( minor_version "0" )
 
 set( major_version "1" )               # should be automatically populated from git tag later, but until CI process sets a tag we use this
 set( minor_version "0" )
-set( patch_level "21" )
+set( patch_level "22" )
 
 set( install_root "${CMAKE_INSTALL_PREFIX}" )
 set( install_lib "lib" )
 
 set( install_root "${CMAKE_INSTALL_PREFIX}" )
 set( install_lib "lib" )
index 392feb7..e61664b 100644 (file)
@@ -244,6 +244,7 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
        rmr_mbuf_t*     clone_m;                // cloned message for an nth send
        uint64_t key;                           // lookup key is now subid and mtype
        int max_rt = 1000;
        rmr_mbuf_t*     clone_m;                // cloned message for an nth send
        uint64_t key;                           // lookup key is now subid and mtype
        int max_rt = 1000;
+       int     altk_ok = 0;                    // ok to retry with alt key when true
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
                errno = EINVAL;                                                                                         // if msg is null, this is their clue
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
                errno = EINVAL;                                                                                         // if msg is null, this is their clue
@@ -266,18 +267,29 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
        group = 0;                                                                                              // always start with group 0
 
        key = build_rt_key( msg->sub_id, msg->mtype );                  // what we need to find the route table entry
        group = 0;                                                                                              // always start with group 0
 
        key = build_rt_key( msg->sub_id, msg->mtype );                  // what we need to find the route table entry
+       if( msg->sub_id != UNSET_SUBID ) {                                              // if sub id set, allow retry with just mtype if no endpoint when sub-id used
+               altk_ok = 1;
+       }
+
        while( send_again ) {
                max_rt = 1000;
                nn_sock = uta_epsock_rr( ctx->rtable, key, group, &send_again );                // round robin select endpoint; again set if mult groups
        while( send_again ) {
                max_rt = 1000;
                nn_sock = uta_epsock_rr( ctx->rtable, key, group, &send_again );                // round robin select endpoint; again set if mult groups
-               if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d socket=%d len=%d\n",
-                               msg->mtype, send_again, group, nn_sock, msg->len );
-               group++;
+               if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d socket=%d len=%d ak_ok=%d\n",
+                               msg->mtype, send_again, group, nn_sock, msg->len, altk_ok );
 
                if( nn_sock < 0 ) {
 
                if( nn_sock < 0 ) {
+                       if( altk_ok ) {                                                                                 // ok to retry with alternate key
+                               key = build_rt_key( UNSET_SUBID, msg->mtype );          // build key with just mtype and retry
+                               send_again = 1;
+                               altk_ok = 0;            
+                               continue;
+                       }
+
                        msg->state = RMR_ERR_NOENDPT;
                        errno = ENXIO;                                                                                  // must ensure it's not eagain
                        return msg;                                                                                             // caller can resend (maybe) or free
                }
                        msg->state = RMR_ERR_NOENDPT;
                        errno = ENXIO;                                                                                  // must ensure it's not eagain
                        return msg;                                                                                             // caller can resend (maybe) or free
                }
+               group++;
 
                if( send_again ) {
                        clone_m = clone_msg( msg );                                                             // must make a copy as once we send this message is not available
 
                if( send_again ) {
                        clone_m = clone_msg( msg );                                                             // must make a copy as once we send this message is not available
index ba7c852..6240d52 100644 (file)
@@ -191,6 +191,11 @@ extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
        more than one group of endpoints defined, then the message will be sent
        in round robin fashion to one endpoint in each group.
 
        more than one group of endpoints defined, then the message will be sent
        in round robin fashion to one endpoint in each group.
 
+       An endpoint will be looked up in the route table using the message type and
+       the subscription id. If the subscription id is "UNSET_SUBID", then only the
+       message type is used.  If the initial lookup, with a subid, fails, then a
+       second lookup using just the mtype is tried.
+
        CAUTION: this is a non-blocking send.  If the message cannot be sent, then
                it will return with an error and errno set to eagain. If the send is
                a limited fanout, then the returned status is the status of the last
        CAUTION: this is a non-blocking send.  If the message cannot be sent, then
                it will return with an error and errno set to eagain. If the send is
                a limited fanout, then the returned status is the status of the last
@@ -205,6 +210,7 @@ extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
        rmr_mbuf_t*     clone_m;                // cloned message for an nth send
        int sock_ok;                            // got a valid socket from round robin select
        uint64_t key;                           // mtype or sub-id/mtype sym table key
        rmr_mbuf_t*     clone_m;                // cloned message for an nth send
        int sock_ok;                            // got a valid socket from round robin select
        uint64_t key;                           // mtype or sub-id/mtype sym table key
+       int     altk_ok = 0;                    // set true if we can lookup on alternate key if mt/sid lookup fails
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
                errno = EINVAL;                                                                                         // if msg is null, this is their clue
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
                errno = EINVAL;                                                                                         // if msg is null, this is their clue
@@ -231,18 +237,29 @@ extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
        group = 0;                                                                                              // always start with group 0
 
        key = build_rt_key( msg->sub_id, msg->mtype );                  // route table key to find the entry
        group = 0;                                                                                              // always start with group 0
 
        key = build_rt_key( msg->sub_id, msg->mtype );                  // route table key to find the entry
+       if( msg->sub_id != UNSET_SUBID ) {                                              
+               altk_ok = 1;                                                                            // if caller's sub-id doesn't hit with mtype, allow mtype only key for retry
+       }
        while( send_again ) {
                sock_ok = uta_epsock_rr( ctx->rtable, key, group, &send_again, &nn_sock );              // round robin sel epoint; again set if mult groups
        while( send_again ) {
                sock_ok = uta_epsock_rr( ctx->rtable, key, group, &send_again, &nn_sock );              // round robin sel epoint; again set if mult groups
-               if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d len=%d sock_ok=%d\n",
-                               msg->mtype, send_again, group, msg->len, sock_ok );
-               group++;
+               if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d len=%d sock_ok=%d ak_ok=%d\n",
+                               msg->mtype, send_again, group, msg->len, sock_ok, altk_ok );
 
                if( ! sock_ok ) {
 
                if( ! sock_ok ) {
+                       if( altk_ok ) {                                                                                 // we can try with the alternate (no sub-id) key
+                               altk_ok = 0;
+                               key = build_rt_key( UNSET_SUBID, msg->mtype );          // build with just the mtype and try again
+                               send_again = 1;                                                                         // ensure we don't exit the while
+                               continue;
+                       }
+
                        msg->state = RMR_ERR_NOENDPT;
                        errno = ENXIO;                                                                                  // must ensure it's not eagain
                        return msg;                                                                                             // caller can resend (maybe) or free
                }
 
                        msg->state = RMR_ERR_NOENDPT;
                        errno = ENXIO;                                                                                  // must ensure it's not eagain
                        return msg;                                                                                             // caller can resend (maybe) or free
                }
 
+               group++;
+
                if( send_again ) {
                        clone_m = clone_msg( msg );                                                             // must make a copy as once we send this message is not available
                        if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
                if( send_again ) {
                        clone_m = clone_msg( msg );                                                             // must make a copy as once we send this message is not available
                        if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
index 3727cc2..4a3fd1c 100644 (file)
@@ -46,9 +46,9 @@ function run_sender {
        export RMR_RTG_SVC=8990
        if (( $nano_sender ))
        then
        export RMR_RTG_SVC=8990
        if (( $nano_sender ))
        then
-               ./sender_nano $(( nmsg * nrcvrs )) $delay 1
+               ./sender_nano $(( nmsg * nrcvrs )) $delay $max_mtype
        else
        else
-               ./sender $(( nmsg * nrcvrs ))  $delay 1
+               ./sender $(( nmsg * nrcvrs ))  $delay $max_mtype
        fi
        echo $? >/tmp/PID$$.src         # must communicate state back via file b/c asynch
 }
        fi
        echo $? >/tmp/PID$$.src         # must communicate state back via file b/c asynch
 }
@@ -85,15 +85,15 @@ function set_rt {
                rte |0 | $endpoints  |0
                rte |1 | $endpoints  |10
                mse |2 | 20 | $endpoints                # new style mtype/subid entry
                rte |0 | $endpoints  |0
                rte |1 | $endpoints  |10
                mse |2 | 20 | $endpoints                # new style mtype/subid entry
-               rte |3 | $endpoints  |0
-               rte |4 | $endpoints  |0
-               rte |5 | $endpoints  |0
-               rte |6 | $endpoints  |0
-               rte |7 | $endpoints  |0
-               rte |8 | $endpoints  |0
-               rte |9 | $endpoints  |0
-               rte |10 | $endpoints  |0
-               rte |11 | $endpoints  |0
+               rte |3 | $endpoints  | -1
+               rte |4 | $endpoints  | -1
+               rte |5 | $endpoints  | -1
+               rte |6 | $endpoints  | -1
+               rte |7 | $endpoints  | -1
+               rte |8 | $endpoints  | -1
+               rte |9 | $endpoints  | -1
+               rte |10 | $endpoints  | -1
+               rte |11 | $endpoints  | -1
                newrt |end
 endKat
 
                newrt |end
 endKat
 
@@ -114,6 +114,7 @@ nano_receiver=0
 wait=1
 rebuild=0
 verbose=0
 wait=1
 rebuild=0
 verbose=0
+max_mtype=1                                    # causes all msgs to go with type 1; use -M to set up, but likely harder to validate
 nrcvrs=3                                       # this is sane, but -r allows it to be set up
 
 while [[ $1 == -* ]]
 nrcvrs=3                                       # this is sane, but -r allows it to be set up
 
 while [[ $1 == -* ]]
@@ -121,6 +122,7 @@ do
        case $1 in
                -B)     rebuild=1;;
                -d)     delay=$2; shift;;
        case $1 in
                -B)     rebuild=1;;
                -d)     delay=$2; shift;;
+               -m) max_mtype=$2; shift;;
                -N)     nano_sender=1
                        nano_receiver=1
                        ;;
                -N)     nano_sender=1
                        nano_receiver=1
                        ;;
@@ -138,7 +140,6 @@ do
        shift
 done
 
        shift
 done
 
-
 if (( verbose ))
 then
        echo "2" >.verbose
 if (( verbose ))
 then
        echo "2" >.verbose
index e0dc4d8..2db1a20 100644 (file)
@@ -222,22 +222,23 @@ static int rmr_api_test( ) {
        errors += fail_if( errno != 0, "rmr_rts_msg did not reset errno "  );
 
 
        errors += fail_if( errno != 0, "rmr_rts_msg did not reset errno "  );
 
 
-       snprintf( msg->xaction, 17, "%015d", 16 );              // dummy transaction id (emulation generates, this should arrive after a few calls to recv)
 
        msg->state = 0;
        msg = rmr_call( NULL, msg );
        errors += fail_if( msg->state == 0, "rmr_call did not set message state when given message with nil context "  );
 
 
        msg->state = 0;
        msg = rmr_call( NULL, msg );
        errors += fail_if( msg->state == 0, "rmr_call did not set message state when given message with nil context "  );
 
+       snprintf( msg->xaction, 17, "%015d", 16 );              // dummy transaction id (emulation generates, this should arrive after a few calls to recv)
        msg->mtype = 0;
        msg->mtype = 0;
-       msg = rmr_call( rmc, msg );                                             // this call should return a message as we can anticipate a dummy message in
+       msg->sub_id = -1;
+       msg = rmr_call( rmc, msg );                                             // dummy nng/nano function will sequentually add xactions and should match or '16'
        errors += fail_if_nil( msg, "rmr_call returned a nil message on call expected to succeed "  );
        if( msg ) {
                errors += fail_not_equal( msg->state, RMR_OK, "rmr_call did not properly set state on successful return "  );
                errors += fail_not_equal( errno, 0, "rmr_call did not properly set errno (a) on successful return "  );
        }
 
        errors += fail_if_nil( msg, "rmr_call returned a nil message on call expected to succeed "  );
        if( msg ) {
                errors += fail_not_equal( msg->state, RMR_OK, "rmr_call did not properly set state on successful return "  );
                errors += fail_not_equal( errno, 0, "rmr_call did not properly set errno (a) on successful return "  );
        }
 
-       snprintf( wbuf, 17, "%015d", 14 );                              // if we call receive we should find this in the first 15 tries
-       for( i = 0; i < 16; i++ ) {
+       snprintf( wbuf, 17, "%015d", 14 );                              // while waiting, the queued messages should have #14, so issue a few receives looking for it
+       for( i = 0; i < 16; i++ ) {                                             // it should be in the first 15
                msg = rmr_rcv_msg( rmc, msg );
                if( msg ) {
                        if( strcmp( wbuf, msg->xaction ) == 0 ) {               // found the queued message
                msg = rmr_rcv_msg( rmc, msg );
                if( msg ) {
                        if( strcmp( wbuf, msg->xaction ) == 0 ) {               // found the queued message
@@ -254,15 +255,10 @@ static int rmr_api_test( ) {
        if( ! msg ) {
                msg = rmr_alloc_msg( rmc, 2048 );                               // something buggered above; get a new one
        }
        if( ! msg ) {
                msg = rmr_alloc_msg( rmc, 2048 );                               // something buggered above; get a new one
        }
-       msg = rmr_call( rmc, msg );                                                     // make a call that we never expect a response on
-       errors += fail_if_nil( msg, "rmr_call returned a non-nil message on call expected not to receive a response "  );
-       if( msg ) {
-               errors += fail_if_equal( msg->state, RMR_OK, "rmr_call did not properly set state on queued message receive "  );
-               errors += fail_if( errno == 0, "rmr_call did not properly set errno on queued message receivesuccessful "  );
-       }
-
-       msg = rmr_call( rmc, msg );                                             // this should "timeout" because the message xaction id won't ever appear again
-       errors += fail_if_nil( msg, "rmr_call returned a nil message on call expected to fail "  );
+       msg->mtype = 0;
+       msg->sub_id = -1;
+       msg = rmr_call( rmc, msg );                                                     // make a call that we never expect a response on (nil pointer back)
+       errors += fail_not_nil( msg, "rmr_call returned a nil message on call expected not to receive a response "  );
        errors += fail_if( errno == 0, "rmr_call did not set errno on failure "  );
 
        rmr_free_msg( NULL );                   // drive for coverage; nothing to check
        errors += fail_if( errno == 0, "rmr_call did not set errno on failure "  );
 
        rmr_free_msg( NULL );                   // drive for coverage; nothing to check