// vim: ts=4 sw=4 noet :
/*
==================================================================================
- Copyright (c) 2019-2020 Nokia
- Copyright (c) 2018-2020 AT&T Intellectual Property.
+ Copyright (c) 2019-2021 Nokia
+ Copyright (c) 2018-2021 AT&T Intellectual Property.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
#include "si95/socket_if.h"
#include "si95/siproto.h"
+
#define SI95_BUILD 1 // we drop some common functions for si
#include "rmr.h" // things the users see
#include "rmr_agnostic.h" // agnostic things (must be included before private)
#include "rmr_si_private.h" // things that we need too
+
#include "rmr_symtab.h"
#include "rmr_logging.h"
#include "ring_static.c" // message ring support
#include "rt_generic_static.c" // route table things not transport specific
#include "rtable_si_static.c" // route table things -- transport specific
+#include "alarm.c"
#include "rtc_static.c" // route table collector (thread code)
#include "tools_static.c"
#include "sr_si_static.c" // send/receive static functions
#include "wormholes.c" // wormhole api externals and related static functions (must be LAST!)
#include "mt_call_static.c"
#include "mt_call_si_static.c"
+#include "rmr_debug_si.c" // debuging functions
//------------------------------------------------------------------------------
+/*
+ If we have an EP, up the counters based on state.
+ This isn't needed, but it makes driving the code under unit test easier so we
+ induldge in the bean counter's desire for coverage numbers.
+*/
+static inline void incr_ep_counts( int state, endpoint_t* ep ) {
+ if( ep != NULL ) {
+ switch( state ) {
+ case RMR_OK:
+ ep->scounts[EPSC_GOOD]++;
+ break;
+
+ case RMR_ERR_RETRY:
+ ep->scounts[EPSC_TRANS]++;
+ break;
+
+ default:
+ ep->scounts[EPSC_FAIL]++;
+ break;
+ }
+ }
+}
/*
Clean up a context.
*/
static void free_ctx( uta_ctx_t* ctx ) {
if( ctx ) {
- if( ctx->rtg_addr ) {
+ if( ctx->rtg_addr ){
free( ctx->rtg_addr );
}
+ uta_ring_free( ctx->mring );
+ uta_ring_free( ctx->zcb_mring );
+ if( ctx->chutes ){
+ free( ctx->chutes );
+ }
+ if( ctx->fd2ep ){
+ rmr_sym_free( ctx->fd2ep );
+ }
+ if( ctx->my_name ){
+ free( ctx->my_name );
+ }
+ if( ctx->my_ip ){
+ free( ctx->my_ip );
+ }
+ if( ctx->rtable ){
+ rmr_sym_free( ctx->rtable->hash );
+ free( ctx->rtable );
+ }
+ if ( ctx->ephash ){
+ free( ctx->ephash );
+ }
+ free( ctx );
}
}
This provides an external path to the realloc static function as it's called by an
outward facing mbuf api function. Used to reallocate a message with a different
trace data size.
+
+ User programmes must use this with CAUTION! The mbuf passed in is NOT freed and
+ is still valid following this call. The caller is reponsible for maintainting
+ a pointer to both old and new messages and invoking rmr_free_msg() on both!
*/
extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* msg, int new_tr_size ) {
return realloc_msg( msg, new_tr_size );
Return the message to the available pool, or free it outright.
*/
extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
- //fprintf( stderr, "SKIPPING FREE: %p\n", mbuf );
- //return;
-
if( mbuf == NULL ) {
+ fprintf( stderr, ">>>FREE nil buffer\n" );
return;
}
- if( !mbuf->ring || ! uta_ring_insert( mbuf->ring, mbuf ) ) { // just queue, free if ring is full
+#ifdef KEEP
+ if( mbuf->flags & MFL_HUGE || // don't cache oversized messages
+ ! mbuf->ring || // cant cache if no ring
+ ! uta_ring_insert( mbuf->ring, mbuf ) ) { // or ring is full
+
if( mbuf->tp_buf ) {
free( mbuf->tp_buf );
mbuf->tp_buf = NULL; // just in case user tries to reuse this mbuf; this will be an NPE
mbuf->cookie = 0; // should signal a bad mbuf (if not reallocated)
free( mbuf );
}
+#else
+ // always free, never manage a pool
+ if( mbuf->tp_buf ) {
+ free( mbuf->tp_buf );
+ mbuf->tp_buf = NULL; // just in case user tries to reuse this mbuf; this will be an NPE
+ }
+
+ mbuf->cookie = 0; // should signal a bad mbuf (if not reallocated)
+ free( mbuf );
+#endif
}
/*
extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
int nn_sock; // endpoint socket for send
uta_ctx_t* ctx;
- int state;
char* hold_src; // we need the original source if send fails
char* hold_ip; // also must hold original ip
int sock_ok = 0; // true if we found a valid endpoint socket
}
}
-
msg->state = RMR_OK; // ensure it is clear before send
hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src ); // the dest where we're returning the message to
hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip ); // both the src host and src ip
- strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC ); // must overlay the source to be ours
+ zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC ); // must overlay the source to be ours
msg = send_msg( ctx, msg, nn_sock, -1 );
if( msg ) {
- if( ep != NULL ) {
- switch( msg->state ) {
- case RMR_OK:
- ep->scounts[EPSC_GOOD]++;
- break;
-
- case RMR_ERR_RETRY:
- ep->scounts[EPSC_TRANS]++;
- break;
-
- default:
- // FIX ME uta_fd_failed( nn_sock ); // we don't have an ep so this requires a look up/search to mark it failed
- ep->scounts[EPSC_FAIL]++;
- break;
- }
- }
- strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC ); // always return original source so rts can be called again
- strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC ); // always return original source so rts can be called again
+ incr_ep_counts( msg->state, ep ); // update counts
+
+ zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->src, hold_src, RMR_MAX_SRC ); // always replace original source & ip so rts can be called again
+ zt_buf_fill( (char *) ((uta_mhdr_t *)msg->header)->srcip, hold_ip, RMR_MAX_SRC );
msg->flags |= MFL_ADDSRC; // if msg given to send() it must add source
}
return msg;
}
- return rmr_mt_call( vctx, msg, 1, 1000 ); // use the reserved call-id of 1 and wait up to 1 sec
+ return mt_call( vctx, msg, 1, 1000, NULL ); // use the reserved call-id of 1 and wait up to 1 sec
}
/*
}
/*
+ DEPRECATED -- this function is not needed in the SI world, and when NNG goes away this will
+ too. This function likely will not behave as expected in SI, and we are pretty sure it
+ isn't being used as there was an abort triggering reference to rmr_rcv() until now.
+
This blocks until the message with the 'expect' ID is received. Messages which are received
before the expected message are queued onto the message ring. The function will return
a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific waiting for id=%s\n", expect );
while( queued < allow2queue ) {
- msg = rcv_msg( ctx, msg ); // hard wait for next
- if( msg->state == RMR_OK ) {
- if( memcmp( msg->xaction, expect, exp_len ) == 0 ) { // got it -- return it
- if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued );
- return msg;
- }
-
- if( ! uta_ring_insert( ctx->mring, msg ) ) { // just queue, error if ring is full
- if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific ring is full\n" );
- errno = ENOBUFS;
- return NULL;
+ msg = rmr_rcv_msg( ctx, msg ); // hard wait for next
+ if( msg != NULL ) {
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific checking message; queued=%d allowed=%d state=%d\n", queued, allow2queue, msg->state );
+ if( msg->state == RMR_OK ) {
+ if( memcmp( msg->xaction, expect, exp_len ) == 0 ) { // got it -- return it
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific matched (%s); %d messages were queued\n", msg->xaction, queued );
+ return msg;
+ }
+
+ if( ! uta_ring_insert( ctx->mring, msg ) ) { // just queue, error if ring is full
+ if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific ring is full\n" );
+ errno = ENOBUFS;
+ return NULL;
+ }
+
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific queued message type=%d\n", msg->mtype );
+ queued++;
+ msg = NULL;
}
-
- if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific queued message type=%d\n", msg->mtype );
- queued++;
- msg = NULL;
}
}
return 0;
}
+/*
+ Common cleanup on initialisation error. These are hard to force, and this helps to ensure
+ all code is tested by providing a callable rather than a block of "goto" code.
+
+ There is a return value so that where we need this we get dinked only for one
+ uncovered line rather than two:
+ init_err(...);
+ return NULL;
+
+ That's a hack, and is yet another example of the testing tail wagging the dog.
+*/
+static inline void* init_err( char* msg, void* ctx, void* port, int errval ) {
+ if( errval != 0 ) { // if not letting it be what a sysllib set it to...
+ errno = errval;
+ }
+
+ if( port ) { // free things if allocated
+ free( port );
+ }
+ if( ctx ) {
+ free_ctx( ctx );
+ }
+
+ if( msg ) { // crit message if supplied
+ rmr_vlog( RMR_VL_CRIT, "rmr_init: %s: %s", msg, strerror( errno ) );
+ }
+
+ return NULL;
+}
/*
This is the actual init workhorse. The user visible function meerly ensures that the
that we know about. The _user_ should ensure that the supplied length also
includes the trace data length maximum as they are in control of that.
*/
-static void* init( char* uproto_port, int max_msg_size, int flags ) {
+static void* init( char* uproto_port, int def_msg_size, int flags ) {
static int announced = 0;
uta_ctx_t* ctx = NULL;
char bind_info[256]; // bind info
char* proto = "tcp"; // pointer into the proto/port string user supplied
- char* port;
+ char* port; // pointer into the proto_port buffer at the port value
char* interface = NULL; // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined)
char* proto_port;
char wbuf[1024]; // work buffer
int old_vlevel;
old_vlevel = rmr_vlog_init(); // initialise and get the current level
- rmr_set_vlevel( RMR_VL_INFO ); // we WILL announce our version etc
if( ! announced ) {
- rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95/g mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
- RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
+ rmr_set_vlevel( RMR_VL_INFO ); // we WILL announce our version
+ rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95 p=%s mv=%d flg=%02x id=a (%s %s.%s.%s built: %s)\n",
+ uproto_port, RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
announced = 1;
+
+ rmr_set_vlevel( old_vlevel ); // return logging to the desired state
+ uta_dump_env(); // spit out environment settings meaningful to us if in info mode
}
- rmr_set_vlevel( old_vlevel ); // return logging to the desired state
errno = 0;
if( uproto_port == NULL ) {
proto_port = strdup( DEF_COMM_PORT );
+ rmr_vlog( RMR_VL_WARN, "user passed nil as the listen port, using default: %s\n", proto_port );
} else {
proto_port = strdup( uproto_port ); // so we can modify it
}
+ if ( proto_port == NULL ){
+ return init_err( "unable to alloc proto port string", NULL, NULL, ENOMEM );
+ }
+
if( (ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) )) == NULL ) {
- errno = ENOMEM;
- return NULL;
+ return init_err( "unable to allocate context", ctx, proto_port, ENOMEM );
}
memset( ctx, 0, sizeof( uta_ctx_t ) );
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rmr_init: allocating 266 rivers\n" );
- ctx->nrivers = 256; // number of input flows we'll manage
+ ctx->snarf_rt_fd = -1;
+ ctx->nrivers = MAX_RIVERS; // the array allows for fast index mapping for fd values < max
ctx->rivers = (river_t *) malloc( sizeof( river_t ) * ctx->nrivers );
+ ctx->river_hash = rmr_sym_alloc( 129 ); // connections with fd values > FD_MAX have to e hashed
memset( ctx->rivers, 0, sizeof( river_t ) * ctx->nrivers );
for( i = 0; i < ctx->nrivers; i++ ) {
ctx->rivers[i].state = RS_NEW; // force allocation of accumulator on first received packet
ctx->send_retries = 1; // default is not to sleep at all; RMr will retry about 10K times before returning
ctx->d1_len = 4; // data1 space in header -- 4 bytes for now
- ctx->max_ibm = max_msg_size < 1024 ? 1024 : max_msg_size; // larger than their request doesn't hurt
+ ctx->max_ibm = def_msg_size < 1024 ? 1024 : def_msg_size; // larger than their request doesn't hurt
ctx->max_ibm += sizeof( uta_mhdr_t ) + ctx->d1_len + ctx->d2_len + TP_HDR_LEN + 64; // add in header size, transport hdr, and a bit of fudge
ctx->mring = uta_mk_ring( 4096 ); // message ring is always on for si
if( ! (flags & RMRFL_NOLOCK) ) { // user did not specifically ask that it be off; turn it on
uta_ring_config( ctx->mring, RING_RLOCK ); // concurrent rcv calls require read lock
uta_ring_config( ctx->zcb_mring, RING_WLOCK ); // concurrent free calls from userland require write lock
+ uta_ring_config( ctx->zcb_mring, RING_FRLOCK ); // concurrent message allocatieon calls from userland require read lock, but can be fast
} else {
rmr_vlog( RMR_VL_INFO, "receive ring locking disabled by user application\n" );
}
ctx->max_plen = RMR_MAX_RCV_BYTES; // max user payload lengh
- if( max_msg_size > 0 ) {
- ctx->max_plen = max_msg_size;
+ if( def_msg_size > 0 ) {
+ ctx->max_plen = def_msg_size;
}
- // we're using a listener to get rtg updates, so we do NOT need this.
- //uta_lookup_rtg( ctx ); // attempt to fill in rtg info; rtc will handle missing values/errors
-
ctx->si_ctx = SIinitialise( SI_OPT_FG ); // FIX ME: si needs to streamline and drop fork/bg stuff
if( ctx->si_ctx == NULL ) {
- rmr_vlog( RMR_VL_CRIT, "unable to initialise SI95 interface\n" );
- free_ctx( ctx );
- return NULL;
+ return init_err( "unable to initialise SI95 interface\n", ctx, proto_port, 0 );
}
if( (port = strchr( proto_port, ':' )) != NULL ) {
} else {
port = proto_port; // assume something like "1234" was passed
}
+ rmr_vlog( RMR_VL_INFO, "listen port = %s\n", port );
- if( (tok = getenv( "ENV_RTG_PORT" )) != NULL ) { // must check port here -- if < 1 then we just start static file 'listener'
- if( atoi( tok ) < 1 ) {
- static_rtc = 1;
- }
+ if( (tok = getenv( ENV_RTG_PORT )) != NULL && atoi( tok ) < 0 ) { // must check here -- if < 0 then we just start static file 'listener'
+ static_rtc = 1;
}
if( (tok = getenv( ENV_SRC_ID )) != NULL ) { // env var overrides what we dig from system
free( tok );
} else {
if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
- rmr_vlog( RMR_VL_CRIT, "rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
- return NULL;
+ return init_err( "cannot determine localhost name\n", ctx, proto_port, 0 );
}
if( (tok = strchr( wbuf, '.' )) != NULL ) {
*tok = 0; // we don't keep domain portion
ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC );
if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) { // our registered name is host:port
- rmr_vlog( RMR_VL_CRIT, "rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port );
- return NULL;
+ return init_err( "hostname + port is too long", ctx, proto_port, EINVAL );
}
if( (tok = getenv( ENV_NAME_ONLY )) != NULL ) {
if( (tok = getenv( ENV_WARNINGS )) != NULL ) {
if( *tok == '1' ) {
- ctx->flags |= CTXFL_WARN; // turn on some warnings (not all, just ones that shouldn't impact performance)
+ ctx->flags |= CFL_WARN; // turn on some warnings (not all, just ones that shouldn't impact performance)
}
}
- if( (interface = getenv( ENV_BIND_IF )) == NULL ) {
+ if( (interface = getenv( ENV_BIND_IF )) == NULL ) { // if specific interface not defined, listen on all
interface = "0.0.0.0";
}
- snprintf( bind_info, sizeof( bind_info ), "%s:%s", interface, port ); // FIXME -- si only supports 0.0.0.0 by default
+ snprintf( bind_info, sizeof( bind_info ), "%s:%s", interface, port );
if( (state = SIlistener( ctx->si_ctx, TCP_DEVICE, bind_info )) < 0 ) {
rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to start si listener for %s: %s\n", bind_info, strerror( errno ) );
- free_ctx( ctx );
- return NULL;
+ return init_err( NULL, ctx, proto_port, 0 );
}
// finish all flag setting before threads to keep helgrind quiet
ctx->flags |= CFL_MTC_ENABLED; // for SI threaded receiver is the only way
- if( flags & RMRFL_NOTHREAD ) { // thread set to off; no route table collector started (could be called by the rtc thread itself)
- ctx->rtable = rt_clone_space( NULL, NULL, 0 ); // creates an empty route table so that wormholes still can be used
+
+ // ---------------- setup for route table collector before invoking ----------------------------------
+ ctx->rtgate = (pthread_mutex_t *) malloc( sizeof( *ctx->rtgate ) ); // single mutex required to gate access to moving rtables
+ if( ctx->rtgate != NULL ) {
+ pthread_mutex_init( ctx->rtgate, NULL );
+ }
+
+ ctx->ephash = rmr_sym_alloc( 129 ); // host:port to ep symtab exists outside of any route table
+ if( ctx->ephash == NULL ) {
+ return init_err( "unable to allocate ep hash\n", ctx, proto_port, ENOMEM );
+ }
+
+ pthread_mutex_destroy(ctx->rtgate);
+ ctx->rtable = rt_clone_space( ctx, NULL, NULL, 0 ); // create an empty route table so that wormhole/rts calls can be used
+ if( flags & RMRFL_NOTHREAD ) { // no thread prevents the collector start for very special cases
+ ctx->rtable_ready = 1; // route based sends will always fail, but rmr is ready for the non thread case
} else {
+ ctx->rtable_ready = 0; // no sends until a real route table is loaded in the rtc thread
+
if( static_rtc ) {
+ rmr_vlog( RMR_VL_INFO, "rmr_init: file based route table only for context on port %s\n", uproto_port );
if( pthread_create( &ctx->rtc_th, NULL, rtc_file, (void *) ctx ) ) { // kick the rt collector thread as just file reader
rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start static route table collector thread: %s", strerror( errno ) );
}
} else {
+ rmr_vlog( RMR_VL_INFO, "rmr_init: dynamic route table for context on port %s\n", uproto_port );
if( pthread_create( &ctx->rtc_th, NULL, rtc, (void *) ctx ) ) { // kick the real rt collector thread
rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start dynamic route table collector thread: %s", strerror( errno ) );
}
without drastically changing anything. The user should invoke with RMRFL_NONE to
avoid any misbehavour as there are internal flags which are suported
*/
-extern void* rmr_init( char* uproto_port, int max_msg_size, int flags ) {
- return init( uproto_port, max_msg_size, flags & UFL_MASK ); // ensure any internal flags are off
+extern void* rmr_init( char* uproto_port, int def_msg_size, int flags ) {
+ return init( uproto_port, def_msg_size, flags & UFL_MASK ); // ensure any internal flags are off
}
/*
return FALSE;
}
- if( ctx->rtable != NULL ) {
- return TRUE;
- }
-
- return FALSE;
+ return ctx->rtable_ready;
}
/*
return -1;
}
-/*
- if( (state = nng_getopt_int( ctx->nn_sock, NNG_OPT_RECVFD, &fd )) != 0 ) {
- rmr_vlog( RMR_VL_WARN, "rmr cannot get recv fd: %s\n", nng_strerror( state ) );
- return -1;
- }
-*/
-
return uta_ring_getpfd( ctx->mring );
}
return;
}
+ if( ctx->seed_rt_fname != NULL ) {
+ free( ctx->seed_rt_fname );
+ }
+
ctx->shutdown = 1;
SItp_stats( ctx->si_ctx ); // dump some interesting stats
*/
extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
uta_ctx_t* ctx;
- uta_mhdr_t* hdr; // header in the transport buffer
chute_t* chute;
struct timespec ts; // time info if we have a timeout
long new_ms; // adjusted mu-sec
if( max_wait == 0 ) { // one shot poll; handle wihtout sem check as that is SLOW!
if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) { // pop if queued
+ clock_gettime( CLOCK_REALTIME, &ts ); // pass current time as expriry time
+ sem_timedwait( &chute->barrier, &ts ); // must pop the count (ring is locking so if we got a message we can pop)
if( ombuf ) {
rmr_free_msg( ombuf ); // can't reuse, caller's must be trashed now
}
if( mbuf != NULL ) {
mbuf->flags |= MFL_ADDSRC; // turn on so if user app tries to send this buffer we reset src
}
+
return mbuf;
}
return mbuf;
}
- if( call_id > MAX_CALL_ID || call_id < 2 ) { // 0 and 1 are reserved; user app cannot supply them
- mbuf->state = RMR_ERR_BADARG;
- mbuf->tp_state = errno;
- return mbuf;
- }
-
ombuf = mbuf; // save to return timeout status with
chute = &ctx->chutes[call_id];
This is now just an outward facing wrapper so we can support wormhole calls.
*/
extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
+
+ // must vet call_id here, all others vetted by workhorse mt_call() function
+ if( call_id > MAX_CALL_ID || call_id < 2 ) { // 0 and 1 are reserved; user app cannot supply them
+ if( mbuf != NULL ) {
+ mbuf->state = RMR_ERR_BADARG;
+ mbuf->tp_state = EINVAL;
+ }
+ return mbuf;
+ }
+
return mt_call( vctx, mbuf, call_id, max_wait, NULL );
}