Revert RTS to use unidirectional connection 52/2552/1 3.2.3
authorE. Scott Daniels <daniels@research.att.com>
Fri, 21 Feb 2020 18:58:19 +0000 (13:58 -0500)
committerE. Scott Daniels <daniels@research.att.com>
Fri, 21 Feb 2020 18:58:19 +0000 (13:58 -0500)
Because of potential performance impacts it was decided to
revert return to sender messages to NOT use the same
connection that the message was received on.  This applies
only to the SI95; RTS has always been on uni-directional
connections with NNG.

This change also enables the MEID routing in the SI95 code.

Issue-ID: RIC-153

Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Change-Id: I6415effb3283f5961aa0098f1ff0d1380d7aa197

CHANGES
CMakeLists.txt
src/rmr/common/include/rmr_agnostic.h
src/rmr/common/src/rt_generic_static.c
src/rmr/nng/src/rtable_nng_static.c
src/rmr/si/src/rmr_si.c
src/rmr/si/src/rtable_si_static.c
src/rmr/si/src/sr_si_static.c

diff --git a/CHANGES b/CHANGES
index 2854cd5..f77ae13 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -2,6 +2,9 @@
 API and build change  and fix summaries. Doc correctsions
 and/or changes are not mentioned here; see the commit messages.
 
+2020 February 21; version 3.2.3
+       Add meid routing support to the SI95 interface.
+
 2020 February 20; version 3.2.2
        Fix receive thread related core dump (ring early unlock).
 
index 41eee9f..678a408 100644 (file)
@@ -38,7 +38,7 @@ cmake_minimum_required( VERSION 3.5 )
 
 set( major_version "3" )               # should be automatically populated from git tag later, but until CI process sets a tag we use this
 set( minor_version "2" )
-set( patch_level "2" )
+set( patch_level "3" )
 
 set( install_root "${CMAKE_INSTALL_PREFIX}" )
 set( install_inc "include/rmr" )
index 6b1537b..c2139c3 100644 (file)
@@ -301,6 +301,7 @@ static int ie_test( void* r, int i_factor, long inserts );
 static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype );
 static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list );
 static void del_rte( void* st, void* entry, char const* name, void* thing, void* data );
+static endpoint_t*  get_meid_owner( route_table_t *rt, char* meid );
 static char* uta_fib( char* fname );
 static route_table_t* uta_rt_init( );
 static route_table_t* uta_rt_clone( route_table_t* srt );
index f098f53..16fddd1 100644 (file)
@@ -1235,4 +1235,18 @@ static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) {
        return key;
 }
 
+/*
+       Given a route table and meid string, find the owner (if known). Returns a pointer to
+       the endpoint struct or nil.
+*/
+static inline endpoint_t*  get_meid_owner( route_table_t *rt, char* meid ) {
+       endpoint_t* ep;         // the ep we found in the hash
+
+       if( rt == NULL || rt->hash == NULL || meid == NULL || *meid == 0 ) {
+               return NULL;
+       }
+
+       return (endpoint_t *) rmr_sym_get( rt->hash, meid, RT_ME_SPACE ); 
+}
+
 #endif
index 6adf230..2cec490 100644 (file)
@@ -390,20 +390,6 @@ static inline rtable_ent_t*  uta_get_rte( route_table_t *rt, int sid, int mtype,
        return rte;
 }
 
