Release commit to push 3.6.3 in package cloud
[ric-plt/lib/rmr.git] / src / rmr / nng / src / rmr_nng.c
index e77c9f3..8d55b44 100644 (file)
@@ -1,8 +1,8 @@
-// : vi ts=4 sw=4 noet :
+// vim: ts=4 sw=4 noet :
 /*
 ==================================================================================
 /*
 ==================================================================================
-       Copyright (c) 2019 Nokia
-       Copyright (c) 2018-2019 AT&T Intellectual Property.
+       Copyright (c) 2019-2020 Nokia
+       Copyright (c) 2018-2020 AT&T Intellectual Property.
 
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
 
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
@@ -50,6 +50,8 @@
 #include <unistd.h>
 #include <time.h>
 #include <arpa/inet.h>
 #include <unistd.h>
 #include <time.h>
 #include <arpa/inet.h>
+#include <semaphore.h>
+#include <pthread.h>
 
 #include <nng/nng.h>
 #include <nng/protocol/pubsub0/pub.h>
 
 #include <nng/nng.h>
 #include <nng/protocol/pubsub0/pub.h>
@@ -62,6 +64,7 @@
 #include "rmr_agnostic.h"              // agnostic things (must be included before private)
 #include "rmr_nng_private.h"   // things that we need too
 #include "rmr_symtab.h"
 #include "rmr_agnostic.h"              // agnostic things (must be included before private)
 #include "rmr_nng_private.h"   // things that we need too
 #include "rmr_symtab.h"
+#include "rmr_logging.h"
 
 #include "ring_static.c"                       // message ring support
 #include "rt_generic_static.c"         // route table things not transport specific
 
 #include "ring_static.c"                       // message ring support
 #include "rt_generic_static.c"         // route table things not transport specific
@@ -70,6 +73,8 @@
 #include "tools_static.c"
 #include "sr_nng_static.c"                     // send/receive static functions
 #include "wormholes.c"                         // wormhole api externals and related static functions (must be LAST!)
 #include "tools_static.c"
 #include "sr_nng_static.c"                     // send/receive static functions
 #include "wormholes.c"                         // wormhole api externals and related static functions (must be LAST!)
+#include "mt_call_static.c"
+#include "mt_call_nng_static.c"
 
 
 //------------------------------------------------------------------------------
 
 
 //------------------------------------------------------------------------------
@@ -181,103 +186,20 @@ extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
 }
 
 /*
 }
 
 /*
-       send message with maximum timeout.
-       Accept a message and send it to an endpoint based on message type.
-       If NNG reports that the send attempt timed out, or should be retried,
-       RMr will retry for approximately max_to microseconds; rounded to the next
-       higher value of 10.
-
-       Allocates a new message buffer for the next send. If a message type has
-       more than one group of endpoints defined, then the message will be sent
-       in round robin fashion to one endpoint in each group.
-
-       An endpoint will be looked up in the route table using the message type and
-       the subscription id. If the subscription id is "UNSET_SUBID", then only the
-       message type is used.  If the initial lookup, with a subid, fails, then a
-       second lookup using just the mtype is tried.
-
-       CAUTION: this is a non-blocking send.  If the message cannot be sent, then
-               it will return with an error and errno set to eagain. If the send is
-               a limited fanout, then the returned status is the status of the last
-               send attempt.
-
+       This is a wrapper to the real timeout send. We must wrap it now to ensure that
+       the call flag and call-id are reset
 */
 extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
 */
 extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
-       nng_socket nn_sock;                     // endpoint socket for send
-       uta_ctx_t*      ctx;
-       int     group;                                  // selected group to get socket for
-       int send_again;                         // true if the message must be sent again
-       rmr_mbuf_t*     clone_m;                // cloned message for an nth send
-       int sock_ok;                            // got a valid socket from round robin select
-       uint64_t key;                           // mtype or sub-id/mtype sym table key
-       int     altk_ok = 0;                    // set true if we can lookup on alternate key if mt/sid lookup fails
-
-       if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
-               errno = EINVAL;                                                                                         // if msg is null, this is their clue
-               if( msg != NULL ) {
-                       msg->state = RMR_ERR_BADARG;
-                       errno = EINVAL;                                                                                 // must ensure it's not eagain
-               }
-               return msg;
-       }
-
-       errno = 0;                                                                                                      // clear; nano might set, but ensure it's not left over if it doesn't
-       if( msg->header == NULL ) {
-               fprintf( stderr, "rmr_send_msg: ERROR: message had no header\n" );
-               msg->state = RMR_ERR_NOHDR;
-               errno = EBADMSG;                                                                                        // must ensure it's not eagain
-               return msg;
-       }
-
-       if( max_to < 0 ) {
-               max_to = ctx->send_retries;             // convert to retries
-       }
-
-       send_again = 1;                                                                                 // force loop entry
-       group = 0;                                                                                              // always start with group 0
-
-       key = build_rt_key( msg->sub_id, msg->mtype );                  // route table key to find the entry
-       if( msg->sub_id != UNSET_SUBID ) {
-               altk_ok = 1;                                                                            // if caller's sub-id doesn't hit with mtype, allow mtype only key for retry
-       }
-       while( send_again ) {
-               sock_ok = uta_epsock_rr( ctx->rtable, key, group, &send_again, &nn_sock );              // round robin sel epoint; again set if mult groups
-               if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d len=%d sock_ok=%d ak_ok=%d\n",
-                               msg->mtype, send_again, group, msg->len, sock_ok, altk_ok );
-
-               if( ! sock_ok ) {
-                       if( altk_ok ) {                                                                                 // we can try with the alternate (no sub-id) key
-                               altk_ok = 0;
-                               key = build_rt_key( UNSET_SUBID, msg->mtype );          // build with just the mtype and try again
-                               send_again = 1;                                                                         // ensure we don't exit the while
-                               continue;
-                       }
+       char* d1;                                                                                                                       // point at the call-id in the header
 
 
-                       msg->state = RMR_ERR_NOENDPT;
-                       errno = ENXIO;                                                                                  // must ensure it's not eagain
-                       return msg;                                                                                             // caller can resend (maybe) or free
-               }
-
-               group++;
+       if( msg != NULL ) {
+               ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
 
 
-               if( send_again ) {
-                       clone_m = clone_msg( msg );                                                             // must make a copy as once we send this message is not available
-                       if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
-                       msg->flags |= MFL_NOALLOC;                                                              // send should not allocate a new buffer
-                       msg = send_msg( ctx, msg, nn_sock, max_to );                    // do the hard work, msg should be nil on success
-                       /*
-                       if( msg ) {
-                               // error do we need to count successes/errors, how to report some success, esp if last fails?
-                       }
-                       */
-
-                       msg = clone_m;                                                                                  // clone will be the next to send
-               } else {
-                       msg = send_msg( ctx, msg, nn_sock, max_to );                    // send the last, and allocate a new buffer; drops the clone if it was
-               }
-       }
+               d1 = DATA1_ADDR( msg->header );
+               d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                         // must blot out so it doesn't queue on a chute at the other end
+       }       
 
 
-       return msg;                                                                     // last message caries the status of last/only send attempt
+       return mtosend_msg( vctx, msg, max_to );
 }
 
 /*
 }
 
 /*
@@ -286,7 +208,16 @@ extern rmr_mbuf_t* rmr_mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
        See rmr_stimeout() for info on setting the default timeout.
 */
 extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
        See rmr_stimeout() for info on setting the default timeout.
 */
 extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
