fix(send): Add second key lookup if sub-id is set
[ric-plt/lib/rmr.git] / src / nng / src / rmr_nng.c
index ba7c852..6240d52 100644 (file)
@@ -191,6 +191,11 @@ extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
        more than one group of endpoints defined, then the message will be sent
        in round robin fashion to one endpoint in each group.
 
        more than one group of endpoints defined, then the message will be sent
        in round robin fashion to one endpoint in each group.
 
+       An endpoint will be looked up in the route table using the message type and
+       the subscription id. If the subscription id is "UNSET_SUBID", then only the
+       message type is used.  If the initial lookup, with a subid, fails, then a
+       second lookup using just the mtype is tried.
+
        CAUTION: this is a non-blocking send.  If the message cannot be sent, then
                it will return with an error and errno set to eagain. If the send is
                a limited fanout, then the returned status is the status of the last
        CAUTION: this is a non-blocking send.  If the message cannot be sent, then
                it will return with an error and errno set to eagain. If the send is
                a limited fanout, then the returned status is the status of the last
@@ -205,6 +210,7 @@ extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
        rmr_mbuf_t*     clone_m;                // cloned message for an nth send
        int sock_ok;                            // got a valid socket from round robin select
        uint64_t key;                           // mtype or sub-id/mtype sym table key
        rmr_mbuf_t*     clone_m;                // cloned message for an nth send
        int sock_ok;                            // got a valid socket from round robin select
        uint64_t key;                           // mtype or sub-id/mtype sym table key
+       int     altk_ok = 0;                    // set true if we can lookup on alternate key if mt/sid lookup fails
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
                errno = EINVAL;                                                                                         // if msg is null, this is their clue
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
                errno = EINVAL;                                                                                         // if msg is null, this is their clue
@@ -231,18 +237,29 @@ extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
        group = 0;                                                                                              // always start with group 0
 
        key = build_rt_key( msg->sub_id, msg->mtype );                  // route table key to find the entry
        group = 0;                                                                                              // always start with group 0
 
        key = build_rt_key( msg->sub_id, msg->mtype );                  // route table key to find the entry
+       if( msg->sub_id != UNSET_SUBID ) {                                              
+               altk_ok = 1;                                                                            // if caller's sub-id doesn't hit with mtype, allow mtype only key for retry
+       }
        while( send_again ) {
                sock_ok = uta_epsock_rr( ctx->rtable, key, group, &send_again, &nn_sock );              // round robin sel epoint; again set if mult groups
        while( send_again ) {
                sock_ok = uta_epsock_rr( ctx->rtable, key, group, &send_again, &nn_sock );              // round robin sel epoint; again set if mult groups
-               if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d len=%d sock_ok=%d\n",
-                               msg->mtype, send_again, group, msg->len, sock_ok );
-               group++;
+               if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d len=%d sock_ok=%d ak_ok=%d\n",
+                               msg->mtype, send_again, group, msg->len, sock_ok, altk_ok );
 
                if( ! sock_ok ) {
 
                if( ! sock_ok ) {
+                       if( altk_ok ) {                                                                                 // we can try with the alternate (no sub-id) key
+                               altk_ok = 0;
+                               key = build_rt_key( UNSET_SUBID, msg->mtype );          // build with just the mtype and try again
+                               send_again = 1;                                                                         // ensure we don't exit the while
+                               continue;
+                       }
+
                        msg->state = RMR_ERR_NOENDPT;
                        errno = ENXIO;                                                                                  // must ensure it's not eagain
                        return msg;                                                                                             // caller can resend (maybe) or free
                }
 
                        msg->state = RMR_ERR_NOENDPT;
                        errno = ENXIO;                                                                                  // must ensure it's not eagain
                        return msg;                                                                                             // caller can resend (maybe) or free
                }
 
+               group++;
+
                if( send_again ) {
                        clone_m = clone_msg( msg );                                                             // must make a copy as once we send this message is not available
                        if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
                if( send_again ) {
                        clone_m = clone_msg( msg );                                                             // must make a copy as once we send this message is not available
                        if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len );