-/*
-       Given a route table and meid string, find the owner (if known). Returns a pointer to
-       the endpoint struct or nil.
-*/
-static inline endpoint_t*  get_meid_owner( route_table_t *rt, char* meid ) {
-       endpoint_t* ep;         // the ep we found in the hash
-
-       if( rt == NULL || rt->hash == NULL || meid == NULL || *meid == 0 ) {
-               return NULL;
-       }
-
-       return (endpoint_t *) rmr_sym_get( rt->hash, meid, RT_ME_SPACE ); 
-}
-
 /*
        Return a string of count information. E.g.:
                <ep-name>:<port> <good> <hard-fail> <soft-fail>
index 0a4f806..9c67dc3 100644 (file)
@@ -102,8 +102,8 @@ static void free_ctx( uta_ctx_t* ctx ) {
 
        The allocated len stored in the msg is:
                transport header length +
-               message header + 
-               user requested payload 
+               message header +
+               user requested payload
 
        The msg header is a combination of the fixed RMR header and the variable
        trace data and d2 fields which may vary for each message.
@@ -205,7 +205,7 @@ extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
 
                d1 = DATA1_ADDR( msg->header );
                d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                         // must blot out so it doesn't queue on a chute at the other end
-       }       
+       }
 
        return mtosend_msg( vctx, msg, max_to );
 }
@@ -223,7 +223,7 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
 
                d1 = DATA1_ADDR( msg->header );
                d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                         // must blot out so it doesn't queue on a chute at the other end
-       }       
+       }
 
        return rmr_mtosend_msg( vctx, msg,  -1 );                                                       // retries < 0  uses default from ctx
 }
@@ -231,16 +231,20 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
 /*
        Return to sender allows a message to be sent back to the endpoint where it originated.
 
-       In the SI world the file descriptor that was the source of the message is captured in
-       the mbuffer and thus can be used to quickly find the target for an RTS call. 
+       With SI95 it was thought that the return to sender would be along the same open conneciton
+       and thus no table lookup would be needed to open a 'reverse direction' path. However, for
+       applications sending at high message rates, returning responses on the same connection
+       causes major strife. Thus the decision was made to use the same method as NNG and just
+       open a second connection for reverse path.
 
-       The source information in the message is used to select the socket on which to write
-       the message rather than using the message type and round-robin selection. This
-       should return a message buffer with the state of the send operation set. On success
-       (state is RMR_OK, the caller may use the buffer for another receive operation), and on
-       error it can be passed back to this function to retry the send if desired. On error,
-       errno will liklely have the failure reason set by the nng send processing.
-       The following are possible values for the state in the message buffer:
+       We will attempt to use the name in the received message to look up the endpoint. If
+       that failes, then we will write on the connection that the message arrived on as a
+       falback.
+
+       On success (state is RMR_OK, the caller may use the buffer for another receive operation),
+       and on error it can be passed back to this function to retry the send if desired. On error,
+       errno will liklely have the failure reason set by the nng send processing.  The following
+       are possible values for the state in the message buffer:
 
        Message states returned:
                RMR_ERR_BADARG - argument (context or msg) was nil or invalid
@@ -253,7 +257,7 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
        failure. The value of errno might give a clue as to what is wrong.
 
        CAUTION:
-               Like send_msg(), this is non-blocking and will return the msg if there is an errror.
+               Like send_msg(), this is non-blocking and will return the msg if there is an error.
                The caller must check for this and handle it properly.
 */
 extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
@@ -284,21 +288,20 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
 
        ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
 
-/*
-       sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock, &ep, ctx->si_ctx );                      // src is always used first for rts
+       sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock, &ep );   // always try src first
        if( ! sock_ok ) {
-*/
-       if( (nn_sock = msg->rts_fd) < 0 ) {
-               if( HDR_VERSION( msg->header ) > 2 ) {                                                  // with ver2 the ip is there, try if src name not known
-                       //sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep, ctx->si_ctx );
-                       sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep  );
-               }
-               if( ! sock_ok ) {
-                       msg->state = RMR_ERR_NOENDPT;
-                       return msg;                                                                                                                             // preallocated msg can be reused since not given back to nn
+               if( (nn_sock = msg->rts_fd) < 0 ) {
+                       if( HDR_VERSION( msg->header ) > 2 ) {                                                  // with ver2 the ip is there, try if src name not known
+                               sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep  );
+                       }
+                       if( ! sock_ok ) {
+                               msg->state = RMR_ERR_NOENDPT;
+                               return msg;
+                       }
                }
        }
 
+
        msg->state = RMR_OK;                                                                                                                            // ensure it is clear before send
        hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src );                                         // the dest where we're returning the message to
        hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip );                                        // both the src host and src ip
