Address potential error state after good send
[ric-plt/lib/rmr.git] / src / rmr / nng / src / rmr_nng.c
index e77c9f3..1b6e1f0 100644 (file)
@@ -50,6 +50,8 @@
 #include <unistd.h>
 #include <time.h>
 #include <arpa/inet.h>
 #include <unistd.h>
 #include <time.h>
 #include <arpa/inet.h>
+#include <semaphore.h>
+#include <pthread.h>
 
 #include <nng/nng.h>
 #include <nng/protocol/pubsub0/pub.h>
 
 #include <nng/nng.h>
 #include <nng/protocol/pubsub0/pub.h>
@@ -70,6 +72,8 @@
 #include "tools_static.c"
 #include "sr_nng_static.c"                     // send/receive static functions
 #include "wormholes.c"                         // wormhole api externals and related static functions (must be LAST!)
 #include "tools_static.c"
 #include "sr_nng_static.c"                     // send/receive static functions
 #include "wormholes.c"                         // wormhole api externals and related static functions (must be LAST!)
+#include "mt_call_static.c"
+#include "mt_call_nng_static.c"
 
 
 //------------------------------------------------------------------------------
 
 
 //------------------------------------------------------------------------------
@@ -181,103 +185,20 @@ extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
 }
 
 /*
 }
 
 /*
-       send message with maximum timeout.
-       Accept a message and send it to an endpoint based on message type.
-       If NNG reports that the send attempt timed out, or should be retried,
-       RMr will retry for approximately max_to microseconds; rounded to the next
-       higher value of 10.
-
-       Allocates a new message buffer for the next send. If a message type has
-       more than one group of endpoints defined, then the message will be sent
-       in round robin fashion to one endpoint in each group.
-
-       An endpoint will be looked up in the route table using the message type and
-       the subscription id. If the subscription id is "UNSET_SUBID", then only the
-       message type is used.  If the initial lookup, with a subid, fails, then a
-       second lookup using just the mtype is tried.
-
-       CAUTION: this is a non-blocking send.  If the message cannot be sent, then
-               it will return with an error and errno set to eagain. If the send is
-               a limited fanout, then the returned status is the status of the last
-               send attempt.
-
+       This is a wrapper to the real timeout send. We must wrap it now to ensure that
+       the call flag and call-id are reset
 */
 extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
 */
 extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