-       return rmr_mtosend_msg( vctx, msg,  -1 );                       // retries <  uses default from ctx
+       char* d1;                                                                                                               // point at the call-id in the header
+
+       if( msg != NULL ) {
+               ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
+
+               d1 = DATA1_ADDR( msg->header );
+               d1[D1_CALLID_IDX] = NO_CALL_ID;                                                                         // must blot out so it doesn't queue on a chute at the other end
+       }       
+
+       return rmr_mtosend_msg( vctx, msg,  -1 );                                                       // retries < 0  uses default from ctx
 }
 
 /*
 }
 
 /*
@@ -314,48 +245,86 @@ extern rmr_mbuf_t* rmr_send_msg( void* vctx, rmr_mbuf_t* msg ) {
                The caller must check for this and handle.
 */
 extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
                The caller must check for this and handle.
 */
 extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
-       nng_socket nn_sock;                     // endpoint socket for send
+       nng_socket      nn_sock;                        // endpoint socket for send
        uta_ctx_t*      ctx;
        uta_ctx_t*      ctx;
-       int state;
-       uta_mhdr_t*     hdr;
-       char*   hold_src;                       // we need the original source if send fails
-       int             sock_ok;                        // true if we found a valid endpoint socket
+       int                     state;
+       char*           hold_src;                       // we need the original source if send fails
+       char*           hold_ip;                        // also must hold original ip
+       int                     sock_ok = 0;            // true if we found a valid endpoint socket
+       endpoint_t*     ep;                                     // end point to track counts
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
                errno = EINVAL;                                                                                         // if msg is null, this is their clue
                if( msg != NULL ) {
                        msg->state = RMR_ERR_BADARG;
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
                errno = EINVAL;                                                                                         // if msg is null, this is their clue
                if( msg != NULL ) {
                        msg->state = RMR_ERR_BADARG;
+                       msg->tp_state = errno;
                }
                return msg;
        }
 
        errno = 0;                                                                                                              // at this point any bad state is in msg returned
        if( msg->header == NULL ) {
                }
                return msg;
        }
 
        errno = 0;                                                                                                              // at this point any bad state is in msg returned
        if( msg->header == NULL ) {
-               fprintf( stderr, "[ERR] rmr_send_msg: message had no header\n" );
+               rmr_vlog( RMR_VL_ERR, "rmr_send_msg: message had no header\n" );
                msg->state = RMR_ERR_NOHDR;
                msg->state = RMR_ERR_NOHDR;
+               msg->tp_state = errno;
                return msg;
        }
 
                return msg;
        }
 
-       sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock );                        // socket of specific endpoint
+       ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
+
+       sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock, &ep );                   // src is always used first for rts
        if( ! sock_ok ) {
        if( ! sock_ok ) {
-               msg->state = RMR_ERR_NOENDPT;
-               return msg;                                                     // preallocated msg can be reused since not given back to nn
+               if( HDR_VERSION( msg->header ) > 2 ) {                                                  // with ver2 the ip is there, try if src name not known
+                       sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep );
+               }
+               if( ! sock_ok ) {
+                       msg->state = RMR_ERR_NOENDPT;
+                       return msg;                                                                                                                             // preallocated msg can be reused since not given back to nn
+               }
        }
 
        }
 