@@ -310,7 +313,7 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
                                case RMR_OK:
                                        ep->scounts[EPSC_GOOD]++;
                                        break;
-                       
+
                                case RMR_ERR_RETRY:
                                        ep->scounts[EPSC_TRANS]++;
                                        break;
@@ -335,7 +338,7 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
        If multi-threading call is turned on, this invokes that mechanism with the special call
        id of 1 and a max wait of 1 second.  If multi threaded call is not on, then the original
        behavour (described below) is carried out.  This is safe to use when mt is enabled, but
-       the user app is invoking rmr_call() from only one thread, and the caller doesn't need 
+       the user app is invoking rmr_call() from only one thread, and the caller doesn't need
        a flexible timeout.
 
        On timeout this function will return a nil pointer. If the original message could not
@@ -686,7 +689,7 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        if( (interface = getenv( ENV_BIND_IF )) == NULL ) {
                interface = "0.0.0.0";
        }
-       
+
        snprintf( bind_info, sizeof( bind_info ), "%s:%s", interface, port );           // FIXME -- si only supports 0.0.0.0 by default
        if( (state = SIlistener( ctx->si_ctx, TCP_DEVICE, bind_info )) < 0 ) {
                rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to start si listener for %s: %s\n", bind_info, strerror( errno ) );
@@ -843,7 +846,7 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
        long    nano_sec;                       // max wait xlated to nano seconds
        int             state;
        rmr_mbuf_t*     ombuf;                  // mbuf user passed; if we timeout we return state here
-       
+
        if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
                errno = EINVAL;
                if( mbuf ) {
@@ -861,7 +864,7 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
                if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
                        if( ombuf ) {
                                rmr_free_msg( ombuf );                          // can't reuse, caller's must be trashed now
-                       }       
+                       }
                } else {
                        mbuf = ombuf;                                           // return original if it was given with timeout status
                        if( ombuf != NULL ) {
@@ -957,7 +960,7 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m
        long    seconds = 0;            // max wait seconds
        long    nano_sec;                       // max wait xlated to nano seconds
        int             state;
-       
+
        errno = EINVAL;
        if( (ctx = (uta_ctx_t *) vctx) == NULL || mbuf == NULL ) {
                if( mbuf ) {
@@ -986,7 +989,7 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m
                rmr_free_msg( chute->mbuf );
                chute->mbuf = NULL;
        }
-       
+
        hdr = (uta_mhdr_t *) mbuf->header;
        hdr->flags |= HFL_CALL_MSG;                                                                             // must signal this sent with a call
        memcpy( chute->expect, mbuf->xaction, RMR_MAX_XID );                    // xaction that we will wait for
@@ -995,7 +998,7 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m
        mbuf->flags |= MFL_NOALLOC;                                                                             // send message without allocating a new one (expect nil from mtosend
 
        if( max_wait >= 0 ) {
-               clock_gettime( CLOCK_REALTIME, &ts );   
+               clock_gettime( CLOCK_REALTIME, &ts );
 
                if( max_wait > 999 ) {
                        seconds = max_wait / 1000;
@@ -1059,20 +1062,20 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m
        Given an existing message buffer, reallocate the payload portion to
        be at least new_len bytes.  The message header will remain such that
        the caller may use the rmr_rts_msg() function to return a payload
-       to the sender. 
+       to the sender.
 
        The mbuf passed in may or may not be reallocated and the caller must
-       use the returned pointer and should NOT assume that it can use the 
+       use the returned pointer and should NOT assume that it can use the
        pointer passed in with the exceptions based on the clone flag.
 
        If the clone flag is set, then a duplicated message, with larger payload
        size, is allocated and returned.  The old_msg pointer in this situation is
-       still valid and must be explicitly freed by the application. If the clone 
+       still valid and must be explicitly freed by the application. If the clone
        message is not set (0), then any memory management of the old message is
        handled by the function.
 
-       If the copy flag is set, the contents of the old message's payload is 
-       copied to the reallocated payload.  If the flag is not set, then the 
+       If the copy flag is set, the contents of the old message's payload is
+       copied to the reallocated payload.  If the flag is not set, then the
        contents of the payload is undetermined.
 */
 extern rmr_mbuf_t* rmr_realloc_payload( rmr_mbuf_t* old_msg, int new_len, int copy, int clone ) {
index 659f857..6215176 100644 (file)
@@ -386,6 +386,66 @@ static int uta_epsock_rr( uta_ctx_t* ctx, rtable_ent_t* rte, int group, int* mor
        return state;
 }
 
+/*
+       Given a message, use the meid field to find the owner endpoint for the meid.
+       The owner ep is then used to extract the socket through which the message
+       is sent. This returns TRUE if we found a socket and it was written to the
+       nn_sock pointer; false if we didn't.
+
+       We've been told that the meid is a string, thus we count on it being a nil
+       terminated set of bytes.
+*/
+static int epsock_meid( uta_ctx_t* ctx, route_table_t *rtable, rmr_mbuf_t* msg, int* nn_sock, endpoint_t** uepp ) {
+       endpoint_t*     ep;                             // seected end point
+       int     state = FALSE;                  // processing state
+       char*   meid;
+       si_ctx_t*       si_ctx;
+
+       if( PARINOID_CHECKS ) {
+               if( ctx == NULL || (si_ctx = ctx->si_ctx) == NULL  ) {
+                       return FALSE;
+               }
+       } else {
+               si_ctx = ctx->si_ctx;
+       }
+
+       errno = 0;
+       if( ! nn_sock || msg == NULL || rtable == NULL ) {                      // missing stuff; bail fast
+               errno = EINVAL;
+               return FALSE;
+       }
+
+       meid = ((uta_mhdr_t *) msg->header)->meid;
+
+       if( (ep = get_meid_owner( rtable, meid )) == NULL ) {
+               if( uepp != NULL ) {                                                            // caller needs refernce to endpoint too
+                       *uepp = NULL;
+               }
+
+               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_meid: no ep in hash for (%s)\n", meid );
+               return FALSE;
+       }
+
+       state = TRUE;
+       if( ! ep->open ) {                                                              // not connected
+               if( ep->addr == NULL ) {                                        // name didn't resolve before, try again
+                       ep->addr = strdup( ep->name );                  // use the name directly; if not IP then transport will do dns lookup
+               }
+
+               if( uta_link2( si_ctx, ep ) ) {                         // find entry in table and create link
+                       ep->open = TRUE;
+                       *nn_sock = ep->nn_sock;                                 // pass socket back to caller
+               } else {
+                       state = FALSE;
+               }
+               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_meid: connection attempted with %s: %s\n", ep->name, state ? "[OK]" : "[FAIL]" );
+       } else {
+               *nn_sock = ep->nn_sock;
+       }
+
+       return state;
+}
+
 /*
        Finds the rtable entry which matches the key. Returns a nil pointer if
        no entry is found. If try_alternate is set, then we will attempt 
index 0b726dc..485611e 100644 (file)
@@ -713,7 +713,12 @@ static  rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
        send_again = 1;                                                                                 // force loop entry
        group = 0;                                                                                              // always start with group 0
        while( send_again ) {
-               sock_ok = uta_epsock_rr( ctx, rte, group, &send_again, &nn_sock, &ep );         // select endpt from rr group and set again if more groups
+               if( rte->nrrgroups > 0 ) {                                                      // this is a round robin entry if groups are listed
+                       sock_ok = uta_epsock_rr( ctx, rte, group, &send_again, &nn_sock, &ep );         // select endpt from rr group and set again if more groups
+               } else {
+                       sock_ok = epsock_meid( ctx, ctx->rtable, msg, &nn_sock, &ep );
+                       send_again = 0;
+               }
 
                if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mtosend_msg: flgs=0x%04x type=%d again=%d group=%d len=%d sock_ok=%d\n",
                                msg->flags, msg->mtype, send_again, group, msg->len, sock_ok );