Revert RTS to use unidirectional connection
[ric-plt/lib/rmr.git] / src / rmr / si / src / rmr_si.c
index 0a4f806..9c67dc3 100644 (file)
@@ -102,8 +102,8 @@ static void free_ctx( uta_ctx_t* ctx ) {
 
        The allocated len stored in the msg is:
                transport header length +
-               message header + 
-               user requested payload 
+               message header +
+               user requested payload
 
        The msg header is a combination of the fixed RMR header and the variable
        trace data and d2 fields which may vary for each message.
@@ -205,7 +205,7 @@ extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
 
                d1 = DATA1_ADDR( msg->header );
                d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                         // must blot out so it doesn't queue on a chute at the other end
-       }       
+       }
 
        return mtosend_msg( vctx, msg, max_to );
 }
@@ -223,7 +223,7 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
 
                d1 = DATA1_ADDR( msg->header );
                d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                         // must blot out so it doesn't queue on a chute at the other end
-       }       
+       }
 
        return rmr_mtosend_msg( vctx, msg,  -1 );                                                       // retries < 0  uses default from ctx
 }
@@ -231,16 +231,20 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
 /*
        Return to sender allows a message to be sent back to the endpoint where it originated.
 
-       In the SI world the file descriptor that was the source of the message is captured in
-       the mbuffer and thus can be used to quickly find the target for an RTS call. 
+       With SI95 it was thought that the return to sender would be along the same open conneciton
+       and thus no table lookup would be needed to open a 'reverse direction' path. However, for
+       applications sending at high message rates, returning responses on the same connection
+       causes major strife. Thus the decision was made to use the same method as NNG and just
+       open a second connection for reverse path.
 
-       The source information in the message is used to select the socket on which to write
-       the message rather than using the message type and round-robin selection. This
-       should return a message buffer with the state of the send operation set. On success
-       (state is RMR_OK, the caller may use the buffer for another receive operation), and on
-       error it can be passed back to this function to retry the send if desired. On error,
-       errno will liklely have the failure reason set by the nng send processing.
-       The following are possible values for the state in the message buffer:
+       We will attempt to use the name in the received message to look up the endpoint. If
+       that failes, then we will write on the connection that the message arrived on as a
+       falback.
+
+       On success (state is RMR_OK, the caller may use the buffer for another receive operation),
+       and on error it can be passed back to this function to retry the send if desired. On error,
+       errno will liklely have the failure reason set by the nng send processing.  The following
+       are possible values for the state in the message buffer:
 
        Message states returned:
                RMR_ERR_BADARG - argument (context or msg) was nil or invalid
@@ -253,7 +257,7 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
        failure. The value of errno might give a clue as to what is wrong.
 
        CAUTION:
-               Like send_msg(), this is non-blocking and will return the msg if there is an errror.
+               Like send_msg(), this is non-blocking and will return the msg if there is an error.
                The caller must check for this and handle it properly.
 */
 extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
@@ -284,21 +288,20 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
 
        ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
 
-/*
-       sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock, &ep, ctx->si_ctx );                      // src is always used first for rts
+       sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock, &ep );   // always try src first
        if( ! sock_ok ) {
-*/
-       if( (nn_sock = msg->rts_fd) < 0 ) {
-               if( HDR_VERSION( msg->header ) > 2 ) {                                                  // with ver2 the ip is there, try if src name not known
-                       //sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep, ctx->si_ctx );
-                       sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep  );
-               }
-               if( ! sock_ok ) {
-                       msg->state = RMR_ERR_NOENDPT;
-                       return msg;                                                                                                                             // preallocated msg can be reused since not given back to nn
+               if( (nn_sock = msg->rts_fd) < 0 ) {
+                       if( HDR_VERSION( msg->header ) > 2 ) {                                                  // with ver2 the ip is there, try if src name not known
+                               sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep  );
+                       }
+                       if( ! sock_ok ) {
+                               msg->state = RMR_ERR_NOENDPT;
+                               return msg;
+                       }
                }
        }
 
+
        msg->state = RMR_OK;                                                                                                                            // ensure it is clear before send
        hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src );                                         // the dest where we're returning the message to
        hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip );                                        // both the src host and src ip