-       msg->state = RMR_OK;                                    // ensure it is clear before send
-       hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src );                                                 // the dest where we're returning the message to
-       strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID );                        // must overlay the source to be ours
+       msg->state = RMR_OK;                                                                                                                            // ensure it is clear before send
+       hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src );                                         // the dest where we're returning the message to
+       hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip );                                        // both the src host and src ip
+       strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );        // must overlay the source to be ours
        msg = send_msg( ctx, msg, nn_sock, -1 );
        if( msg ) {
        msg = send_msg( ctx, msg, nn_sock, -1 );
        if( msg ) {
-               strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SID );                    // always return original source so rts can be called again
-               msg->flags |= MFL_ADDSRC;                                                                                                       // if msg given to send() it must add source
+               if( ep != NULL ) {
+                       switch( msg->state ) {
+                               case RMR_OK:
+                                       ep->scounts[EPSC_GOOD]++;
+                                       break;
+                       
+                               case RMR_ERR_RETRY:
+                                       ep->scounts[EPSC_TRANS]++;
+                                       break;
+
+                               default:
+                                       ep->scounts[EPSC_FAIL]++;
+                                       break;
+                       }
+               }
+               strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC );    // always return original source so rts can be called again
+               strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC );   // always return original source so rts can be called again
+               msg->flags |= MFL_ADDSRC;                                                                                                               // if msg given to send() it must add source
        }
 
        free( hold_src );
        }
 
        free( hold_src );
+       free( hold_ip );
        return msg;
 }
 
 /*
        return msg;
 }
 
 /*
+       If multi-threading call is turned on, this invokes that mechanism with the special call
+       id of 1 and a max wait of 1 second.  If multi threaded call is not on, then the original
+       behavour (described below) is carried out.  This is safe to use when mt is enabled, but
+       the user app is invoking rmr_call() from only one thread, and the caller doesn't need 
+       a flexible timeout.
+
+       On timeout this function will return a nil pointer. If the original message could not
+       be sent without blocking, it will be returned with the RMR_ERR_RETRY set as the status.
+
+       Original behavour:
        Call sends the message based on message routing using the message type, and waits for a
        response message to arrive with the same transaction id that was in the outgoing message.
        If, while wiating for the expected response,  messages are received which do not have the
        Call sends the message based on message routing using the message type, and waits for a
        response message to arrive with the same transaction id that was in the outgoing message.
        If, while wiating for the expected response,  messages are received which do not have the
@@ -373,8 +342,6 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
                EAGAIN  -- the underlying message system wsa interrupted or the device was busy;
                                        user should call this function with the message again.
 
                EAGAIN  -- the underlying message system wsa interrupted or the device was busy;
                                        user should call this function with the message again.
 
-
-       QUESTION:  should user specify the number of messages to allow to queue?
 */
 extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
        uta_ctx_t*              ctx;
 */
 extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
        uta_ctx_t*              ctx;
@@ -387,9 +354,13 @@ extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
                return msg;
        }
 
                return msg;
        }
 
+       if( ctx->flags & CFL_MTC_ENABLED ) {                            // if multi threaded call is on, use that
+               return rmr_mt_call( vctx, msg, 1, 1000 );               // use the reserved call-id of 1 and wait up to 1 sec
+       }
+
        memcpy( expected_id, msg->xaction, RMR_MAX_XID );
        expected_id[RMR_MAX_XID] = 0;                                   // ensure it's a string
        memcpy( expected_id, msg->xaction, RMR_MAX_XID );
        expected_id[RMR_MAX_XID] = 0;                                   // ensure it's a string
-       if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rmr_call is making call, waiting for (%s)\n", expected_id );
+       if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "rmr_call is making call, waiting for (%s)\n", expected_id );
        errno = 0;
        msg->flags |= MFL_NOALLOC;                                              // we don't need a new buffer from send
 
        errno = 0;
        msg->flags |= MFL_NOALLOC;                                              // we don't need a new buffer from send
 
@@ -398,6 +369,7 @@ extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
                if( msg->state != RMR_ERR_RETRY ) {
                        msg->state = RMR_ERR_CALLFAILED;                // errno not available to all wrappers; don't stomp if marked retry
                }
                if( msg->state != RMR_ERR_RETRY ) {
                        msg->state = RMR_ERR_CALLFAILED;                // errno not available to all wrappers; don't stomp if marked retry
                }
+               msg->tp_state = errno;
                return msg;
        }
 
                return msg;
        }
 
@@ -418,14 +390,19 @@ extern rmr_mbuf_t* rmr_rcv_msg( void* vctx, rmr_mbuf_t* old_msg ) {
        rmr_mbuf_t*     qm;                             // message that was queued on the ring
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
        rmr_mbuf_t*     qm;                             // message that was queued on the ring
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+               errno = EINVAL;
                if( old_msg != NULL ) {
                        old_msg->state = RMR_ERR_BADARG;
                if( old_msg != NULL ) {
                        old_msg->state = RMR_ERR_BADARG;
+                       old_msg->tp_state = errno;
                }
                }
-               errno = EINVAL;
                return old_msg;
        }
        errno = 0;
 
                return old_msg;
        }
        errno = 0;
 
