#include "si95/socket_if.h"
#include "si95/siproto.h"
+#define SI95_BUILD 1 // we drop some common functions for si
#include "rmr.h" // things the users see
#include "rmr_agnostic.h" // agnostic things (must be included before private)
-#include "rmr_si_private.h" // things that we need too
+#include "rmr_si_private.h" // things that we need too
#include "rmr_symtab.h"
+#include "rmr_logging.h"
#include "ring_static.c" // message ring support
#include "rt_generic_static.c" // route table things not transport specific
#include "rtable_si_static.c" // route table things -- transport specific
-#include "rtc_static.c" // route table collector
-#include "rtc_si_static.c" // our private test function
+#include "rtc_static.c" // route table collector (thread code)
#include "tools_static.c"
#include "sr_si_static.c" // send/receive static functions
#include "wormholes.c" // wormhole api externals and related static functions (must be LAST!)
The allocated len stored in the msg is:
transport header length +
- message header +
- user requested payload
+ message header +
+ user requested payload
The msg header is a combination of the fixed RMR header and the variable
trace data and d2 fields which may vary for each message.
if( !mbuf->ring || ! uta_ring_insert( mbuf->ring, mbuf ) ) { // just queue, free if ring is full
if( mbuf->tp_buf ) {
free( mbuf->tp_buf );
+ mbuf->tp_buf = NULL; // just in case user tries to reuse this mbuf; this will be an NPE
}
+
+ mbuf->cookie = 0; // should signal a bad mbuf (if not reallocated)
free( mbuf );
}
}
d1 = DATA1_ADDR( msg->header );
d1[D1_CALLID_IDX] = NO_CALL_ID; // must blot out so it doesn't queue on a chute at the other end
- }
+ }
return mtosend_msg( vctx, msg, max_to );
}
d1 = DATA1_ADDR( msg->header );
d1[D1_CALLID_IDX] = NO_CALL_ID; // must blot out so it doesn't queue on a chute at the other end
- }
+ }
return rmr_mtosend_msg( vctx, msg, -1 ); // retries < 0 uses default from ctx
}
/*
Return to sender allows a message to be sent back to the endpoint where it originated.
- In the SI world the file descriptor that was the source of the message is captured in
- the mbuffer and thus can be used to quickly find the target for an RTS call.
+ With SI95 it was thought that the return to sender would be along the same open conneciton
+ and thus no table lookup would be needed to open a 'reverse direction' path. However, for
+ applications sending at high message rates, returning responses on the same connection
+ causes major strife. Thus the decision was made to use the same method as NNG and just
+ open a second connection for reverse path.
- The source information in the message is used to select the socket on which to write
- the message rather than using the message type and round-robin selection. This
- should return a message buffer with the state of the send operation set. On success
- (state is RMR_OK, the caller may use the buffer for another receive operation), and on
- error it can be passed back to this function to retry the send if desired. On error,
- errno will liklely have the failure reason set by the nng send processing.
- The following are possible values for the state in the message buffer:
+ We will attempt to use the name in the received message to look up the endpoint. If
+ that failes, then we will write on the connection that the message arrived on as a
+ falback.
+
+ On success (state is RMR_OK, the caller may use the buffer for another receive operation),
+ and on error it can be passed back to this function to retry the send if desired. On error,
+ errno will liklely have the failure reason set by the nng send processing. The following
+ are possible values for the state in the message buffer:
Message states returned:
RMR_ERR_BADARG - argument (context or msg) was nil or invalid
failure. The value of errno might give a clue as to what is wrong.
CAUTION:
- Like send_msg(), this is non-blocking and will return the msg if there is an errror.
+ Like send_msg(), this is non-blocking and will return the msg if there is an error.
The caller must check for this and handle it properly.
*/
extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
errno = 0; // at this point any bad state is in msg returned
if( msg->header == NULL ) {
- fprintf( stderr, "[ERR] rmr_send_msg: message had no header\n" );
+ rmr_vlog( RMR_VL_ERR, "rmr_send_msg: message had no header\n" );
msg->state = RMR_ERR_NOHDR;
msg->tp_state = errno;
return msg;
((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG; // must ensure call flag is off
-/*
- sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock, &ep, ctx->si_ctx ); // src is always used first for rts
+ sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock, &ep ); // always try src first
if( ! sock_ok ) {
-*/
- if( (nn_sock = msg->rts_fd) < 0 ) {
- if( HDR_VERSION( msg->header ) > 2 ) { // with ver2 the ip is there, try if src name not known
- sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep, ctx->si_ctx );
- }
- if( ! sock_ok ) {
- msg->state = RMR_ERR_NOENDPT;
- return msg; // preallocated msg can be reused since not given back to nn
+ if( (nn_sock = msg->rts_fd) < 0 ) {
+ if( HDR_VERSION( msg->header ) > 2 ) { // with ver2 the ip is there, try if src name not known
+ sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep );
+ }
+ if( ! sock_ok ) {
+ msg->state = RMR_ERR_NOENDPT;
+ return msg;
+ }
}
}
case RMR_OK:
ep->scounts[EPSC_GOOD]++;
break;
-
+
case RMR_ERR_RETRY:
ep->scounts[EPSC_TRANS]++;
break;
If multi-threading call is turned on, this invokes that mechanism with the special call
id of 1 and a max wait of 1 second. If multi threaded call is not on, then the original
behavour (described below) is carried out. This is safe to use when mt is enabled, but
- the user app is invoking rmr_call() from only one thread, and the caller doesn't need
+ the user app is invoking rmr_call() from only one thread, and the caller doesn't need
a flexible timeout.
On timeout this function will return a nil pointer. If the original message could not
if( exp_len > RMR_MAX_XID ) {
exp_len = RMR_MAX_XID;
}
- if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific waiting for id=%s\n", expect );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific waiting for id=%s\n", expect );
while( queued < allow2queue ) {
msg = rcv_msg( ctx, msg ); // hard wait for next
if( msg->state == RMR_OK ) {
if( memcmp( msg->xaction, expect, exp_len ) == 0 ) { // got it -- return it
- if( DEBUG ) fprintf( stderr, "[DBUG] rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued );
return msg;
}
if( ! uta_ring_insert( ctx->mring, msg ) ) { // just queue, error if ring is full
- if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_specific ring is full\n" );
+ if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific ring is full\n" );
errno = ENOBUFS;
return NULL;
}
- if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific queued message type=%d\n", msg->mtype );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific queued message type=%d\n", msg->mtype );
queued++;
msg = NULL;
}
}
- if( DEBUG ) fprintf( stderr, "[DBUG] rcv_specific timeout waiting for %s\n", expect );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific timeout waiting for %s\n", expect );
errno = ETIMEDOUT;
return NULL;
}
/*
Set send timeout. The value time is assumed to be milliseconds. The timeout is the
- _rough_ maximum amount of time that RMr will block on a send attempt when the underlying
+ _rough_ maximum amount of time that RMR will block on a send attempt when the underlying
mechnism indicates eagain or etimeedout. All other error conditions are reported
without this delay. Setting a timeout of 0 causes no retries to be attempted in
RMr code. Setting a timeout of 1 causes RMr to spin up to 1K retries before returning,
but _without_ issuing a sleep. If timeout is > 1, then RMr will issue a sleep (1us)
after every 1K send attempts until the "time" value is reached. Retries are abandoned
- if NNG returns anything other than NNG_EAGAIN or NNG_ETIMEDOUT.
+ if NNG returns anything other than EAGAIN or EINTER is returned.
The default, if this function is not used, is 1; meaning that RMr will retry, but will
not enter a sleep. In all cases the caller should check the status in the message returned
CAUTION: this is not supported as they must be set differently (between create and open) in NNG.
*/
extern int rmr_set_rtimeout( void* vctx, int time ) {
- fprintf( stderr, "[WRN] Current underlying transport mechanism (SI) does not support rcv timeout; not set\n" );
+ rmr_vlog( RMR_VL_WARN, "Current underlying transport mechanism (SI) does not support rcv timeout; not set\n" );
return 0;
}
invokes this. Internal functions (the route table collector) which need additional
open ports without starting additional route table collectors, will invoke this
directly with the proper flag.
+
+ CAUTION: The max_ibm (max inbound message) size is the supplied user max plus the lengths
+ that we know about. The _user_ should ensure that the supplied length also
+ includes the trace data length maximum as they are in control of that.
*/
static void* init( char* uproto_port, int max_msg_size, int flags ) {
static int announced = 0;
uta_ctx_t* ctx = NULL;
- char bind_info[NNG_MAXADDRLEN]; // bind info
+ char bind_info[256]; // bind info
char* proto = "tcp"; // pointer into the proto/port string user supplied
char* port;
char* interface = NULL; // interface to bind to (from RMR_BIND_IF, 0.0.0.0 if not defined)
char wbuf[1024]; // work buffer
char* tok; // pointer at token in a buffer
char* tok2;
+ int static_rtc = 0; // if rtg env var is < 1, then we set and don't listen on a port
int state;
int i;
+ int old_vlevel;
+
+ old_vlevel = rmr_vlog_init(); // initialise and get the current level
+ rmr_set_vlevel( RMR_VL_INFO ); // we WILL announce our version etc
if( ! announced ) {
- fprintf( stderr, "[INFO] ric message routing library on SI95 mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
+ rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95/g mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
announced = 1;
}
+ rmr_set_vlevel( old_vlevel ); // return logging to the desired state
errno = 0;
if( uproto_port == NULL ) {
}
memset( ctx, 0, sizeof( uta_ctx_t ) );
- if( DEBUG ) fprintf( stderr, "[DBUG] rmr_init: allocating 266 rivers\n" );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rmr_init: allocating 266 rivers\n" );
ctx->nrivers = 256; // number of input flows we'll manage
ctx->rivers = (river_t *) malloc( sizeof( river_t ) * ctx->nrivers );
memset( ctx->rivers, 0, sizeof( river_t ) * ctx->nrivers );
ctx->send_retries = 1; // default is not to sleep at all; RMr will retry about 10K times before returning
ctx->d1_len = 4; // data1 space in header -- 4 bytes for now
- ctx->max_ibm = max_msg_size; // default to user supplied message size
+ ctx->max_ibm = max_msg_size < 1024 ? 1024 : max_msg_size; // larger than their request doesn't hurt
+ ctx->max_ibm += sizeof( uta_mhdr_t ) + ctx->d1_len + ctx->d2_len + TP_HDR_LEN + 64; // add in header size, transport hdr, and a bit of fudge
ctx->mring = uta_mk_ring( 4096 ); // message ring is always on for si
+ ctx->zcb_mring = uta_mk_ring( 128 ); // zero copy buffer mbuf ring to reduce malloc/free calls
+
+ if( ! (flags & RMRFL_NOLOCK) ) { // user did not specifically ask that it be off; turn it on
+ uta_ring_config( ctx->mring, RING_RLOCK ); // concurrent rcv calls require read lock
+ uta_ring_config( ctx->zcb_mring, RING_WLOCK ); // concurrent free calls from userland require write lock
+ } else {
+ rmr_vlog( RMR_VL_INFO, "receive ring locking disabled by user application\n" );
+ }
init_mtcall( ctx ); // set up call chutes
+ fd2ep_init( ctx ); // initialise the fd to endpoint sym tab
- ctx->zcb_mring = uta_mk_ring( 128 ); // zero copy buffer mbuf ring
ctx->max_plen = RMR_MAX_RCV_BYTES; // max user payload lengh
if( max_msg_size > 0 ) {
ctx->si_ctx = SIinitialise( SI_OPT_FG ); // FIX ME: si needs to streamline and drop fork/bg stuff
if( ctx->si_ctx == NULL ) {
- fprintf( stderr, "[CRI] unable to initialise SI95 interface\n" );
+ rmr_vlog( RMR_VL_CRIT, "unable to initialise SI95 interface\n" );
free_ctx( ctx );
return NULL;
}
port = proto_port; // assume something like "1234" was passed
}
+ if( (tok = getenv( "ENV_RTG_PORT" )) != NULL ) { // must check port here -- if < 1 then we just start static file 'listener'
+ if( atoi( tok ) < 1 ) {
+ static_rtc = 1;
+ }
+ }
+
if( (tok = getenv( ENV_SRC_ID )) != NULL ) { // env var overrides what we dig from system
tok = strdup( tok ); // something we can destroy
if( *tok == '[' ) { // we allow an ipv6 address here
free( tok );
} else {
if( (gethostname( wbuf, sizeof( wbuf ) )) != 0 ) {
- fprintf( stderr, "[CRI] rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
+ rmr_vlog( RMR_VL_CRIT, "rmr_init: cannot determine localhost name: %s\n", strerror( errno ) );
return NULL;
}
if( (tok = strchr( wbuf, '.' )) != NULL ) {
ctx->my_name = (char *) malloc( sizeof( char ) * RMR_MAX_SRC );
if( snprintf( ctx->my_name, RMR_MAX_SRC, "%s:%s", wbuf, port ) >= RMR_MAX_SRC ) { // our registered name is host:port
- fprintf( stderr, "[CRI] rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port );
+ rmr_vlog( RMR_VL_CRIT, "rmr_init: hostname + port must be less than %d characters; %s:%s is not\n", RMR_MAX_SRC, wbuf, port );
return NULL;
}
} else {
ctx->my_ip = get_default_ip( ctx->ip_list ); // and (guess) at what should be the default to put into messages as src
if( ctx->my_ip == NULL ) {
- fprintf( stderr, "[WRN] rmr_init: default ip address could not be sussed out, using name\n" );
+ rmr_vlog( RMR_VL_WARN, "rmr_init: default ip address could not be sussed out, using name\n" );
strcpy( ctx->my_ip, ctx->my_name ); // if we cannot suss it out, use the name rather than a nil pointer
}
}
- if( DEBUG ) fprintf( stderr, "[DBUG] default ip address: %s\n", ctx->my_ip );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " default ip address: %s\n", ctx->my_ip );
if( (tok = getenv( ENV_WARNINGS )) != NULL ) {
if( *tok == '1' ) {
if( (interface = getenv( ENV_BIND_IF )) == NULL ) {
interface = "0.0.0.0";
}
-
+
snprintf( bind_info, sizeof( bind_info ), "%s:%s", interface, port ); // FIXME -- si only supports 0.0.0.0 by default
if( (state = SIlistener( ctx->si_ctx, TCP_DEVICE, bind_info )) < 0 ) {
- fprintf( stderr, "[CRI] rmr_init: unable to start si listener for %s: %s\n", bind_info, strerror( errno ) );
+ rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to start si listener for %s: %s\n", bind_info, strerror( errno ) );
free_ctx( ctx );
return NULL;
}
- if( !(flags & FL_NOTHREAD) ) { // skip if internal function that doesnt need an rtc
- if( pthread_create( &ctx->rtc_th, NULL, rtc_file, (void *) ctx ) ) { // kick the rt collector thread
- fprintf( stderr, "[WRN] rmr_init: unable to start route table collector thread: %s", strerror( errno ) );
+ // finish all flag setting before threads to keep helgrind quiet
+ ctx->flags |= CFL_MTC_ENABLED; // for SI threaded receiver is the only way
+
+ if( flags & RMRFL_NOTHREAD ) { // thread set to off; no route table collector started (could be called by the rtc thread itself)
+ ctx->rtable = rt_clone_space( NULL, NULL, 0 ); // creates an empty route table so that wormholes still can be used
+ } else {
+ if( static_rtc ) {
+ if( pthread_create( &ctx->rtc_th, NULL, rtc_file, (void *) ctx ) ) { // kick the rt collector thread as just file reader
+ rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start static route table collector thread: %s", strerror( errno ) );
+ }
+ } else {
+ if( pthread_create( &ctx->rtc_th, NULL, rtc, (void *) ctx ) ) { // kick the real rt collector thread
+ rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start dynamic route table collector thread: %s", strerror( errno ) );
+ }
}
}
- //fprintf( stderr, ">>>>> starting threaded receiver with ctx=%p si_ctx=%p\n", ctx, ctx->si_ctx );
- ctx->flags |= CFL_MTC_ENABLED; // for SI threaded receiver is the only way
if( pthread_create( &ctx->mtc_th, NULL, mt_receive, (void *) ctx ) ) { // so kick it
- fprintf( stderr, "[WRN] rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
+ rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
}
free( proto_port );
/*
if( (state = nng_getopt_int( ctx->nn_sock, NNG_OPT_RECVFD, &fd )) != 0 ) {
- fprintf( stderr, "[WRN] rmr cannot get recv fd: %s\n", nng_strerror( state ) );
+ rmr_vlog( RMR_VL_WARN, "rmr cannot get recv fd: %s\n", nng_strerror( state ) );
return -1;
}
*/
long nano_sec; // max wait xlated to nano seconds
int state;
rmr_mbuf_t* ombuf; // mbuf user passed; if we timeout we return state here
-
+
if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
errno = EINVAL;
if( mbuf ) {
if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) { // pop if queued
if( ombuf ) {
rmr_free_msg( ombuf ); // can't reuse, caller's must be trashed now
- }
+ }
} else {
mbuf = ombuf; // return original if it was given with timeout status
if( ombuf != NULL ) {
}
}
+ mbuf->flags |= MFL_ADDSRC; // turn on so if user app tries to send this buffer we reset src
return mbuf;
}
mbuf = ombuf; // return caller's buffer if they passed one in
} else {
errno = 0; // interrupted call state could be left; clear
- if( DEBUG ) fprintf( stderr, "[DBUG] mt_rcv extracting from normal ring\n" );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " mt_rcv extracting from normal ring\n" );
if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) { // pop if queued
mbuf->state = RMR_OK;
+ mbuf->flags |= MFL_ADDSRC; // turn on so if user app tries to send this buffer we reset src
if( ombuf ) {
rmr_free_msg( ombuf ); // we cannot reuse as mbufs are queued on the ring
long seconds = 0; // max wait seconds
long nano_sec; // max wait xlated to nano seconds
int state;
-
+
errno = EINVAL;
if( (ctx = (uta_ctx_t *) vctx) == NULL || mbuf == NULL ) {
if( mbuf ) {
rmr_free_msg( chute->mbuf );
chute->mbuf = NULL;
}
-
+
hdr = (uta_mhdr_t *) mbuf->header;
hdr->flags |= HFL_CALL_MSG; // must signal this sent with a call
memcpy( chute->expect, mbuf->xaction, RMR_MAX_XID ); // xaction that we will wait for
mbuf->flags |= MFL_NOALLOC; // send message without allocating a new one (expect nil from mtosend
if( max_wait >= 0 ) {
- clock_gettime( CLOCK_REALTIME, &ts );
+ clock_gettime( CLOCK_REALTIME, &ts );
if( max_wait > 999 ) {
seconds = max_wait / 1000;
return mbuf;
}
+/*
+ Given an existing message buffer, reallocate the payload portion to
+ be at least new_len bytes. The message header will remain such that
+ the caller may use the rmr_rts_msg() function to return a payload
+ to the sender.
+
+ The mbuf passed in may or may not be reallocated and the caller must
+ use the returned pointer and should NOT assume that it can use the
+ pointer passed in with the exceptions based on the clone flag.
+
+ If the clone flag is set, then a duplicated message, with larger payload
+ size, is allocated and returned. The old_msg pointer in this situation is
+ still valid and must be explicitly freed by the application. If the clone
+ message is not set (0), then any memory management of the old message is
+ handled by the function.
+
+ If the copy flag is set, the contents of the old message's payload is
+ copied to the reallocated payload. If the flag is not set, then the
+ contents of the payload is undetermined.
+*/
+extern rmr_mbuf_t* rmr_realloc_payload( rmr_mbuf_t* old_msg, int new_len, int copy, int clone ) {
+ if( old_msg == NULL ) {
+ return NULL;
+ }
+
+ return realloc_payload( old_msg, new_len, copy, clone ); // message allocation is transport specific, so this is a passthrough
+}
+
/*
Enable low latency things in the transport (when supported).
*/