-       nng_socket nn_sock;                     // endpoint socket for send
-       uta_ctx_t*      ctx;
-       int     group;                                  // selected group to get socket for
-       int send_again;                         // true if the message must be sent again
-       rmr_mbuf_t*     clone_m;                // cloned message for an nth send
-       int sock_ok;                            // got a valid socket from round robin select
-       uint64_t key;                           // mtype or sub-id/mtype sym table key
-       int     altk_ok = 0;                    // set true if we can lookup on alternate key if mt/sid lookup fails
-
-       if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
-               errno = EINVAL;                                                                                         // if msg is null, this is their clue
-               if( msg != NULL ) {
-                       msg->state = RMR_ERR_BADARG;
-                       errno = EINVAL;                                                                                 // must ensure it's not eagain
-               }
-               return msg;
-       }
-
-       errno = 0;                                                                                                      // clear; nano might set, but ensure it's not left over if it doesn't
-       if( msg->header == NULL ) {
-               fprintf( stderr, "rmr_send_msg: ERROR: message had no header\n" );
-               msg->state = RMR_ERR_NOHDR;
-               errno = EBADMSG;                                                                                        // must ensure it's not eagain
-               return msg;
-       }
-
-       if( max_to < 0 ) {
-               max_to = ctx->send_retries;             // convert to retries
-       }
+       char* d1;                                                                                                                       // point at the call-id in the header
 
 
-       send_again = 1;                                                                                 // force loop entry
-       group = 0;                                                                                              // always start with group 0
+       if( msg != NULL ) {
+               ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
 
 
-       key = build_rt_key( msg->sub_id, msg->mtype );                  // route table key to find the entry
-       if( msg->sub_id != UNSET_SUBID ) {
-               altk_ok = 1;                                                                            // if caller's sub-id doesn't hit with mtype, allow mtype only key for retry
-       }
-       while( send_again ) {
-               sock_ok = uta_epsock_rr( ctx->rtable, key, group, &send_again, &nn_sock );              // round robin sel epoint; again set if mult groups
-               if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d len=%d sock_ok=%d ak_ok=%d\n",
-                               msg->mtype, send_again, group, msg->len, sock_ok, altk_ok );
-
-               if( ! sock_ok ) {
-                       if( altk_ok ) {                                                                                 // we can try with the alternate (no sub-id) key
-                               altk_ok = 0;
-                               key = build_rt_key( UNSET_SUBID, msg->mtype );          // build with just the mtype and try again
-                               send_again = 1;                                                                         // ensure we don't exit the while
-                               continue;
-                       }
-
-                       msg->state = RMR_ERR_NOENDPT;
-                       errno = ENXIO;                                                                                  // must ensure it's not eagain
-                       return msg;                                                                                             // caller can resend (maybe) or free
-               }
-
-               group++;
-
-               if( send_again ) {
-                       clone_m = clone_msg( msg );                                                             // must make a copy as once we send this message is not available
-                       if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
-                       msg->flags |= MFL_NOALLOC;                                                              // send should not allocate a new buffer
-                       msg = send_msg( ctx, msg, nn_sock, max_to );                    // do the hard work, msg should be nil on success
-                       /*
-                       if( msg ) {
-                               // error do we need to count successes/errors, how to report some success, esp if last fails?
-                       }
-                       */
-
-                       msg = clone_m;                                                                                  // clone will be the next to send
-               } else {
-                       msg = send_msg( ctx, msg, nn_sock, max_to );                    // send the last, and allocate a new buffer; drops the clone if it was
-               }
-       }
+               d1 = DATA1_ADDR( msg->header );
+               d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                         // must blot out so it doesn't queue on a chute at the other end
+       }       
 
 
-       return msg;                                                                     // last message caries the status of last/only send attempt
+       return mtosend_msg( vctx, msg, max_to );
 }
 
 /*
 }
 
 /*
@@ -286,7 +207,16 @@ extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
        See rmr_stimeout() for info on setting the default timeout.
 */
 extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
        See rmr_stimeout() for info on setting the default timeout.
 */
 extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
-       return rmr_mtosend_msg( vctx, msg,  -1 );                       // retries <  uses default from ctx
+       char* d1;                                                                                                               // point at the call-id in the header
+
+       if( msg != NULL ) {
+               ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
+
+               d1 = DATA1_ADDR( msg->header );
+               d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                         // must blot out so it doesn't queue on a chute at the other end
+       }       
+
+       return rmr_mtosend_msg( vctx, msg,  -1 );                                                       // retries < 0  uses default from ctx
 }
 
 /*
 }
 
 /*
@@ -314,17 +244,17 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
                The caller must check for this and handle.
 */
 extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
                The caller must check for this and handle.
 */
 extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
-       nng_socket nn_sock;                     // endpoint socket for send
+       nng_socket      nn_sock;                        // endpoint socket for send
        uta_ctx_t*      ctx;
        uta_ctx_t*      ctx;
-       int state;
-       uta_mhdr_t*     hdr;
-       char*   hold_src;                       // we need the original source if send fails
-       int             sock_ok;                        // true if we found a valid endpoint socket
+       int                     state;
+       char*           hold_src;                       // we need the original source if send fails
+       int                     sock_ok = 0;            // true if we found a valid endpoint socket
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
                errno = EINVAL;                                                                                         // if msg is null, this is their clue
                if( msg != NULL ) {
                        msg->state = RMR_ERR_BADARG;
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
                errno = EINVAL;                                                                                         // if msg is null, this is their clue
                if( msg != NULL ) {
                        msg->state = RMR_ERR_BADARG;
+                       msg->tp_state = errno;
                }
                return msg;
        }
                }
                return msg;
        }
@@ -333,22 +263,29 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
        if( msg->header == NULL ) {
                fprintf( stderr, "[ERR] rmr_send_msg: message had no header\n" );
                msg->state = RMR_ERR_NOHDR;
        if( msg->header == NULL ) {
                fprintf( stderr, "[ERR] rmr_send_msg: message had no header\n" );
                msg->state = RMR_ERR_NOHDR;
+               msg->tp_state = errno;
                return msg;
        }
 
                return msg;
        }
 