@@ -310,7 +313,7 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
                                case RMR_OK:
                                        ep->scounts[EPSC_GOOD]++;
                                        break;
-                       
+
                                case RMR_ERR_RETRY:
                                        ep->scounts[EPSC_TRANS]++;
                                        break;
@@ -335,7 +338,7 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
        If multi-threading call is turned on, this invokes that mechanism with the special call
        id of 1 and a max wait of 1 second.  If multi threaded call is not on, then the original
        behavour (described below) is carried out.  This is safe to use when mt is enabled, but
-       the user app is invoking rmr_call() from only one thread, and the caller doesn't need 
+       the user app is invoking rmr_call() from only one thread, and the caller doesn't need
        a flexible timeout.
 
        On timeout this function will return a nil pointer. If the original message could not
@@ -686,7 +689,7 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        if( (interface = getenv( ENV_BIND_IF )) == NULL ) {
                interface = "0.0.0.0";
        }
-       
+
        snprintf( bind_info, sizeof( bind_info ), "%s:%s", interface, port );           // FIXME -- si only supports 0.0.0.0 by default
        if( (state = SIlistener( ctx->si_ctx, TCP_DEVICE, bind_info )) < 0 ) {
                rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to start si listener for %s: %s\n", bind_info, strerror( errno ) );
@@ -843,7 +846,7 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
        long    nano_sec;                       // max wait xlated to nano seconds
        int             state;
        rmr_mbuf_t*     ombuf;                  // mbuf user passed; if we timeout we return state here
-       
+
        if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
                errno = EINVAL;
                if( mbuf ) {
@@ -861,7 +864,7 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
                if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
                        if( ombuf ) {
                                rmr_free_msg( ombuf );                          // can't reuse, caller's must be trashed now
-                       }       
+                       }
                } else {
                        mbuf = ombuf;                                           // return original if it was given with timeout status
                        if( ombuf != NULL ) {
@@ -957,7 +960,7 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m
        long    seconds = 0;            // max wait seconds
        long    nano_sec;                       // max wait xlated to nano seconds
        int             state;
-       
+
        errno = EINVAL;
        if( (ctx = (uta_ctx_t *) vctx) == NULL || mbuf == NULL ) {
                if( mbuf ) {
@@ -986,7 +989,7 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m
                rmr_free_msg( chute->mbuf );
                chute->mbuf = NULL;
        }
-       
+
        hdr = (uta_mhdr_t *) mbuf->header;
        hdr->flags |= HFL_CALL_MSG;                                                                             // must signal this sent with a call
        memcpy( chute->expect, mbuf->xaction, RMR_MAX_XID );                    // xaction that we will wait for
@@ -995,7 +998,7 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m
        mbuf->flags |= MFL_NOALLOC;                                                                             // send message without allocating a new one (expect nil from mtosend
 
        if( max_wait >= 0 ) {
-               clock_gettime( CLOCK_REALTIME, &ts );   
+               clock_gettime( CLOCK_REALTIME, &ts );
 
                if( max_wait > 999 ) {
                        seconds = max_wait / 1000;
@@ -1059,20 +1062,20 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m
        Given an existing message buffer, reallocate the payload portion to
        be at least new_len bytes.  The message header will remain such that
        the caller may use the rmr_rts_msg() function to return a payload
-       to the sender. 
+       to the sender.
 
        The mbuf passed in may or may not be reallocated and the caller must
-       use the returned pointer and should NOT assume that it can use the 
+       use the returned pointer and should NOT assume that it can use the
        pointer passed in with the exceptions based on the clone flag.
 
        If the clone flag is set, then a duplicated message, with larger payload
        size, is allocated and returned.  The old_msg pointer in this situation is
-       still valid and must be explicitly freed by the application. If the clone 
+       still valid and must be explicitly freed by the application. If the clone
        message is not set (0), then any memory management of the old message is
        handled by the function.
 
-       If the copy flag is set, the contents of the old message's payload is 
-       copied to the reallocated payload.  If the flag is not set, then the 
+       If the copy flag is set, the contents of the old message's payload is
+       copied to the reallocated payload.  If the flag is not set, then the
        contents of the payload is undetermined.
 */
 extern rmr_mbuf_t* rmr_realloc_payload( rmr_mbuf_t* old_msg, int new_len, int copy, int clone ) {