+       if( ctx->flags & CFL_MTC_ENABLED ) {                                            // must pop from ring with a semaphore dec first
+               return rmr_mt_rcv( ctx, old_msg, -1 );
+       }
+
        qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring );                     // pop if queued
        if( qm != NULL ) {
                if( old_msg ) {
        qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring );                     // pop if queued
        if( qm != NULL ) {
                if( old_msg ) {
@@ -451,13 +428,18 @@ extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
        rmr_mbuf_t* msg;
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
        rmr_mbuf_t* msg;
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+               errno = EINVAL;
                if( old_msg != NULL ) {
                        old_msg->state = RMR_ERR_BADARG;
                if( old_msg != NULL ) {
                        old_msg->state = RMR_ERR_BADARG;
+                       old_msg->tp_state = errno;
                }
                }
-               errno = EINVAL;
                return old_msg;
        }
 
                return old_msg;
        }
 
+       if( ctx->flags & CFL_MTC_ENABLED ) {                                            // must pop from ring with a semaphore dec first
+               return rmr_mt_rcv( ctx, old_msg, ms_to );
+       }
+
        qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring );                     // pop if queued
        if( qm != NULL ) {
                if( old_msg ) {
        qm = (rmr_mbuf_t *) uta_ring_extract( ctx->mring );                     // pop if queued
        if( qm != NULL ) {
                if( old_msg ) {
@@ -473,7 +455,12 @@ extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
                if( (eps->ep_fd = epoll_create1( 0 )) < 0 ) {
                fprintf( stderr, "[FAIL] unable to create epoll fd: %d\n", errno );
                        free( eps );
                if( (eps->ep_fd = epoll_create1( 0 )) < 0 ) {
                fprintf( stderr, "[FAIL] unable to create epoll fd: %d\n", errno );
                        free( eps );
-                       return NULL;
+                       ctx->eps = NULL;
+                       if( old_msg != NULL ) {
+                               old_msg->state = RMR_ERR_INITFAILED;
+                               old_msg->tp_state = errno;
+                       }
+                       return old_msg;
                }
 
                eps->nng_fd = rmr_get_rcvfd( ctx );
                }
 
                eps->nng_fd = rmr_get_rcvfd( ctx );
@@ -483,7 +470,12 @@ extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
                if( epoll_ctl( eps->ep_fd, EPOLL_CTL_ADD, eps->nng_fd, &eps->epe ) != 0 )  {
                fprintf( stderr, "[FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
                        free( eps );
                if( epoll_ctl( eps->ep_fd, EPOLL_CTL_ADD, eps->nng_fd, &eps->epe ) != 0 )  {
                fprintf( stderr, "[FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
                        free( eps );
-                       return NULL;
+                       ctx->eps = NULL;
+                       if( old_msg != NULL ) {
+                               old_msg->state = RMR_ERR_INITFAILED;
+                               old_msg->tp_state = errno;
+                       }
+                       return old_msg;
                }
 
                ctx->eps = eps;
                }
 
                ctx->eps = eps;
@@ -502,6 +494,7 @@ extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
        nready = epoll_wait( eps->ep_fd, eps->events, 1, ms_to );     // block until something or timedout
        if( nready <= 0 ) {                                             // we only wait on ours, so we assume ready means it's ours
                msg->state = RMR_ERR_TIMEOUT;
        nready = epoll_wait( eps->ep_fd, eps->events, 1, ms_to );     // block until something or timedout
        if( nready <= 0 ) {                                             // we only wait on ours, so we assume ready means it's ours
                msg->state = RMR_ERR_TIMEOUT;
+               msg->tp_state = errno;
        } else {
                return rcv_msg( ctx, msg );                                                             // receive it and return it
        }
        } else {
                return rcv_msg( ctx, msg );                                                             // receive it and return it
        }
@@ -525,10 +518,11 @@ extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect,
        int     exp_len = 0;                    // length of expected ID
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
        int     exp_len = 0;                    // length of expected ID
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+               errno = EINVAL;
                if( msg != NULL ) {
                        msg->state = RMR_ERR_BADARG;
                if( msg != NULL ) {
                        msg->state = RMR_ERR_BADARG;
+                       msg->tp_state = errno;
                }
                }
-               errno = EINVAL;
                return msg;
        }
 
                return msg;
        }
 
@@ -542,44 +536,42 @@ extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect,
        if( exp_len > RMR_MAX_XID ) {
                exp_len = RMR_MAX_XID;
        }
        if( exp_len > RMR_MAX_XID ) {
                exp_len = RMR_MAX_XID;
        }
-       if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific waiting for id=%s\n",  expect );
+       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rcv_specific waiting for id=%s\n",  expect );
 
        while( queued < allow2queue ) {
                msg = rcv_msg( ctx, msg );                                      // hard wait for next
                if( msg->state == RMR_OK ) {
                        if( memcmp( msg->xaction, expect, exp_len ) == 0 ) {                    // got it -- return it
 
        while( queued < allow2queue ) {
                msg = rcv_msg( ctx, msg );                                      // hard wait for next
                if( msg->state == RMR_OK ) {
                        if( memcmp( msg->xaction, expect, exp_len ) == 0 ) {                    // got it -- return it
-                               if( DEBUG ) fprintf( stderr, "[DBUG] rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued );
+                               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued );
                                return msg;
                        }
 
                        if( ! uta_ring_insert( ctx->mring, msg ) ) {                                    // just queue, error if ring is full
                                return msg;
                        }
 
                        if( ! uta_ring_insert( ctx->mring, msg ) ) {                                    // just queue, error if ring is full
-                               if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_specific ring is full\n" );
+                               if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "rcv_specific ring is full\n" );
                                errno = ENOBUFS;
                                return NULL;
                        }
 
                                errno = ENOBUFS;
                                return NULL;
                        }
 
-                       if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific queued message type=%d\n", msg->mtype );
+                       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rcv_specific queued message type=%d\n", msg->mtype );
                        queued++;
                        msg = NULL;
                }
        }
 
                        queued++;
                        msg = NULL;
                }
        }
 
-       if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific timeout waiting for %s\n", expect );
+       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rcv_specific timeout waiting for %s\n", expect );
        errno = ETIMEDOUT;
        return NULL;
 }
 
        errno = ETIMEDOUT;
        return NULL;
 }
 
-//  CAUTION:  these are not supported as they must be set differently (between create and open) in NNG.
-//                             until those details are worked out, these generate a warning.
 /*
 /*
-       Set send timeout. The value time is assumed to be microseconds.  The timeout is the
-       rough maximum amount of time that RMr will block on a send attempt when the underlying
+       Set send timeout. The value time is assumed to be milliseconds.  The timeout is the
+       _rough_ maximum amount of time that RMr will block on a send attempt when the underlying
        mechnism indicates eagain or etimeedout.  All other error conditions are reported
        without this delay. Setting a timeout of 0 causes no retries to be attempted in
        mechnism indicates eagain or etimeedout.  All other error conditions are reported
        without this delay. Setting a timeout of 0 causes no retries to be attempted in
-       RMr code. Setting a timeout of 1 causes RMr to spin up to 10K retries before returning,
-       but without issuing a sleep.  If timeout is > 1, then RMr will issue a sleep (1us)
-       after every 10K send attempts until the time value is reached. Retries are abandoned
-       if NNG returns anything other than NNG_AGAIN or NNG_TIMEDOUT.
+       RMr code. Setting a timeout of 1 causes RMr to spin up to 1K retries before returning,
+       but _without_ issuing a sleep.  If timeout is > 1, then RMr will issue a sleep (1us)
+       after every 1K send attempts until the "time" value is reached. Retries are abandoned
+       if NNG returns anything other than NNG_EAGAIN or NNG_ETIMEDOUT.
 
        The default, if this function is not used, is 1; meaning that RMr will retry, but will
        not enter a sleep.  In all cases the caller should check the status in the message returned
 
        The default, if this function is not used, is 1; meaning that RMr will retry, but will
        not enter a sleep.  In all cases the caller should check the status in the message returned
@@ -604,9 +596,11 @@ extern int rmr_set_stimeout( void* vctx, int time ) {
 
 /*
        Set receive timeout -- not supported in nng implementation
 
 /*
        Set receive timeout -- not supported in nng implementation
+
+       CAUTION:  this is not supported as they must be set differently (between create and open) in NNG.
 */
 extern int rmr_set_rtimeout( void* vctx, int time ) {
 */
 extern int rmr_set_rtimeout( void* vctx, int time ) {
-       fprintf( stderr, "[WRN] Current implementation of RMR ontop of NNG does not support setting a receive timeout\n" );
+       rmr_vlog( RMR_VL_WARN, "Current implementation of RMR ontop of NNG does not support setting a receive timeout\n" );
        return 0;
 }
 
        return 0;
 }
 
@@ -628,13 +622,19 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        char*   proto_port;
        char    wbuf[1024];                                     // work buffer
        char*   tok;                                            // pointer at token in a buffer
        char*   proto_port;
        char    wbuf[1024];                                     // work buffer
        char*   tok;                                            // pointer at token in a buffer
+       char*   tok2;
        int             state;
        int             state;
+       int             old_vlevel = 0;
+
+       old_vlevel = rmr_vlog_init();           // initialise and get the current level
+       rmr_set_vlevel( RMR_VL_INFO );          // we WILL announce our version etc
 
        if( ! announced ) {
 
        if( ! announced ) {
-               fprintf( stderr, "[INFO] ric message routing library on NNG mv=%d (%s %s.%s.%s built: %s)\n",
-                       RMR_MSG_VER, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
+               rmr_vlog( RMR_VL_INFO, "ric message routing library on NNG/d mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
+                       RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
                announced = 1;
        }
                announced = 1;
        }
+       rmr_set_vlevel( old_vlevel );           // return logging to the desired state
 
        errno = 0;
        if( uproto_port == NULL ) {
 
        errno = 0;
        if( uproto_port == NULL ) {
@@ -650,7 +650,14 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        memset( ctx, 0, sizeof( uta_ctx_t ) );
 
        ctx->send_retries = 1;                                                  // default is not to sleep at all; RMr will retry about 10K times before returning
        memset( ctx, 0, sizeof( uta_ctx_t ) );
 
        ctx->send_retries = 1;                                                  // default is not to sleep at all; RMr will retry about 10K times before returning
-       ctx->mring = uta_mk_ring( 128 );                                // message ring to hold asynch msgs received while waiting for call response
+       ctx->d1_len = 4;                                                                // data1 space in header -- 4 bytes for now
+
+       if( flags & RMRFL_MTCALL ) {                                    // mt call support is on, need bigger ring
+               ctx->mring = uta_mk_ring( 2048 );                       // message ring filled by rcv thread
+               init_mtcall( ctx );                                                     // set up call chutes
+       } else {
+               ctx->mring = uta_mk_ring( 128 );                        // ring filled only on blocking call
+       }
 
        ctx->max_plen = RMR_MAX_RCV_BYTES;                              // max user payload lengh
        if( max_msg_size > 0 ) {
 
        ctx->max_plen = RMR_MAX_RCV_BYTES;                              // max user payload lengh
        if( max_msg_size > 0 ) {
@@ -661,7 +668,7 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        //uta_lookup_rtg( ctx );                                                        // attempt to fill in rtg info; rtc will handle missing values/errors
 
        if( nng_pull0_open( &ctx->nn_sock )  !=  0 ) {          // and assign the mode
        //uta_lookup_rtg( ctx );                                                        // attempt to fill in rtg info; rtc will handle missing values/errors
 
        if( nng_pull0_open( &ctx->nn_sock )  !=  0 ) {          // and assign the mode
-               fprintf( stderr, "[CRI] rmr_init: unable to initialise nng listen (pull) socket: %d\n", errno );
+               rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to initialise nng listen (pull) socket: %d\n", errno );
                free_ctx( ctx );
                return NULL;
        }
                free_ctx( ctx );
                return NULL;
        }
@@ -677,21 +684,59 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
                port = proto_port;                      // assume something like "1234" was passed
        }
 
                port = proto_port;                      // assume something like "1234" was passed
        }
 
-       if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
-               fprintf( stderr, "[CRI] rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
-               return NULL;
-       }
-       if( (tok = strchr( wbuf, '.' )) != NULL ) {
-               *tok = 0;                                                                       // we don't keep domain portion
+       if( (tok = getenv( ENV_SRC_ID )) != NULL ) {                                                    // env var overrides what we dig from system
+               tok = strdup( tok );                                    // something we can destroy
+               if( *tok == '[' ) {                                             // we allow an ipv6 address here
+                       tok2 = strchr( tok, ']' ) + 1;          // we will chop the port (...]:port) if given
+               } else {
+                       tok2 = strchr( tok, ':' );                      // find :port if there so we can chop
+               }
+               if( tok2  && *tok2 ) {                                  // if it's not the end of string marker
+                       *tok2 = 0;                                                      // make it so
+               }
+
+               snprintf( wbuf, RMR_MAX_SRC, "%s", tok );
+               free( tok );
+       } else {
+               if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
+                       rmr_vlog( RMR_VL_CRIT, "rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
+                       free( proto_port );
+                       return NULL;
+               }
+               if( (tok = strchr( wbuf, '.' )) != NULL ) {
+                       *tok = 0;                                                                       // we don't keep domain portion
+               }
        }
        }
-       ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SID );
-       if( snprintf( ctx->my_name, RMR_MAX_SID, "%s:%s", wbuf, port ) >= RMR_MAX_SID ) {                       // our registered name is host:port
-               fprintf( stderr, "[CRI] rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SID, wbuf, port );
+
+       ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC );
+       if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) {                       // our registered name is host:port
+               rmr_vlog( RMR_VL_CRIT, "rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port );
                return NULL;
        }
 
                return NULL;
        }
 
-       ctx->ip_list = mk_ip_list( port );              // suss out all IP addresses we can find on the box, and bang on our port for RT comparisons
+       if( (tok = getenv( ENV_NAME_ONLY )) != NULL ) {
+               if( atoi( tok ) > 0 ) {
+                       flags |= RMRFL_NAME_ONLY;                                       // don't allow IP addreess to go out in messages
+               }
+       }
+
+       ctx->ip_list = mk_ip_list( port );                              // suss out all IP addresses we can find on the box, and bang on our port for RT comparisons
+       if( flags & RMRFL_NAME_ONLY ) {
+               ctx->my_ip = strdup( ctx->my_name );                    // user application or env var has specified that IP address is NOT sent out, use name
+       } else {
+               ctx->my_ip = get_default_ip( ctx->ip_list );    // and (guess) at what should be the default to put into messages as src
+               if( ctx->my_ip == NULL ) {
+                       rmr_vlog( RMR_VL_WARN, "rmr_init: default ip address could not be sussed out, using name\n" );
+                       ctx->my_ip = strdup( ctx->my_name );            // if we cannot suss it out, use the name rather than a nil pointer
+               }
+       }
+       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "default ip address: %s\n", ctx->my_ip );
 
 
+       if( (tok = getenv( ENV_WARNINGS )) != NULL ) {
+               if( *tok == '1' ) {
+                       ctx->flags |= CTXFL_WARN;                                       // turn on some warnings (not all, just ones that shouldn't impact performance)
+               }
+       }
 
 
        if( (interface = getenv( ENV_BIND_IF )) == NULL ) {
 
 
        if( (interface = getenv( ENV_BIND_IF )) == NULL ) {
@@ -701,16 +746,33 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        //       rather than using this generic listen() call.
        snprintf( bind_info, sizeof( bind_info ), "%s://%s:%s", proto, interface, port );
        if( (state = nng_listen( ctx->nn_sock, bind_info, NULL, NO_FLAGS )) != 0 ) {
        //       rather than using this generic listen() call.
        snprintf( bind_info, sizeof( bind_info ), "%s://%s:%s", proto, interface, port );
        if( (state = nng_listen( ctx->nn_sock, bind_info, NULL, NO_FLAGS )) != 0 ) {
-               fprintf( stderr, "[CRIT] rmr_init: unable to start nng listener for %s: %s\n", bind_info, nng_strerror( state ) );
+               rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to start nng listener for %s: %s\n", bind_info, nng_strerror( state ) );
                nng_close( ctx->nn_sock );
                nng_close( ctx->nn_sock );
+               free( proto_port );
                free_ctx( ctx );
                return NULL;
        }
 
                free_ctx( ctx );
                return NULL;
        }
 
-       if( !(flags & FL_NOTHREAD) ) {                                                                          // skip if internal function that doesnt need an rtc
-               if( pthread_create( &ctx->rtc_th,  NULL, rtc, (void *) ctx ) ) {        // kick the rt collector thread
-                       fprintf( stderr, "[WARN] rmr_init: unable to start route table collector thread: %s", strerror( errno ) );
+       if( flags & FL_NOTHREAD ) {                                                             // if no rtc thread, we still need an empty route table for wormholes
+               ctx->rtable = rt_clone_space( NULL, NULL, 0 );          // so create one
+       } else {
+               if( (tok = getenv( ENV_RTG_RAW )) != NULL  && *tok == '0' ) {                   // use RMR for Rmgr comm only when specifically off
+                       if( pthread_create( &ctx->rtc_th,  NULL, rtc, (void *) ctx ) ) {        // kick the rmr based rt collector thread
+                               rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start route table collector thread: %s", strerror( errno ) );
+                       }
+               } else {
+                       if( pthread_create( &ctx->rtc_th,  NULL, raw_rtc, (void *) ctx ) ) {    // kick the raw msg rt collector thread
+                               rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start route table collector thread: %s", strerror( errno ) );
+                       }
+               }
+       }
+
+       if( (flags & RMRFL_MTCALL) && ! (ctx->flags & CFL_MTC_ENABLED) ) {      // mt call support is on, must start the listener thread if not running
+               ctx->flags |= CFL_MTC_ENABLED;
+               if( pthread_create( &ctx->mtc_th,  NULL, mt_receive, (void *) ctx ) ) {         // kick the receiver
+                       rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
                }
                }
+               
        }
 
        free( proto_port );
        }
 
        free( proto_port );
@@ -790,7 +852,7 @@ extern int rmr_get_rcvfd( void* vctx ) {
        }
 
        if( (state = nng_getopt_int( ctx->nn_sock, NNG_OPT_RECVFD, &fd )) != 0 ) {
        }
 
        if( (state = nng_getopt_int( ctx->nn_sock, NNG_OPT_RECVFD, &fd )) != 0 ) {
-               fprintf( stderr, "[WRN] rmr cannot get recv fd: %s\n", nng_strerror( state ) );
+               rmr_vlog( RMR_VL_WARN, "rmr cannot get recv fd: %s\n", nng_strerror( state ) );
                return -1;
        }
 
                return -1;
        }
 
@@ -818,4 +880,283 @@ extern void rmr_close( void* vctx ) {
 }
 
 
 }
 
 
+// ----- multi-threaded call/receive support -------------------------------------------------
 
 
+/*
+       Blocks on the receive ring chute semaphore and then reads from the ring
+       when it is tickled.  If max_wait is -1 then the function blocks until
+       a message is ready on the ring. Else max_wait is assumed to be the number
+       of millaseconds to wait before returning a timeout message.
+*/
+extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
+       uta_ctx_t*      ctx;
+       uta_mhdr_t*     hdr;                    // header in the transport buffer
+       chute_t*        chute;
+       struct timespec ts;                     // time info if we have a timeout
+       long    new_ms;                         // adjusted mu-sec
+       long    seconds = 0;            // max wait seconds
+       long    nano_sec;                       // max wait xlated to nano seconds
+       int             state;
+       rmr_mbuf_t*     ombuf;                  // mbuf user passed; if we timeout we return state here
+       
+       if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+               errno = EINVAL;
+               if( mbuf ) {
+                       mbuf->state = RMR_ERR_BADARG;
+                       mbuf->tp_state = errno;
+               }
+               return mbuf;
+       }
+
+       if( ! (ctx->flags & CFL_MTC_ENABLED) ) {
+               errno = EINVAL;
+               if( mbuf != NULL ) {
+                       mbuf->state = RMR_ERR_NOTSUPP;
+                       mbuf->tp_state = errno;
+               }
+               return mbuf;
+       }
+
+       ombuf = mbuf;
+       if( ombuf ) {
+               ombuf->state = RMR_ERR_TIMEOUT;                 // preset if for failure
+               ombuf->len = 0;
+       }
+
+       chute = &ctx->chutes[0];                                        // chute 0 used only for its semaphore
+
+       if( max_wait >= 0 ) {
+               clock_gettime( CLOCK_REALTIME, &ts );   
+
+               if( max_wait > 999 ) {
+                       seconds = max_wait / 1000;
+                       max_wait -= seconds * 1000;
+                       ts.tv_sec += seconds;
+               }
+               if( max_wait > 0 ) {
+                       nano_sec = max_wait * 1000000;
+                       ts.tv_nsec += nano_sec;
+                       if( ts.tv_nsec > 999999999 ) {
+                               ts.tv_nsec -= 999999999;
+                               ts.tv_sec++;
+                       }
+               }
+
+               seconds = 1;                                                                                                    // use as flag later to invoked timed wait
+       }
+
+       errno = EINTR;
+       state = -1;
+       while( state < 0 && errno == EINTR ) {
+               if( seconds ) {
+                       state = sem_timedwait( &chute->barrier, &ts );                          // wait for msg or timeout
+               } else {
+                       state = sem_wait( &chute->barrier );
+               }
+       }
+
+       if( state < 0 ) {
+               mbuf = ombuf;                           // return caller's buffer if they passed one in
+       } else {
+               errno = 0;                                              // interrupted call state could be left; clear
+               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mt_rcv extracting from normal ring\n" );
+               if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
+                       mbuf->state = RMR_OK;
+
+                       if( ombuf ) {
+                               rmr_free_msg( ombuf );                                  // we cannot reuse as mbufs are queued on the ring
+                       }
+               } else {
+                       errno = ETIMEDOUT;
+                       mbuf = ombuf;                           // no buffer, return user's if there
+               }
+       }
+
+       if( mbuf ) {
+               mbuf->tp_state = errno;
+       }
+       return mbuf;
+}
+
+
+/*
+       This does the real work behind both of the outward facing call functions. See 
+       the rmr_mt_call() description for details modulo the comments blow.
+
+       If ep is given, then we skip the normal route table endpoint selection. This is
+       likely a wormhole call.
+*/
+static rmr_mbuf_t* mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait, endpoint_t* ep ) {
+       rmr_mbuf_t* ombuf;                      // original mbuf passed in
+       uta_ctx_t*      ctx;
+       uta_mhdr_t*     hdr;                    // header in the transport buffer
+       chute_t*        chute;
+       unsigned char*  d1;                     // d1 data in header
+       struct timespec ts;                     // time info if we have a timeout
+       long    new_ms;                         // adjusted mu-sec
+       long    seconds = 0;            // max wait seconds
+       long    nano_sec;                       // max wait xlated to nano seconds
+       int             state;
+       
+       errno = EINVAL;
+       if( (ctx = (uta_ctx_t *) vctx) == NULL || mbuf == NULL ) {
+               if( mbuf ) {
+                       mbuf->tp_state = errno;
+                       mbuf->state = RMR_ERR_BADARG;
+               }
+               return mbuf;
+       }
+
+       if( ! (ctx->flags & CFL_MTC_ENABLED) ) {
+               mbuf->state = RMR_ERR_NOTSUPP;
+               mbuf->tp_state = errno;
+               return mbuf;
+       }
+
+       if( call_id > MAX_CALL_ID || call_id < 2 ) {                                    // 0 and 1 are reserved; user app cannot supply them
+               mbuf->state = RMR_ERR_BADARG;
+               mbuf->tp_state = errno;
+               return mbuf;
+       }
+
+       ombuf = mbuf;                                                                                                   // save to return timeout status with
+
+       chute = &ctx->chutes[call_id];
+       if( chute->mbuf != NULL ) {                                                                             // probably a delayed message that wasn't dropped
+               rmr_free_msg( chute->mbuf );
+               chute->mbuf = NULL;
+       }
+       
+       hdr = (uta_mhdr_t *) mbuf->header;
+       hdr->flags |= HFL_CALL_MSG;                                                                             // must signal this sent with a call
+       memcpy( chute->expect, mbuf->xaction, RMR_MAX_XID );                    // xaction that we will wait for
+       d1 = DATA1_ADDR( hdr );
+       d1[D1_CALLID_IDX] = (unsigned char) call_id;                                    // set the caller ID for the response
+       mbuf->flags |= MFL_NOALLOC;                                                                             // send message without allocating a new one (expect nil from mtosend
+
+       if( max_wait >= 0 ) {
+               clock_gettime( CLOCK_REALTIME, &ts );   
+
+               if( max_wait > 999 ) {
+                       seconds = max_wait / 1000;
+                       max_wait -= seconds * 1000;
+                       ts.tv_sec += seconds;
+               }
+               if( max_wait > 0 ) {
+                       nano_sec = max_wait * 1000000;
+                       ts.tv_nsec += nano_sec;
+                       if( ts.tv_nsec > 999999999 ) {
+                               ts.tv_nsec -= 999999999;
+                               ts.tv_sec++;
+                       }
+               }
+
+               seconds = 1;                                                                            // use as flag later to invoked timed wait
+       }
+
+       if( ep != NULL ) {
+               mbuf = mtosend_msg( ctx, mbuf, 0 );                                     // use internal function so as not to strip call-id; should be nil on success!
+       } else {
+               mbuf = send_msg( ctx, mbuf, ep->nn_sock, -1 );
+       }
+       if( mbuf ) {
+               if( mbuf->state != RMR_OK ) {
+                       mbuf->tp_state = errno;
+                       return mbuf;                                                                    // timeout or unable to connect or no endpoint are most likely issues
+               }
+       }
+
+       state = -1;                                                                                             
+       errno = 0;
+       while( chute->mbuf == NULL && ! errno ) {
+               if( seconds ) {
+                       state = sem_timedwait( &chute->barrier, &ts );                          // wait for msg or timeout
+               } else {
+                       state = sem_wait( &chute->barrier );
+               }
+
+               if( state < 0 && errno == EINTR ) {                                                             // interrupted go back and wait; all other errors cause exit
+                       errno = 0;
+               }
+
+               if( chute->mbuf != NULL ) {                                                                             // offload receiver thread and check xaction buffer here
+                       if( memcmp( chute->expect, chute->mbuf->xaction, RMR_MAX_XID ) != 0 ) {
+                               rmr_free_msg( chute->mbuf );
+                               chute->mbuf = NULL;
+                               errno = 0;
+                       }
+               }
+       }
+
+       if( state < 0 ) {
+               return NULL;                                    // leave errno as set by sem wait call
+       }
+
+       if( (mbuf = chute->mbuf) != NULL ) {
+               mbuf->state = RMR_OK;
+       }
+       chute->mbuf = NULL;
+
+       return mbuf;
+}
+
+/*
+       Accept a message buffer and caller ID, send the message and then wait
+       for the receiver to tickle the semaphore letting us know that a message
+       has been received. The call_id is a value between 2 and 255, inclusive; if
+       it's not in this range an error will be returned. Max wait is the amount
+       of time in millaseconds that the call should block for. If 0 is given
+       then no timeout is set.
+
+       If the mt_call feature has not been initialised, then the attempt to use this
+       funciton will fail with RMR_ERR_NOTSUPP
+
+       If no matching message is received before the max_wait period expires, a
+       nil pointer is returned, and errno is set to ETIMEOUT. If any other error
+       occurs after the message has been sent, then a nil pointer is returned
+       with errno set to some other value.
+
+       This is now just a wrapper to the real work horse so that we can provide
+       this and wormhole call functions without duplicating code.
+
+*/
+extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
+       return mt_call( vctx, mbuf, call_id, max_wait, NULL );
+}
+
+/*
+       Given an existing message buffer, reallocate the payload portion to
+       be at least new_len bytes.  The message header will remain such that
+       the caller may use the rmr_rts_msg() function to return a payload
+       to the sender. 
+
+       The mbuf passed in may or may not be reallocated and the caller must
+       use the returned pointer and should NOT assume that it can use the 
+       pointer passed in with the exceptions based on the clone flag.
+
+       If the clone flag is set, then a duplicated message, with larger payload
+       size, is allocated and returned.  The old_msg pointer in this situation is
+       still valid and must be explicitly freed by the application. If the clone 
+       message is not set (0), then any memory management of the old message is
+       handled by the function.
+
+       If the copy flag is set, the contents of the old message's payload is 
+       copied to the reallocated payload.  If the flag is not set, then the 
+       contents of the payload is undetermined.
+*/
+extern rmr_mbuf_t* rmr_realloc_payload( rmr_mbuf_t* old_msg, int new_len, int copy, int clone ) {
+       if( old_msg == NULL ) {
+               return NULL;
+       }
+
+       return realloc_payload( old_msg, new_len, copy, clone );        // message allocation is transport specific, so this is a passthrough
+}
+
+/*
+       The following functions are "dummies" as NNG has no concept of supporting
+       them, but are needed to resolve calls at link time.
+*/
+
+extern void rmr_set_fack( void* p ) {
+       return;
+}