-       sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock );                        // socket of specific endpoint
+       ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
+       if( HDR_VERSION( msg->header ) > 2 ) {                                                  // new version uses sender's ip address for rts
+               sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock );                      // default to IP based rts
+       }
        if( ! sock_ok ) {
        if( ! sock_ok ) {
-               msg->state = RMR_ERR_NOENDPT;
-               return msg;                                                     // preallocated msg can be reused since not given back to nn
+               sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock );                // IP  not in rt, try name
+               if( ! sock_ok ) {
+                       msg->state = RMR_ERR_NOENDPT;
+                       return msg;                                                                                                                             // preallocated msg can be reused since not given back to nn
+               }
        }
 
        }
 
-       msg->state = RMR_OK;                                    // ensure it is clear before send
-       hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src );                                                 // the dest where we're returning the message to
-       strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID );                        // must overlay the source to be ours
+       msg->state = RMR_OK;                                                                                                                            // ensure it is clear before send
+       hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src );                                         // the dest where we're returning the message to
+       strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );        // must overlay the source to be ours
        msg = send_msg( ctx, msg, nn_sock, -1 );
        if( msg ) {
        msg = send_msg( ctx, msg, nn_sock, -1 );
        if( msg ) {
-               strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SID );                    // always return original source so rts can be called again
-               msg->flags |= MFL_ADDSRC;                                                                                                       // if msg given to send() it must add source
+               strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC );    // always return original source so rts can be called again
+               msg->flags |= MFL_ADDSRC;                                                                                                               // if msg given to send() it must add source
        }
 
        free( hold_src );
        }
 
        free( hold_src );
@@ -356,6 +293,16 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
 }
 
 /*
 }
 
 /*
+       If multi-threading call is turned on, this invokes that mechanism with the special call
+       id of 1 and a max wait of 1 second.  If multi threaded call is not on, then the original
+       behavour (described below) is carried out.  This is safe to use when mt is enabled, but
+       the user app is invoking rmr_call() from only one thread, and the caller doesn't need 
+       a flexible timeout.
+
+       On timeout this function will return a nil pointer. If the original message could not
+       be sent without blocking, it will be returned with the RMR_ERR_RETRY set as the status.
+
+       Original behavour:
        Call sends the message based on message routing using the message type, and waits for a
        response message to arrive with the same transaction id that was in the outgoing message.
        If, while wiating for the expected response,  messages are received which do not have the
        Call sends the message based on message routing using the message type, and waits for a
        response message to arrive with the same transaction id that was in the outgoing message.
        If, while wiating for the expected response,  messages are received which do not have the
@@ -373,8 +320,6 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
                EAGAIN  -- the underlying message system wsa interrupted or the device was busy;
                                        user should call this function with the message again.
 
                EAGAIN  -- the underlying message system wsa interrupted or the device was busy;
                                        user should call this function with the message again.
 
-
-       QUESTION:  should user specify the number of messages to allow to queue?
 */
 extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
        uta_ctx_t*              ctx;
 */
 extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
        uta_ctx_t*              ctx;
@@ -387,6 +332,10 @@ extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
                return msg;
        }
 
                return msg;
        }
 
+       if( ctx->flags & CFL_MTC_ENABLED ) {                            // if multi threaded call is on, use that
+               return rmr_mt_call( vctx, msg, 1, 1000 );               // use the reserved call-id of 1 and wait up to 1 sec
+       }
+
        memcpy( expected_id, msg->xaction, RMR_MAX_XID );
        expected_id[RMR_MAX_XID] = 0;                                   // ensure it's a string
        if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rmr_call is making call, waiting for (%s)\n", expected_id );
        memcpy( expected_id, msg->xaction, RMR_MAX_XID );
        expected_id[RMR_MAX_XID] = 0;                                   // ensure it's a string
        if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rmr_call is making call, waiting for (%s)\n", expected_id );
@@ -398,6 +347,7 @@ extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
                if( msg->state != RMR_ERR_RETRY ) {
                        msg->state = RMR_ERR_CALLFAILED;                // errno not available to all wrappers; don't stomp if marked retry
                }
                if( msg->state != RMR_ERR_RETRY ) {
                        msg->state = RMR_ERR_CALLFAILED;                // errno not available to all wrappers; don't stomp if marked retry
                }
