Add the wormhole call function
[ric-plt/lib/rmr.git] / src / rmr / nng / src / rmr_nng.c
index 1b6e1f0..8d55b44 100644 (file)
@@ -1,8 +1,8 @@
-// : vi ts=4 sw=4 noet :
+// vim: ts=4 sw=4 noet :
 /*
 ==================================================================================
-       Copyright (c) 2019 Nokia
-       Copyright (c) 2018-2019 AT&T Intellectual Property.
+       Copyright (c) 2019-2020 Nokia
+       Copyright (c) 2018-2020 AT&T Intellectual Property.
 
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
@@ -64,6 +64,7 @@
 #include "rmr_agnostic.h"              // agnostic things (must be included before private)
 #include "rmr_nng_private.h"   // things that we need too
 #include "rmr_symtab.h"
+#include "rmr_logging.h"
 
 #include "ring_static.c"                       // message ring support
 #include "rt_generic_static.c"         // route table things not transport specific
@@ -248,7 +249,9 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
        uta_ctx_t*      ctx;
        int                     state;
        char*           hold_src;                       // we need the original source if send fails
+       char*           hold_ip;                        // also must hold original ip
        int                     sock_ok = 0;            // true if we found a valid endpoint socket
+       endpoint_t*     ep;                                     // end point to track counts
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) {               // bad stuff, bail fast
                errno = EINVAL;                                                                                         // if msg is null, this is their clue
@@ -261,18 +264,19 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
 
        errno = 0;                                                                                                              // at this point any bad state is in msg returned
        if( msg->header == NULL ) {
-               fprintf( stderr, "[ERR] rmr_send_msg: message had no header\n" );
+               rmr_vlog( RMR_VL_ERR, "rmr_send_msg: message had no header\n" );
                msg->state = RMR_ERR_NOHDR;
                msg->tp_state = errno;
                return msg;
        }
 
        ((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG;                   // must ensure call flag is off
-       if( HDR_VERSION( msg->header ) > 2 ) {                                                  // new version uses sender's ip address for rts
-               sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock );                      // default to IP based rts
-       }
+
+       sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock, &ep );                   // src is always used first for rts
        if( ! sock_ok ) {
-               sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock );                // IP  not in rt, try name
+               if( HDR_VERSION( msg->header ) > 2 ) {                                                  // with ver2 the ip is there, try if src name not known
+                       sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep );
+               }
                if( ! sock_ok ) {
                        msg->state = RMR_ERR_NOENDPT;
                        return msg;                                                                                                                             // preallocated msg can be reused since not given back to nn
@@ -281,14 +285,32 @@ extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
 
        msg->state = RMR_OK;                                                                                                                            // ensure it is clear before send
        hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src );                                         // the dest where we're returning the message to
+       hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip );                                        // both the src host and src ip
        strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );        // must overlay the source to be ours
        msg = send_msg( ctx, msg, nn_sock, -1 );
        if( msg ) {
+               if( ep != NULL ) {
+                       switch( msg->state ) {
+                               case RMR_OK:
+                                       ep->scounts[EPSC_GOOD]++;
+                                       break;
+                       
+                               case RMR_ERR_RETRY:
+                                       ep->scounts[EPSC_TRANS]++;
+                                       break;
+
+                               default:
+                                       ep->scounts[EPSC_FAIL]++;
+                                       break;
+                       }
+               }
                strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC );    // always return original source so rts can be called again
+               strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC );   // always return original source so rts can be called again
                msg->flags |= MFL_ADDSRC;                                                                                                               // if msg given to send() it must add source
        }
 
        free( hold_src );
+       free( hold_ip );
        return msg;
 }
 
@@ -338,7 +360,7 @@ extern rmr_mbuf_t* rmr_call( void* vctx, rmr_mbuf_t* msg ) {
 
        memcpy( expected_id, msg->xaction, RMR_MAX_XID );
        expected_id[RMR_MAX_XID] = 0;                                   // ensure it's a string
-       if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rmr_call is making call, waiting for (%s)\n", expected_id );
+       if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "rmr_call is making call, waiting for (%s)\n", expected_id );
        errno = 0;
        msg->flags |= MFL_NOALLOC;                                              // we don't need a new buffer from send
 
@@ -433,7 +455,12 @@ extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
                if( (eps->ep_fd = epoll_create1( 0 )) < 0 ) {
                fprintf( stderr, "[FAIL] unable to create epoll fd: %d\n", errno );
                        free( eps );
-                       return NULL;
+                       ctx->eps = NULL;
+                       if( old_msg != NULL ) {
+                               old_msg->state = RMR_ERR_INITFAILED;
+                               old_msg->tp_state = errno;
+                       }
+                       return old_msg;
                }
 
                eps->nng_fd = rmr_get_rcvfd( ctx );
@@ -443,7 +470,12 @@ extern rmr_mbuf_t* rmr_torcv_msg( void* vctx, rmr_mbuf_t* old_msg, int ms_to ) {
                if( epoll_ctl( eps->ep_fd, EPOLL_CTL_ADD, eps->nng_fd, &eps->epe ) != 0 )  {
                fprintf( stderr, "[FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
                        free( eps );
-                       return NULL;
+                       ctx->eps = NULL;
+                       if( old_msg != NULL ) {
+                               old_msg->state = RMR_ERR_INITFAILED;
+                               old_msg->tp_state = errno;
+                       }
+                       return old_msg;
                }
 
                ctx->eps = eps;
@@ -504,29 +536,29 @@ extern rmr_mbuf_t* rmr_rcv_specific( void* vctx, rmr_mbuf_t* msg, char* expect,
        if( exp_len > RMR_MAX_XID ) {
                exp_len = RMR_MAX_XID;
        }
-       if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific waiting for id=%s\n",  expect );
+       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rcv_specific waiting for id=%s\n",  expect );
 
        while( queued < allow2queue ) {
                msg = rcv_msg( ctx, msg );                                      // hard wait for next
                if( msg->state == RMR_OK ) {
                        if( memcmp( msg->xaction, expect, exp_len ) == 0 ) {                    // got it -- return it
-                               if( DEBUG ) fprintf( stderr, "[DBUG] rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued );
+                               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued );
                                return msg;
                        }
 
                        if( ! uta_ring_insert( ctx->mring, msg ) ) {                                    // just queue, error if ring is full
-                               if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_specific ring is full\n" );
+                               if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "rcv_specific ring is full\n" );
                                errno = ENOBUFS;
                                return NULL;
                        }
 
-                       if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific queued message type=%d\n", msg->mtype );
+                       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rcv_specific queued message type=%d\n", msg->mtype );
                        queued++;
                        msg = NULL;
                }
        }
 
-       if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific timeout waiting for %s\n", expect );
+       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rcv_specific timeout waiting for %s\n", expect );
        errno = ETIMEDOUT;
        return NULL;
 }
@@ -568,7 +600,7 @@ extern int rmr_set_stimeout( void* vctx, int time ) {
        CAUTION:  this is not supported as they must be set differently (between create and open) in NNG.
 */
 extern int rmr_set_rtimeout( void* vctx, int time ) {
-       fprintf( stderr, "[WRN] Current implementation of RMR ontop of NNG does not support setting a receive timeout\n" );
+       rmr_vlog( RMR_VL_WARN, "Current implementation of RMR ontop of NNG does not support setting a receive timeout\n" );
        return 0;
 }
 
