#include "si95/socket_if.h"
#include "si95/siproto.h"
+#define SI95_BUILD 1 // we drop some common functions for si
#include "rmr.h" // things the users see
#include "rmr_agnostic.h" // agnostic things (must be included before private)
-#include "rmr_si_private.h" // things that we need too
+#include "rmr_si_private.h" // things that we need too
#include "rmr_symtab.h"
+#include "rmr_logging.h"
#include "ring_static.c" // message ring support
#include "rt_generic_static.c" // route table things not transport specific
#include "rtable_si_static.c" // route table things -- transport specific
-#include "rtc_si_static.c" // specific RMR only route table collector (SI only for now)
+#include "rtc_static.c" // route table collector (thread code)
#include "tools_static.c"
#include "sr_si_static.c" // send/receive static functions
#include "wormholes.c" // wormhole api externals and related static functions (must be LAST!)
if( !mbuf->ring || ! uta_ring_insert( mbuf->ring, mbuf ) ) { // just queue, free if ring is full
if( mbuf->tp_buf ) {
free( mbuf->tp_buf );
+ mbuf->tp_buf = NULL; // just in case user tries to reuse this mbuf; this will be an NPE
}
+
+ mbuf->cookie = 0; // should signal a bad mbuf (if not reallocated)
free( mbuf );
}
}
errno = 0; // at this point any bad state is in msg returned
if( msg->header == NULL ) {
- fprintf( stderr, "[ERR] rmr_send_msg: message had no header\n" );
+ rmr_vlog( RMR_VL_ERR, "rmr_send_msg: message had no header\n" );
msg->state = RMR_ERR_NOHDR;
msg->tp_state = errno;
return msg;
*/
if( (nn_sock = msg->rts_fd) < 0 ) {
if( HDR_VERSION( msg->header ) > 2 ) { // with ver2 the ip is there, try if src name not known
- sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep, ctx->si_ctx );
+ //sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep, ctx->si_ctx );
+ sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep );
}
if( ! sock_ok ) {
msg->state = RMR_ERR_NOENDPT;
}
}
-
msg->state = RMR_OK; // ensure it is clear before send
hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src ); // the dest where we're returning the message to
hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip ); // both the src host and src ip
if( exp_len > RMR_MAX_XID ) {
exp_len = RMR_MAX_XID;
}
- if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific waiting for id=%s\n", expect );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific waiting for id=%s\n", expect );
while( queued < allow2queue ) {
msg = rcv_msg( ctx, msg ); // hard wait for next
if( msg->state == RMR_OK ) {
if( memcmp( msg->xaction, expect, exp_len ) == 0 ) { // got it -- return it
- if( DEBUG ) fprintf( stderr, "[DBUG] rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued );
return msg;
}
if( ! uta_ring_insert( ctx->mring, msg ) ) { // just queue, error if ring is full
- if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_specific ring is full\n" );
+ if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific ring is full\n" );
errno = ENOBUFS;
return NULL;
}
- if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific queued message type=%d\n", msg->mtype );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific queued message type=%d\n", msg->mtype );
queued++;
msg = NULL;
}
}
- if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific timeout waiting for %s\n", expect );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific timeout waiting for %s\n", expect );
errno = ETIMEDOUT;
return NULL;
}
/*
Set send timeout. The value time is assumed to be milliseconds. The timeout is the
- _rough_ maximum amount of time that RMr will block on a send attempt when the underlying
+ _rough_ maximum amount of time that RMR will block on a send attempt when the underlying
mechnism indicates eagain or etimeedout. All other error conditions are reported
without this delay. Setting a timeout of 0 causes no retries to be attempted in
RMr code. Setting a timeout of 1 causes RMr to spin up to 1K retries before returning,
but _without_ issuing a sleep. If timeout is > 1, then RMr will issue a sleep (1us)
after every 1K send attempts until the "time" value is reached. Retries are abandoned
- if NNG returns anything other than NNG_EAGAIN or NNG_ETIMEDOUT.
+ if NNG returns anything other than EAGAIN or EINTER is returned.
The default, if this function is not used, is 1; meaning that RMr will retry, but will
not enter a sleep. In all cases the caller should check the status in the message returned
CAUTION: this is not supported as they must be set differently (between create and open) in NNG.
*/
extern int rmr_set_rtimeout( void* vctx, int time ) {
- fprintf( stderr, "[WRN] Current underlying transport mechanism (SI) does not support rcv timeout; not set\n" );
+ rmr_vlog( RMR_VL_WARN, "Current underlying transport mechanism (SI) does not support rcv timeout; not set\n" );
return 0;
}
invokes this. Internal functions (the route table collector) which need additional
open ports without starting additional route table collectors, will invoke this
directly with the proper flag.
+
+ CAUTION: The max_ibm (max inbound message) size is the supplied user max plus the lengths
+ that we know about. The _user_ should ensure that the supplied length also
+ includes the trace data length maximum as they are in control of that.
*/
static void* init( char* uproto_port, int max_msg_size, int flags ) {
static int announced = 0;
uta_ctx_t* ctx = NULL;
- char bind_info[NNG_MAXADDRLEN]; // bind info
+ char bind_info[256]; // bind info
char* proto = "tcp"; // pointer into the proto/port string user supplied
char* port;
char* interface = NULL; // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined)
int static_rtc = 0; // if rtg env var is < 1, then we set and don't listen on a port
int state;
int i;
+ int old_vlevel;
+
+ old_vlevel = rmr_vlog_init(); // initialise and get the current level
+ rmr_set_vlevel( RMR_VL_INFO ); // we WILL announce our version etc
if( ! announced ) {
- fprintf( stderr, "[INFO] ric message routing library on SI95/b mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
+ rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95/f mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
announced = 1;
}
+ rmr_set_vlevel( old_vlevel ); // return logging to the desired state
errno = 0;
if( uproto_port == NULL ) {
}
memset( ctx, 0, sizeof( uta_ctx_t ) );
- if( DEBUG ) fprintf( stderr, "[DBUG] rmr_init: allocating 266 rivers\n" );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rmr_init: allocating 266 rivers\n" );
ctx->nrivers = 256; // number of input flows we'll manage
ctx->rivers = (river_t *) malloc( sizeof( river_t ) * ctx->nrivers );
memset( ctx->rivers, 0, sizeof( river_t ) * ctx->nrivers );
ctx->send_retries = 1; // default is not to sleep at all; RMr will retry about 10K times before returning
ctx->d1_len = 4; // data1 space in header -- 4 bytes for now
- ctx->max_ibm = max_msg_size; // default to user supplied message size
+ ctx->max_ibm = max_msg_size < 1024 ? 1024 : max_msg_size; // larger than their request doesn't hurt
+ ctx->max_ibm += sizeof( uta_mhdr_t ) + ctx->d1_len + ctx->d2_len + TP_HDR_LEN + 64; // add in header size, transport hdr, and a bit of fudge
ctx->mring = uta_mk_ring( 4096 ); // message ring is always on for si
+ ctx->zcb_mring = uta_mk_ring( 128 ); // zero copy buffer mbuf ring to reduce malloc/free calls
+
+ if( ! (flags & RMRFL_NOLOCK) ) { // user did not specifically ask that it be off; turn it on
+ uta_ring_config( ctx->mring, RING_RLOCK ); // concurrent rcv calls require read lock
+ uta_ring_config( ctx->zcb_mring, RING_WLOCK ); // concurrent free calls from userland require write lock
+ } else {
+ rmr_vlog( RMR_VL_INFO, "receive ring locking disabled by user application\n" );
+ }
init_mtcall( ctx ); // set up call chutes
+ fd2ep_init( ctx ); // initialise the fd to endpoint sym tab
- ctx->zcb_mring = uta_mk_ring( 128 ); // zero copy buffer mbuf ring
ctx->max_plen = RMR_MAX_RCV_BYTES; // max user payload lengh
if( max_msg_size > 0 ) {
ctx->si_ctx = SIinitialise( SI_OPT_FG ); // FIX ME: si needs to streamline and drop fork/bg stuff
if( ctx->si_ctx == NULL ) {
- fprintf( stderr, "[CRI] unable to initialise SI95 interface\n" );
+ rmr_vlog( RMR_VL_CRIT, "unable to initialise SI95 interface\n" );
free_ctx( ctx );
return NULL;
}
free( tok );
} else {
if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
- fprintf( stderr, "[CRI] rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
+ rmr_vlog( RMR_VL_CRIT, "rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
return NULL;
}
if( (tok = strchr( wbuf, '.' )) != NULL ) {
ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC );
if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) { // our registered name is host:port
- fprintf( stderr, "[CRI] rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port );
+ rmr_vlog( RMR_VL_CRIT, "rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port );
return NULL;
}
} else {
ctx->my_ip = get_default_ip( ctx->ip_list ); // and (guess) at what should be the default to put into messages as src
if( ctx->my_ip == NULL ) {
- fprintf( stderr, "[WRN] rmr_init: default ip address could not be sussed out, using name\n" );
+ rmr_vlog( RMR_VL_WARN, "rmr_init: default ip address could not be sussed out, using name\n" );
strcpy( ctx->my_ip, ctx->my_name ); // if we cannot suss it out, use the name rather than a nil pointer
}
}
- if( DEBUG ) fprintf( stderr, "[DBUG] default ip address: %s\n", ctx->my_ip );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " default ip address: %s\n", ctx->my_ip );
if( (tok = getenv( ENV_WARNINGS )) != NULL ) {
if( *tok == '1' ) {
snprintf( bind_info, sizeof( bind_info ), "%s:%s", interface, port ); // FIXME -- si only supports 0.0.0.0 by default
if( (state = SIlistener( ctx->si_ctx, TCP_DEVICE, bind_info )) < 0 ) {
- fprintf( stderr, "[CRI] rmr_init: unable to start si listener for %s: %s\n", bind_info, strerror( errno ) );
+ rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to start si listener for %s: %s\n", bind_info, strerror( errno ) );
free_ctx( ctx );
return NULL;
}
- if( !(flags & FL_NOTHREAD) ) { // skip if internal function that doesnt need a RTC
+ if( flags & FL_NOTHREAD ) { // thread set to off; no rout table collector started (could be called by the rtc thread itself)
+ ctx->rtable = rt_clone_space( NULL, NULL, 0 ); // creates an empty route table so that wormholes still can be used
+ } else {
if( static_rtc ) {
if( pthread_create( &ctx->rtc_th, NULL, rtc_file, (void *) ctx ) ) { // kick the rt collector thread as just file reader
- fprintf( stderr, "[WRN] rmr_init: unable to start static route table collector thread: %s", strerror( errno ) );
+ rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start static route table collector thread: %s", strerror( errno ) );
}
} else {
if( pthread_create( &ctx->rtc_th, NULL, rtc, (void *) ctx ) ) { // kick the real rt collector thread
- fprintf( stderr, "[WRN] rmr_init: unable to start dynamic route table collector thread: %s", strerror( errno ) );
+ rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start dynamic route table collector thread: %s", strerror( errno ) );
}
}
}
ctx->flags |= CFL_MTC_ENABLED; // for SI threaded receiver is the only way
if( pthread_create( &ctx->mtc_th, NULL, mt_receive, (void *) ctx ) ) { // so kick it
- fprintf( stderr, "[WRN] rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
+ rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
}
free( proto_port );
/*
if( (state = nng_getopt_int( ctx->nn_sock, NNG_OPT_RECVFD, &fd )) != 0 ) {
- fprintf( stderr, "[WRN] rmr cannot get recv fd: %s\n", nng_strerror( state ) );
+ rmr_vlog( RMR_VL_WARN, "rmr cannot get recv fd: %s\n", nng_strerror( state ) );
return -1;
}
*/
mbuf = ombuf; // return caller's buffer if they passed one in
} else {
errno = 0; // interrupted call state could be left; clear
- if( DEBUG ) fprintf( stderr, "[DBUG] mt_rcv extracting from normal ring\n" );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " mt_rcv extracting from normal ring\n" );
if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) { // pop if queued
mbuf->state = RMR_OK;
return mbuf;
}
+/*
+ Given an existing message buffer, reallocate the payload portion to
+ be at least new_len bytes. The message header will remain such that
+ the caller may use the rmr_rts_msg() function to return a payload
+ to the sender.
+
+ The mbuf passed in may or may not be reallocated and the caller must
+ use the returned pointer and should NOT assume that it can use the
+ pointer passed in with the exceptions based on the clone flag.
+
+ If the clone flag is set, then a duplicated message, with larger payload
+ size, is allocated and returned. The old_msg pointer in this situation is
+ still valid and must be explicitly freed by the application. If the clone
+ message is not set (0), then any memory management of the old message is
+ handled by the function.
+
+ If the copy flag is set, the contents of the old message's payload is
+ copied to the reallocated payload. If the flag is not set, then the
+ contents of the payload is undetermined.
+*/
+extern rmr_mbuf_t* rmr_realloc_payload( rmr_mbuf_t* old_msg, int new_len, int copy, int clone ) {
+ if( old_msg == NULL ) {
+ return NULL;
+ }
+
+ return realloc_payload( old_msg, new_len, copy, clone ); // message allocation is transport specific, so this is a passthrough
+}
+
/*
Enable low latency things in the transport (when supported).
*/