+               msg->tp_state = errno;
                return msg;
        }
 
                return msg;
        }
 
@@ -418,14 +368,19 @@ extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) {
        rmr_mbuf_t*     qm;                             // message that was queued on the ring
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
        rmr_mbuf_t*     qm;                             // message that was queued on the ring
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+               errno = EINVAL;
                if( old_msg != NULL ) {
                        old_msg->state = RMR_ERR_BADARG;
                if( old_msg != NULL ) {
                        old_msg->state = RMR_ERR_BADARG;
+                       old_msg->tp_state = errno;
                }
                }
-               errno = EINVAL;
                return old_msg;
        }
        errno = 0;
 
                return old_msg;
        }
        errno = 0;
 
+       if( ctx->flags & CFL_MTC_ENABLED ) {                                            // must pop from ring with a semaphore dec first
+               return rmr_mt_rcv( ctx, old_msg, -1 );
+       }
+
        qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring );                     // pop if queued
        if( qm != NULL ) {
                if( old_msg ) {
        qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring );                     // pop if queued
        if( qm != NULL ) {
                if( old_msg ) {
@@ -451,13 +406,18 @@ extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
        rmr_mbuf_t* msg;
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
        rmr_mbuf_t* msg;
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+               errno = EINVAL;
                if( old_msg != NULL ) {
                        old_msg->state = RMR_ERR_BADARG;
                if( old_msg != NULL ) {
                        old_msg->state = RMR_ERR_BADARG;
+                       old_msg->tp_state = errno;
                }
                }
-               errno = EINVAL;
                return old_msg;
        }
 
                return old_msg;
        }
 
+       if( ctx->flags & CFL_MTC_ENABLED ) {                                            // must pop from ring with a semaphore dec first
+               return rmr_mt_rcv( ctx, old_msg, ms_to );
+       }
+
        qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring );                     // pop if queued
        if( qm != NULL ) {
                if( old_msg ) {
        qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring );                     // pop if queued
        if( qm != NULL ) {
                if( old_msg ) {
@@ -502,6 +462,7 @@ extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
        nready = epoll_wait( eps->ep_fd, eps->events, 1, ms_to );     // block until something or timedout
        if( nready <= 0 ) {                                             // we only wait on ours, so we assume ready means it's ours
                msg->state = RMR_ERR_TIMEOUT;
        nready = epoll_wait( eps->ep_fd, eps->events, 1, ms_to );     // block until something or timedout
        if( nready <= 0 ) {                                             // we only wait on ours, so we assume ready means it's ours
                msg->state = RMR_ERR_TIMEOUT;
+               msg->tp_state = errno;
        } else {
                return rcv_msg( ctx, msg );                                                             // receive it and return it
        }
        } else {
                return rcv_msg( ctx, msg );                                                             // receive it and return it
        }
@@ -525,10 +486,11 @@ extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect,
        int     exp_len = 0;                    // length of expected ID
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
        int     exp_len = 0;                    // length of expected ID
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+               errno = EINVAL;
                if( msg != NULL ) {
                        msg->state = RMR_ERR_BADARG;
                if( msg != NULL ) {
                        msg->state = RMR_ERR_BADARG;
+                       msg->tp_state = errno;
                }
                }
-               errno = EINVAL;
                return msg;
        }
 
                return msg;
        }
 
@@ -569,17 +531,15 @@ extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect,
        return NULL;
 }
 
        return NULL;
 }
 