@@ -590,13 +622,19 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        char*   proto_port;
        char    wbuf[1024];                                     // work buffer
        char*   tok;                                            // pointer at token in a buffer
+       char*   tok2;
        int             state;
+       int             old_vlevel = 0;
+
+       old_vlevel = rmr_vlog_init();           // initialise and get the current level
+       rmr_set_vlevel( RMR_VL_INFO );          // we WILL announce our version etc
 
        if( ! announced ) {
-               fprintf( stderr, "[INFO] ric message routing library on NNG mv=%d (%s %s.%s.%s built: %s)\n",
-                       RMR_MSG_VER, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
+               rmr_vlog( RMR_VL_INFO, "ric message routing library on NNG/d mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
+                       RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
                announced = 1;
        }
+       rmr_set_vlevel( old_vlevel );           // return logging to the desired state
 
        errno = 0;
        if( uproto_port == NULL ) {
@@ -630,7 +668,7 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        //uta_lookup_rtg( ctx );                                                        // attempt to fill in rtg info; rtc will handle missing values/errors
 
        if( nng_pull0_open( &ctx->nn_sock )  !=  0 ) {          // and assign the mode
-               fprintf( stderr, "[CRI] rmr_init: unable to initialise nng listen (pull) socket: %d\n", errno );
+               rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to initialise nng listen (pull) socket: %d\n", errno );
                free_ctx( ctx );
                return NULL;
        }
@@ -646,16 +684,33 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
                port = proto_port;                      // assume something like "1234" was passed
        }
 
-       if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
-               fprintf( stderr, "[CRI] rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
-               return NULL;
-       }
-       if( (tok = strchr( wbuf, '.' )) != NULL ) {
-               *tok = 0;                                                                       // we don't keep domain portion
+       if( (tok = getenv( ENV_SRC_ID )) != NULL ) {                                                    // env var overrides what we dig from system
+               tok = strdup( tok );                                    // something we can destroy
+               if( *tok == '[' ) {                                             // we allow an ipv6 address here
+                       tok2 = strchr( tok, ']' ) + 1;          // we will chop the port (...]:port) if given
+               } else {
+                       tok2 = strchr( tok, ':' );                      // find :port if there so we can chop
+               }
+               if( tok2  && *tok2 ) {                                  // if it's not the end of string marker
+                       *tok2 = 0;                                                      // make it so
+               }
+
+               snprintf( wbuf, RMR_MAX_SRC, "%s", tok );
+               free( tok );
+       } else {
+               if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
+                       rmr_vlog( RMR_VL_CRIT, "rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
+                       free( proto_port );
+                       return NULL;
+               }
+               if( (tok = strchr( wbuf, '.' )) != NULL ) {
+                       *tok = 0;                                                                       // we don't keep domain portion
+               }
        }
+
        ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC );
        if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) {                       // our registered name is host:port
-               fprintf( stderr, "[CRI] rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port );
+               rmr_vlog( RMR_VL_CRIT, "rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port );
                return NULL;
        }
 
@@ -671,11 +726,11 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        } else {
                ctx->my_ip = get_default_ip( ctx->ip_list );    // and (guess) at what should be the default to put into messages as src
                if( ctx->my_ip == NULL ) {
-                       fprintf( stderr, "[WRN] rmr_init: default ip address could not be sussed out, using name\n" );
-                       strcpy( ctx->my_ip, ctx->my_name );                     // if we cannot suss it out, use the name rather than a nil pointer
+                       rmr_vlog( RMR_VL_WARN, "rmr_init: default ip address could not be sussed out, using name\n" );
+                       ctx->my_ip = strdup( ctx->my_name );            // if we cannot suss it out, use the name rather than a nil pointer
                }
        }
-       if( DEBUG ) fprintf( stderr, "[DBUG] default ip address: %s\n", ctx->my_ip );
+       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "default ip address: %s\n", ctx->my_ip );
 
        if( (tok = getenv( ENV_WARNINGS )) != NULL ) {
                if( *tok == '1' ) {
@@ -691,22 +746,31 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        //       rather than using this generic listen() call.
        snprintf( bind_info, sizeof( bind_info ), "%s://%s:%s", proto, interface, port );
        if( (state = nng_listen( ctx->nn_sock, bind_info, NULL, NO_FLAGS )) != 0 ) {
-               fprintf( stderr, "[CRI] rmr_init: unable to start nng listener for %s: %s\n", bind_info, nng_strerror( state ) );
+               rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to start nng listener for %s: %s\n", bind_info, nng_strerror( state ) );
                nng_close( ctx->nn_sock );
+               free( proto_port );
                free_ctx( ctx );
                return NULL;
        }
 
-       if( !(flags & FL_NOTHREAD) ) {                                                                          // skip if internal function that doesnt need an rtc
-               if( pthread_create( &ctx->rtc_th,  NULL, rtc, (void *) ctx ) ) {        // kick the rt collector thread
-                       fprintf( stderr, "[WRN] rmr_init: unable to start route table collector thread: %s", strerror( errno ) );
+       if( flags & FL_NOTHREAD ) {                                                             // if no rtc thread, we still need an empty route table for wormholes
+               ctx->rtable = rt_clone_space( NULL, NULL, 0 );          // so create one
+       } else {
+               if( (tok = getenv( ENV_RTG_RAW )) != NULL  && *tok == '0' ) {                   // use RMR for Rmgr comm only when specifically off
+                       if( pthread_create( &ctx->rtc_th,  NULL, rtc, (void *) ctx ) ) {        // kick the rmr based rt collector thread
+                               rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start route table collector thread: %s", strerror( errno ) );
+                       }
+               } else {
+                       if( pthread_create( &ctx->rtc_th,  NULL, raw_rtc, (void *) ctx ) ) {    // kick the raw msg rt collector thread
+                               rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start route table collector thread: %s", strerror( errno ) );
+                       }
                }
        }
 
        if( (flags & RMRFL_MTCALL) && ! (ctx->flags & CFL_MTC_ENABLED) ) {      // mt call support is on, must start the listener thread if not running
                ctx->flags |= CFL_MTC_ENABLED;
                if( pthread_create( &ctx->mtc_th,  NULL, mt_receive, (void *) ctx ) ) {         // kick the receiver
-                       fprintf( stderr, "[WRN] rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
+                       rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
                }
                
        }
@@ -788,7 +852,7 @@ extern int rmr_get_rcvfd( void* vctx ) {
        }
 
        if( (state = nng_getopt_int( ctx->nn_sock, NNG_OPT_RECVFD, &fd )) != 0 ) {
-               fprintf( stderr, "[WRN] rmr cannot get recv fd: %s\n", nng_strerror( state ) );
+               rmr_vlog( RMR_VL_WARN, "rmr cannot get recv fd: %s\n", nng_strerror( state ) );
                return -1;
        }
 
@@ -860,12 +924,12 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
        }
 
        chute = &ctx->chutes[0];                                        // chute 0 used only for its semaphore
-       
-       if( max_wait > 0 ) {
+
+       if( max_wait >= 0 ) {
                clock_gettime( CLOCK_REALTIME, &ts );   
 
                if( max_wait > 999 ) {
-                       seconds = (max_wait - 999)/1000;
+                       seconds = max_wait / 1000;
                        max_wait -= seconds * 1000;
                        ts.tv_sec += seconds;
                }
@@ -881,34 +945,30 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
                seconds = 1;                                                                                                    // use as flag later to invoked timed wait
        }
 
-       errno = 0;
-       state = 0;
-       while( chute->mbuf == NULL && ! errno ) {
+       errno = EINTR;
+       state = -1;
+       while( state < 0 && errno == EINTR ) {
                if( seconds ) {
                        state = sem_timedwait( &chute->barrier, &ts );                          // wait for msg or timeout
                } else {
                        state = sem_wait( &chute->barrier );
                }
-
-               if( state < 0 && errno == EINTR ) {                                                             // interrupted go back and wait; all other errors cause exit
-                       errno = 0;
-               }
        }
 
        if( state < 0 ) {
                mbuf = ombuf;                           // return caller's buffer if they passed one in
        } else {
-               if( DEBUG ) fprintf( stderr, "[DBUG] mt_rcv extracting from normal ring\n" );
+               errno = 0;                                              // interrupted call state could be left; clear
+               if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mt_rcv extracting from normal ring\n" );
                if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) {                  // pop if queued
-                       if( mbuf ) {
-                               mbuf->state = RMR_OK;
-
-                               if( ombuf ) {
-                                       rmr_free_msg( ombuf );                                  // we cannot reuse as mbufs are queued on the ring
-                               }
-                       } else {
-                               mbuf = ombuf;                           // no buffer, return user's if there
+                       mbuf->state = RMR_OK;
+
+                       if( ombuf ) {
+                               rmr_free_msg( ombuf );                                  // we cannot reuse as mbufs are queued on the ring
                        }
+               } else {
+                       errno = ETIMEDOUT;
+                       mbuf = ombuf;                           // no buffer, return user's if there
                }
        }
 
@@ -918,23 +978,15 @@ extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
        return mbuf;
 }
 
-/*
-       Accept a message buffer and caller ID, send the message and then wait
-       for the receiver to tickle the semaphore letting us know that a message
-       has been received. The call_id is a value between 2 and 255, inclusive; if
-       it's not in this range an error will be returned. Max wait is the amount
-       of time in millaseconds that the call should block for. If 0 is given
-       then no timeout is set.
 
-       If the mt_call feature has not been initialised, then the attempt to use this
-       funciton will fail with RMR_ERR_NOTSUPP
+/*
+       This does the real work behind both of the outward facing call functions. See 
+       the rmr_mt_call() description for details modulo the comments blow.
 
-       If no matching message is received before the max_wait period expires, a
-       nil pointer is returned, and errno is set to ETIMEOUT. If any other error
-       occurs after the message has been sent, then a nil pointer is returned
-       with errno set to some other value.
+       If ep is given, then we skip the normal route table endpoint selection. This is
+       likely a wormhole call.
 */
-extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
+static rmr_mbuf_t* mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait, endpoint_t* ep ) {
        rmr_mbuf_t* ombuf;                      // original mbuf passed in
        uta_ctx_t*      ctx;
        uta_mhdr_t*     hdr;                    // header in the transport buffer
@@ -982,11 +1034,11 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m
        d1[D1_CALLID_IDX] = (unsigned char) call_id;                                    // set the caller ID for the response
        mbuf->flags |= MFL_NOALLOC;                                                                             // send message without allocating a new one (expect nil from mtosend
 
-       if( max_wait > 0 ) {
+       if( max_wait >= 0 ) {
                clock_gettime( CLOCK_REALTIME, &ts );   
 
                if( max_wait > 999 ) {
-                       seconds = (max_wait - 999)/1000;
+                       seconds = max_wait / 1000;
                        max_wait -= seconds * 1000;
                        ts.tv_sec += seconds;
                }
@@ -1002,7 +1054,11 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m
                seconds = 1;                                                                            // use as flag later to invoked timed wait
        }
 
-       mbuf = mtosend_msg( ctx, mbuf, 0 );                                             // use internal function so as not to strip call-id; should be nil on success!
+       if( ep != NULL ) {
+               mbuf = mtosend_msg( ctx, mbuf, 0 );                                     // use internal function so as not to strip call-id; should be nil on success!
+       } else {
+               mbuf = send_msg( ctx, mbuf, ep->nn_sock, -1 );
+       }
        if( mbuf ) {
                if( mbuf->state != RMR_OK ) {
                        mbuf->tp_state = errno;
@@ -1010,7 +1066,7 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m
                }
        }
 
-       state = 0;
+       state = -1;                                                                                             
        errno = 0;
        while( chute->mbuf == NULL && ! errno ) {
                if( seconds ) {
@@ -1036,9 +1092,71 @@ extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int m
                return NULL;                                    // leave errno as set by sem wait call
        }
 
-       mbuf = chute->mbuf;
-       mbuf->state = RMR_OK;
+       if( (mbuf = chute->mbuf) != NULL ) {
+               mbuf->state = RMR_OK;
+       }
        chute->mbuf = NULL;
 
        return mbuf;
 }
+
+/*
+       Accept a message buffer and caller ID, send the message and then wait
+       for the receiver to tickle the semaphore letting us know that a message
+       has been received. The call_id is a value between 2 and 255, inclusive; if
+       it's not in this range an error will be returned. Max wait is the amount
+       of time in millaseconds that the call should block for. If 0 is given
+       then no timeout is set.
+
+       If the mt_call feature has not been initialised, then the attempt to use this
+       funciton will fail with RMR_ERR_NOTSUPP
+
+       If no matching message is received before the max_wait period expires, a
+       nil pointer is returned, and errno is set to ETIMEOUT. If any other error
+       occurs after the message has been sent, then a nil pointer is returned
+       with errno set to some other value.
+
+       This is now just a wrapper to the real work horse so that we can provide
+       this and wormhole call functions without duplicating code.
+
+*/
+extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
+       return mt_call( vctx, mbuf, call_id, max_wait, NULL );
+}
+
+/*
+       Given an existing message buffer, reallocate the payload portion to
+       be at least new_len bytes.  The message header will remain such that
+       the caller may use the rmr_rts_msg() function to return a payload
+       to the sender. 
+
+       The mbuf passed in may or may not be reallocated and the caller must
+       use the returned pointer and should NOT assume that it can use the 
+       pointer passed in with the exceptions based on the clone flag.
+
+       If the clone flag is set, then a duplicated message, with larger payload
+       size, is allocated and returned.  The old_msg pointer in this situation is
+       still valid and must be explicitly freed by the application. If the clone 
+       message is not set (0), then any memory management of the old message is
+       handled by the function.
+
+       If the copy flag is set, the contents of the old message's payload is 
+       copied to the reallocated payload.  If the flag is not set, then the 
+       contents of the payload is undetermined.
+*/
+extern rmr_mbuf_t* rmr_realloc_payload( rmr_mbuf_t* old_msg, int new_len, int copy, int clone ) {
+       if( old_msg == NULL ) {
+               return NULL;
+       }
+
+       return realloc_payload( old_msg, new_len, copy, clone );        // message allocation is transport specific, so this is a passthrough
+}
+
+/*
+       The following functions are "dummies" as NNG has no concept of supporting
+       them, but are needed to resolve calls at link time.
+*/
+
+extern void rmr_set_fack( void* p ) {
+       return;
+}