-//  CAUTION:  these are not supported as they must be set differently (between create and open) in NNG.
-//                             until those details are worked out, these generate a warning.
 /*
 /*
-       Set send timeout. The value time is assumed to be microseconds.  The timeout is the
-       rough maximum amount of time that RMr will block on a send attempt when the underlying
+       Set send timeout. The value time is assumed to be milliseconds.  The timeout is the
+       _rough_ maximum amount of time that RMr will block on a send attempt when the underlying
        mechnism indicates eagain or etimeedout.  All other error conditions are reported
        without this delay. Setting a timeout of 0 causes no retries to be attempted in
        mechnism indicates eagain or etimeedout.  All other error conditions are reported
        without this delay. Setting a timeout of 0 causes no retries to be attempted in
-       RMr code. Setting a timeout of 1 causes RMr to spin up to 10K retries before returning,
-       but without issuing a sleep.  If timeout is > 1, then RMr will issue a sleep (1us)
-       after every 10K send attempts until the time value is reached. Retries are abandoned
-       if NNG returns anything other than NNG_AGAIN or NNG_TIMEDOUT.
+       RMr code. Setting a timeout of 1 causes RMr to spin up to 1K retries before returning,
+       but _without_ issuing a sleep.  If timeout is > 1, then RMr will issue a sleep (1us)
+       after every 1K send attempts until the "time" value is reached. Retries are abandoned
+       if NNG returns anything other than NNG_EAGAIN or NNG_ETIMEDOUT.
 
        The default, if this function is not used, is 1; meaning that RMr will retry, but will
        not enter a sleep.  In all cases the caller should check the status in the message returned
 
        The default, if this function is not used, is 1; meaning that RMr will retry, but will
        not enter a sleep.  In all cases the caller should check the status in the message returned
@@ -604,6 +564,8 @@ extern int rmr_set_stimeout( void* vctx, int time ) {
 
 /*
        Set receive timeout -- not supported in nng implementation
 
 /*
        Set receive timeout -- not supported in nng implementation
+
+       CAUTION:  this is not supported as they must be set differently (between create and open) in NNG.
 */
 extern int rmr_set_rtimeout( void* vctx, int time ) {
        fprintf( stderr, "[WRN] Current implementation of RMR ontop of NNG does not support setting a receive timeout\n" );
 */
 extern int rmr_set_rtimeout( void* vctx, int time ) {
        fprintf( stderr, "[WRN] Current implementation of RMR ontop of NNG does not support setting a receive timeout\n" );
@@ -650,7 +612,14 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        memset( ctx, 0, sizeof( uta_ctx_t ) );
 
        ctx->send_retries = 1;                                                  // default is not to sleep at all; RMr will retry about 10K times before returning
        memset( ctx, 0, sizeof( uta_ctx_t ) );
 
        ctx->send_retries = 1;                                                  // default is not to sleep at all; RMr will retry about 10K times before returning
-       ctx->mring = uta_mk_ring( 128 );                                // message ring to hold asynch msgs received while waiting for call response
+       ctx->d1_len = 4;                                                                // data1 space in header -- 4 bytes for now
+
+       if( flags & RMRFL_MTCALL ) {                                    // mt call support is on, need bigger ring
+               ctx->mring = uta_mk_ring( 2048 );                       // message ring filled by rcv thread
+               init_mtcall( ctx );                                                     // set up call chutes
+       } else {
+               ctx->mring = uta_mk_ring( 128 );                        // ring filled only on blocking call
+       }
 
        ctx->max_plen = RMR_MAX_RCV_BYTES;                              // max user payload lengh
        if( max_msg_size > 0 ) {
 
        ctx->max_plen = RMR_MAX_RCV_BYTES;                              // max user payload lengh
        if( max_msg_size > 0 ) {
@@ -684,14 +653,35 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        if( (tok = strchr( wbuf, '.' )) != NULL ) {
                *tok = 0;                                                                       // we don't keep domain portion
        }
        if( (tok = strchr( wbuf, '.' )) != NULL ) {
                *tok = 0;                                                                       // we don't keep domain portion
        }
-       ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SID );
-       if( snprintf( ctx->my_name, RMR_MAX_SID, "%s:%s", wbuf, port ) >= RMR_MAX_SID ) {                       // our registered name is host:port
-               fprintf( stderr, "[CRI] rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SID, wbuf, port );
+       ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC );
+       if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) {                       // our registered name is host:port
+               fprintf( stderr, "[CRI] rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port );
                return NULL;
        }
 
                return NULL;
        }
 
-       ctx->ip_list = mk_ip_list( port );              // suss out all IP addresses we can find on the box, and bang on our port for RT comparisons
+       if( (tok = getenv( ENV_NAME_ONLY )) != NULL ) {
+               if( atoi( tok ) > 0 ) {
+                       flags |= RMRFL_NAME_ONLY;                                       // don't allow IP addreess to go out in messages
+               }
+       }
+
+       ctx->ip_list = mk_ip_list( port );                              // suss out all IP addresses we can find on the box, and bang on our port for RT comparisons
+       if( flags & RMRFL_NAME_ONLY ) {
+               ctx->my_ip = strdup( ctx->my_name );                    // user application or env var has specified that IP address is NOT sent out, use name
+       } else {
+               ctx->my_ip = get_default_ip( ctx->ip_list );    // and (guess) at what should be the default to put into messages as src
+               if( ctx->my_ip == NULL ) {
+                       fprintf( stderr, "[WRN] rmr_init: default ip address could not be sussed out, using name\n" );
+                       strcpy( ctx->my_ip, ctx->my_name );                     // if we cannot suss it out, use the name rather than a nil pointer
+               }
+       }
+       if( DEBUG ) fprintf( stderr, "[DBUG] default ip address: %s\n", ctx->my_ip );
 
 
+       if( (tok = getenv( ENV_WARNINGS )) != NULL ) {
+               if( *tok == '1' ) {
+                       ctx->flags |= CTXFL_WARN;                                       // turn on some warnings (not all, just ones that shouldn't impact performance)
+               }
+       }
 
 
        if( (interface = getenv( ENV_BIND_IF )) == NULL ) {
 
 
        if( (interface = getenv( ENV_BIND_IF )) == NULL ) {
@@ -701,7 +691,7 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        //       rather than using this generic listen() call.
        snprintf( bind_info, sizeof( bind_info ), "%s://%s:%s", proto, interface, port );
        if( (state = nng_listen( ctx->nn_sock, bind_info, NULL, NO_FLAGS )) != 0 ) {
        //       rather than using this generic listen() call.
        snprintf( bind_info, sizeof( bind_info ), "%s://%s:%s", proto, interface, port );
        if( (state = nng_listen( ctx->nn_sock, bind_info, NULL, NO_FLAGS )) != 0 ) {
-               fprintf( stderr, "[CRIT] rmr_init: unable to start nng listener for %s: %s\n", bind_info, nng_strerror( state ) );
+               fprintf( stderr, "[CRI] rmr_init: unable to start nng listener for %s: %s\n", bind_info, nng_strerror( state ) );
                nng_close( ctx->nn_sock );
                free_ctx( ctx );
                return NULL;
                nng_close( ctx->nn_sock );
                free_ctx( ctx );
                return NULL;
@@ -709,10 +699,18 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
 
        if( !(flags & FL_NOTHREAD) ) {                                                                          // skip if internal function that doesnt need an rtc
                if( pthread_create( &ctx->rtc_th,  NULL, rtc, (void *) ctx ) ) {        // kick the rt collector thread
 
        if( !(flags & FL_NOTHREAD) ) {                                                                          // skip if internal function that doesnt need an rtc
                if( pthread_create( &ctx->rtc_th,  NULL, rtc, (void *) ctx ) ) {        // kick the rt collector thread
-                       fprintf( stderr, "[WARN] rmr_init: unable to start route table collector thread: %s", strerror( errno ) );
+                       fprintf( stderr, "[WRN] rmr_init: unable to start route table collector thread: %s", strerror( errno ) );
                }
        }
 
                }
        }
 
+       if( (flags & RMRFL_MTCALL) && ! (ctx->flags & CFL_MTC_ENABLED) ) {      // mt call support is on, must start the listener thread if not running
+               ctx->flags |= CFL_MTC_ENABLED;
+               if( pthread_create( &ctx->mtc_th,  NULL, mt_receive, (void *) ctx ) ) {         // kick the receiver
+                       fprintf( stderr, "[WRN] rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
+               }
+               
+       }
+
        free( proto_port );
        return (void *) ctx;
 }
        free( proto_port );
        return (void *) ctx;
 }
@@ -818,4 +816,229 @@ extern void rmr_close( void* vctx ) {
 }
 
 
 }
 
 
+// ----- multi-threaded call/receive support -------------------------------------------------
 
 
+/*
+       Blocks on the receive ring chute semaphore and then reads from the ring
+       when it is tickled.  If max_wait is -1 then the function blocks until
+       a message is ready on the ring. Else max_wait is assumed to be the number
+       of millaseconds to wait before returning a timeout message.
+*/
+extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
+       uta_ctx_t*      ctx;
+       uta_mhdr_t*     hdr;                    // header in the transport buffer
+       chute_t*        chute;
+       struct timespec ts;                     // time info if we have a timeout
+       long    new_ms;                         // adjusted mu-sec
+       long    seconds = 0;            // max wait seconds
+       long    nano_sec;                       // max wait xlated to nano seconds
+       int             state;
+       rmr_mbuf_t*     ombuf;                  // mbuf user passed; if we timeout we return state here
+       
+       if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+               errno = EINVAL;
+               if( mbuf ) {
+                       mbuf->state = RMR_ERR_BADARG;
+                       mbuf->tp_state = errno;
+               }
+               return mbuf;
+       }
+
+       if( ! (ctx->flags & CFL_MTC_ENABLED) ) {
+               errno = EINVAL;
+               if( mbuf != NULL ) {
+                       mbuf->state = RMR_ERR_NOTSUPP;
+                       mbuf->tp_state = errno;
+               }
+               return mbuf;
+       }
+
+       ombuf = mbuf;
+       if( ombuf ) {
+               ombuf->state = RMR_ERR_TIMEOUT;                 // preset if for failure
+               ombuf->len = 0;
+       }
+
+       chute = &ctx->chutes[0];                                        // chute 0 used only for its semaphore
+       
+       if( max_wait > 0 ) {
+               clock_gettime( CLOCK_REALTIME, &ts );   
+
+               if( max_wait > 999 ) {
+                       seconds = (max_wait - 999)/1000;
+                       max_wait -= seconds * 1000;
+                       ts.tv_sec += seconds;
+               }
+               if( max_wait > 0 ) {
+                       nano_sec = max_wait * 1000000;
+                       ts.tv_nsec += nano_sec;
+                       if( ts.tv_nsec > 999999999 ) {
+                               ts.tv_nsec -= 999999999;
+                               ts.tv_sec++;
+                       }
+               }
+
+               seconds = 1;                                                                                                    // use as flag later to invoked timed wait
+       }
+
+       errno = 0;
+       state = 0;
+       while( chute->mbuf == NULL && ! errno ) {
+               if( seconds ) {
+                       state = sem_timedwait( &chute->barrier, &ts );                          // wait for msg or timeout
+               } else {
+                       state = sem_wait( &chute->barrier );
+               }
+
+               if( state < 0 && errno == EINTR ) {                                                             // interrupted go back and wait; all other errors cause exit
+                       errno = 0;
+               }
+       }
+
+       if( state < 0 ) {
+               mbuf = ombuf;                           // return caller's buffer if they passed one in
+       } else {
+               if( DEBUG ) fprintf( stderr, "[DBUG] mt_rcv extracting from normal ring\n" );
+               if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
+                       if( mbuf ) {
+                               mbuf->state = RMR_OK;
+
+                               if( ombuf ) {
+                                       rmr_free_msg( ombuf );                                  // we cannot reuse as mbufs are queued on the ring
+                               }
+                       } else {
+                               mbuf = ombuf;                           // no buffer, return user's if there
+                       }
+               }
+       }
+
+       if( mbuf ) {
+               mbuf->tp_state = errno;
+       }
+       return mbuf;
+}
+
+/*
+       Accept a message buffer and caller ID, send the message and then wait
+       for the receiver to tickle the semaphore letting us know that a message
+       has been received. The call_id is a value between 2 and 255, inclusive; if
+       it's not in this range an error will be returned. Max wait is the amount
+       of time in millaseconds that the call should block for. If 0 is given
+       then no timeout is set.
+
+       If the mt_call feature has not been initialised, then the attempt to use this
+       funciton will fail with RMR_ERR_NOTSUPP
+
+       If no matching message is received before the max_wait period expires, a
+       nil pointer is returned, and errno is set to ETIMEOUT. If any other error
+       occurs after the message has been sent, then a nil pointer is returned
+       with errno set to some other value.
+*/
+extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
+       rmr_mbuf_t* ombuf;                      // original mbuf passed in
+       uta_ctx_t*      ctx;
+       uta_mhdr_t*     hdr;                    // header in the transport buffer
+       chute_t*        chute;
+       unsigned char*  d1;                     // d1 data in header
+       struct timespec ts;                     // time info if we have a timeout
+       long    new_ms;                         // adjusted mu-sec
+       long    seconds = 0;            // max wait seconds
+       long    nano_sec;                       // max wait xlated to nano seconds
+       int             state;
+       
+       errno = EINVAL;
+       if( (ctx = (uta_ctx_t *) vctx) == NULL || mbuf == NULL ) {
+               if( mbuf ) {
+                       mbuf->tp_state = errno;
+                       mbuf->state = RMR_ERR_BADARG;
+               }
+               return mbuf;
+       }
+
+       if( ! (ctx->flags & CFL_MTC_ENABLED) ) {
+               mbuf->state = RMR_ERR_NOTSUPP;
+               mbuf->tp_state = errno;
+               return mbuf;
+       }
+
+       if( call_id > MAX_CALL_ID || call_id < 2 ) {                                    // 0 and 1 are reserved; user app cannot supply them
+               mbuf->state = RMR_ERR_BADARG;
+               mbuf->tp_state = errno;
+               return mbuf;
+       }
+
+       ombuf = mbuf;                                                                                                   // save to return timeout status with
+
+       chute = &ctx->chutes[call_id];
+       if( chute->mbuf != NULL ) {                                                                             // probably a delayed message that wasn't dropped
+               rmr_free_msg( chute->mbuf );
+               chute->mbuf = NULL;
+       }
+       
+       hdr = (uta_mhdr_t *) mbuf->header;
+       hdr->flags |= HFL_CALL_MSG;                                                                             // must signal this sent with a call
+       memcpy( chute->expect, mbuf->xaction, RMR_MAX_XID );                    // xaction that we will wait for
+       d1 = DATA1_ADDR( hdr );
+       d1[D1_CALLID_IDX] = (unsigned char) call_id;                                    // set the caller ID for the response
+       mbuf->flags |= MFL_NOALLOC;                                                                             // send message without allocating a new one (expect nil from mtosend
+
+       if( max_wait > 0 ) {
+               clock_gettime( CLOCK_REALTIME, &ts );   
+
+               if( max_wait > 999 ) {
+                       seconds = (max_wait - 999)/1000;
+                       max_wait -= seconds * 1000;
+                       ts.tv_sec += seconds;
+               }
+               if( max_wait > 0 ) {
+                       nano_sec = max_wait * 1000000;
+                       ts.tv_nsec += nano_sec;
+                       if( ts.tv_nsec > 999999999 ) {
+                               ts.tv_nsec -= 999999999;
+                               ts.tv_sec++;
+                       }
+               }
+
+               seconds = 1;                                                                            // use as flag later to invoked timed wait
+       }
+
+       mbuf = mtosend_msg( ctx, mbuf, 0 );                                             // use internal function so as not to strip call-id; should be nil on success!
+       if( mbuf ) {
+               if( mbuf->state != RMR_OK ) {
+                       mbuf->tp_state = errno;
+                       return mbuf;                                                                    // timeout or unable to connect or no endpoint are most likely issues
+               }
+       }
+
+       state = 0;
+       errno = 0;
+       while( chute->mbuf == NULL && ! errno ) {
+               if( seconds ) {
+                       state = sem_timedwait( &chute->barrier, &ts );                          // wait for msg or timeout
+               } else {
+                       state = sem_wait( &chute->barrier );
+               }
+
+               if( state < 0 && errno == EINTR ) {                                                             // interrupted go back and wait; all other errors cause exit
+                       errno = 0;
+               }
+
+               if( chute->mbuf != NULL ) {                                                                             // offload receiver thread and check xaction buffer here
+                       if( memcmp( chute->expect, chute->mbuf->xaction, RMR_MAX_XID ) != 0 ) {
+                               rmr_free_msg( chute->mbuf );
+                               chute->mbuf = NULL;
+                               errno = 0;
+                       }
+               }
+       }
+
+       if( state < 0 ) {
+               return NULL;                                    // leave errno as set by sem wait call
+       }
+
+       mbuf = chute->mbuf;
+       mbuf->state = RMR_OK;
+       chute->mbuf = NULL;
+
+       return mbuf;
+}