Clean up a context.
*/
static void free_ctx( uta_ctx_t* ctx ) {
- if( ctx ) {
- if( ctx->rtg_addr ) {
- free( ctx->rtg_addr );
- }
+ if( ctx && ctx->rtg_addr ) {
+ free( ctx->rtg_addr );
}
}
Return the message to the available pool, or free it outright.
*/
extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
- //fprintf( stderr, "SKIPPING FREE: %p\n", mbuf );
- //return;
-
if( mbuf == NULL ) {
return;
}
- if( !mbuf->ring || ! uta_ring_insert( mbuf->ring, mbuf ) ) { // just queue, free if ring is full
+ if( mbuf->flags & MFL_HUGE || // don't cache oversized messages
+ ! mbuf->ring || // can't cache if no ring
+ ! uta_ring_insert( mbuf->ring, mbuf ) ) { // or ring is full
+
if( mbuf->tp_buf ) {
free( mbuf->tp_buf );
mbuf->tp_buf = NULL; // just in case user tries to reuse this mbuf; this will be an NPE
((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG; // must ensure call flag is off
d1 = DATA1_ADDR( msg->header );
- d1[D1_CALLID_IDX] = NO_CALL_ID; // must blot out so it doesn't queue on a chute at the other end
+ d1[D1_CALLID_IDX] = NO_CALL_ID; // must blot out so it doesn't queue on a chute at the other end
}
return mtosend_msg( vctx, msg, max_to );
extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
int nn_sock; // endpoint socket for send
uta_ctx_t* ctx;
- int state;
char* hold_src; // we need the original source if send fails
char* hold_ip; // also must hold original ip
int sock_ok = 0; // true if we found a valid endpoint socket
return msg;
}
- return rmr_mt_call( vctx, msg, 1, 1000 ); // use the reserved call-id of 1 and wait up to 1 sec
+ return mt_call( vctx, msg, 1, 1000, NULL ); // use the reserved call-id of 1 and wait up to 1 sec
}
/*
}
/*
+ DEPRECATED -- this function is not needed in the SI world, and when NNG goes away this will
+ too. This function likely will not behave as expected in SI, and we are pretty sure it
+ isn't being used as there was an abort triggering reference to rmr_rcv() until now.
+
This blocks until the message with the 'expect' ID is received. Messages which are received
before the expected message are queued onto the message ring. The function will return
a nil message and set errno to ETIMEDOUT if allow2queue messages are received before the
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific waiting for id=%s\n", expect );
while( queued < allow2queue ) {
- msg = rcv_msg( ctx, msg ); // hard wait for next
- if( msg->state == RMR_OK ) {
- if( memcmp( msg->xaction, expect, exp_len ) == 0 ) { // got it -- return it
- if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv-specific matched (%s); %d messages were queued\n", msg->xaction, queued );
- return msg;
- }
-
- if( ! uta_ring_insert( ctx->mring, msg ) ) { // just queue, error if ring is full
- if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific ring is full\n" );
- errno = ENOBUFS;
- return NULL;
+ msg = rmr_rcv_msg( ctx, msg ); // hard wait for next
+ if( msg != NULL ) {
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific checking message; queued=%d allowed=%d state=%d\n", queued, allow2queue, msg->state );
+ if( msg->state == RMR_OK ) {
+ if( memcmp( msg->xaction, expect, exp_len ) == 0 ) { // got it -- return it
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific matched (%s); %d messages were queued\n", msg->xaction, queued );
+ return msg;
+ }
+
+ if( ! uta_ring_insert( ctx->mring, msg ) ) { // just queue, error if ring is full
+ if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific ring is full\n" );
+ errno = ENOBUFS;
+ return NULL;
+ }
+
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific queued message type=%d\n", msg->mtype );
+ queued++;
+ msg = NULL;
}
-
- if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " rcv_specific queued message type=%d\n", msg->mtype );
- queued++;
- msg = NULL;
}
}
that we know about. The _user_ should ensure that the supplied length also
includes the trace data length maximum as they are in control of that.
*/
-static void* init( char* uproto_port, int max_msg_size, int flags ) {
+static void* init( char* uproto_port, int def_msg_size, int flags ) {
static int announced = 0;
uta_ctx_t* ctx = NULL;
char bind_info[256]; // bind info
int old_vlevel;
old_vlevel = rmr_vlog_init(); // initialise and get the current level
- rmr_set_vlevel( RMR_VL_INFO ); // we WILL announce our version etc
if( ! announced ) {
+ rmr_set_vlevel( RMR_VL_INFO ); // we WILL announce our version
rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95/g mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
announced = 1;
+
+ rmr_set_vlevel( old_vlevel ); // return logging to the desired state
+ uta_dump_env(); // spit out environment settings meaningful to us if in info mode
}
- rmr_set_vlevel( old_vlevel ); // return logging to the desired state
errno = 0;
if( uproto_port == NULL ) {
ctx->send_retries = 1; // default is not to sleep at all; RMr will retry about 10K times before returning
ctx->d1_len = 4; // data1 space in header -- 4 bytes for now
- ctx->max_ibm = max_msg_size < 1024 ? 1024 : max_msg_size; // larger than their request doesn't hurt
+ ctx->max_ibm = def_msg_size < 1024 ? 1024 : def_msg_size; // larger than their request doesn't hurt
ctx->max_ibm += sizeof( uta_mhdr_t ) + ctx->d1_len + ctx->d2_len + TP_HDR_LEN + 64; // add in header size, transport hdr, and a bit of fudge
ctx->mring = uta_mk_ring( 4096 ); // message ring is always on for si
ctx->max_plen = RMR_MAX_RCV_BYTES; // max user payload lengh
- if( max_msg_size > 0 ) {
- ctx->max_plen = max_msg_size;
+ if( def_msg_size > 0 ) {
+ ctx->max_plen = def_msg_size;
}
// we're using a listener to get rtg updates, so we do NOT need this.
port = proto_port; // assume something like "1234" was passed
}
- if( (tok = getenv( "ENV_RTG_PORT" )) != NULL ) { // must check port here -- if < 1 then we just start static file 'listener'
- if( atoi( tok ) < 1 ) {
- static_rtc = 1;
- }
+ if( (tok = getenv( ENV_RTG_PORT )) != NULL && atoi( tok ) < 0 ) { // must check here -- if < 0 then we just start static file 'listener'
+ static_rtc = 1;
}
if( (tok = getenv( ENV_SRC_ID )) != NULL ) { // env var overrides what we dig from system
ctx->my_ip = get_default_ip( ctx->ip_list ); // and (guess) at what should be the default to put into messages as src
if( ctx->my_ip == NULL ) {
rmr_vlog( RMR_VL_WARN, "rmr_init: default ip address could not be sussed out, using name\n" );
- strcpy( ctx->my_ip, ctx->my_name ); // if we cannot suss it out, use the name rather than a nil pointer
+ ctx->my_ip = strdup( ctx->my_name ); // if we cannot suss it out, use the name rather than a nil pointer
}
}
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " default ip address: %s\n", ctx->my_ip );
// finish all flag setting before threads to keep helgrind quiet
ctx->flags |= CFL_MTC_ENABLED; // for SI threaded receiver is the only way
- if( flags & FL_NOTHREAD ) { // thread set to off; no rout table collector started (could be called by the rtc thread itself)
- ctx->rtable = rt_clone_space( NULL, NULL, 0 ); // creates an empty route table so that wormholes still can be used
+ ctx->rtable = rt_clone_space( NULL, NULL, 0 ); // create an empty route table so that wormhole/rts calls can be used
+ if( flags & RMRFL_NOTHREAD ) { // no thread prevents the collector start for very special cases
+ ctx->rtable_ready = 1; // route based sends will always fail, but rmr is ready for the non thread case
} else {
+ ctx->rtable_ready = 0; // no sends until a real route table is loaded in the rtc thread
+
if( static_rtc ) {
+ rmr_vlog( RMR_VL_INFO, "rmr_init: file based route table only for context on port %s\n", uproto_port );
if( pthread_create( &ctx->rtc_th, NULL, rtc_file, (void *) ctx ) ) { // kick the rt collector thread as just file reader
rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start static route table collector thread: %s", strerror( errno ) );
}
} else {
+ rmr_vlog( RMR_VL_INFO, "rmr_init: dynamic route table for context on port %s\n", uproto_port );
if( pthread_create( &ctx->rtc_th, NULL, rtc, (void *) ctx ) ) { // kick the real rt collector thread
rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start dynamic route table collector thread: %s", strerror( errno ) );
}
without drastically changing anything. The user should invoke with RMRFL_NONE to
avoid any misbehavour as there are internal flags which are suported
*/
-extern void* rmr_init( char* uproto_port, int max_msg_size, int flags ) {
- return init( uproto_port, max_msg_size, flags & UFL_MASK ); // ensure any internal flags are off
+extern void* rmr_init( char* uproto_port, int def_msg_size, int flags ) {
+ return init( uproto_port, def_msg_size, flags & UFL_MASK ); // ensure any internal flags are off
}
/*
return FALSE;
}
- if( ctx->rtable != NULL ) {
- return TRUE;
- }
-
- return FALSE;
+ return ctx->rtable_ready;
}
/*
*/
extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
uta_ctx_t* ctx;
- uta_mhdr_t* hdr; // header in the transport buffer
chute_t* chute;
struct timespec ts; // time info if we have a timeout
long new_ms; // adjusted mu-sec
if( max_wait == 0 ) { // one shot poll; handle wihtout sem check as that is SLOW!
if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) { // pop if queued
+ clock_gettime( CLOCK_REALTIME, &ts ); // pass current time as expiry time
+ sem_timedwait( &chute->barrier, &ts ); // must pop the count (ring is locking so if we got a message we can pop)
if( ombuf ) {
rmr_free_msg( ombuf ); // can't reuse, caller's must be trashed now
}
}
}
+ if( mbuf != NULL ) {
+ mbuf->flags |= MFL_ADDSRC; // turn on so if user app tries to send this buffer we reset src
+ }
+
return mbuf;
}
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " mt_rcv extracting from normal ring\n" );
if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) { // pop if queued
mbuf->state = RMR_OK;
+ mbuf->flags |= MFL_ADDSRC; // turn on so if user app tries to send this buffer we reset src
if( ombuf ) {
rmr_free_msg( ombuf ); // we cannot reuse as mbufs are queued on the ring
return mbuf;
}
-/*
- Accept a message buffer and caller ID, send the message and then wait
- for the receiver to tickle the semaphore letting us know that a message
- has been received. The call_id is a value between 2 and 255, inclusive; if
- it's not in this range an error will be returned. Max wait is the amount
- of time in millaseconds that the call should block for. If 0 is given
- then no timeout is set.
- If the mt_call feature has not been initialised, then the attempt to use this
- funciton will fail with RMR_ERR_NOTSUPP
- If no matching message is received before the max_wait period expires, a
- nil pointer is returned, and errno is set to ETIMEOUT. If any other error
- occurs after the message has been sent, then a nil pointer is returned
- with errno set to some other value.
+
+/*
+ This is the work horse for the multi-threaded call() function. It supports
+ both the rmr_mt_call() and the rmr_wormhole wh_call() functions. See the description
+ for rmr_mt_call() modulo the caveat below.
+
+ If endpoint is given, then we assume that we're not doing normal route table
+ routing and that we should send directly to that endpoint (probably worm
+ hole).
*/
-extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
+static rmr_mbuf_t* mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait, endpoint_t* ep ) {
rmr_mbuf_t* ombuf; // original mbuf passed in
uta_ctx_t* ctx;
uta_mhdr_t* hdr; // header in the transport buffer
return mbuf;
}
- if( call_id > MAX_CALL_ID || call_id < 2 ) { // 0 and 1 are reserved; user app cannot supply them
- mbuf->state = RMR_ERR_BADARG;
- mbuf->tp_state = errno;
- return mbuf;
- }
-
ombuf = mbuf; // save to return timeout status with
chute = &ctx->chutes[call_id];
seconds = 1; // use as flag later to invoked timed wait
}
- mbuf = mtosend_msg( ctx, mbuf, 0 ); // use internal function so as not to strip call-id; should be nil on success!
+ if( ep == NULL ) { // normal routing
+ mbuf = mtosend_msg( ctx, mbuf, 0 ); // use internal function so as not to strip call-id; should be nil on success!
+ } else {
+ mbuf = send_msg( ctx, mbuf, ep->nn_sock, -1 );
+ }
if( mbuf ) {
if( mbuf->state != RMR_OK ) {
mbuf->tp_state = errno;
}
mbuf = chute->mbuf;
- mbuf->state = RMR_OK;
+ if( mbuf != NULL ) {
+ mbuf->state = RMR_OK;
+ }
chute->mbuf = NULL;
return mbuf;
}
+/*
+ Accept a message buffer and caller ID, send the message and then wait
+ for the receiver to tickle the semaphore letting us know that a message
+ has been received. The call_id is a value between 2 and 255, inclusive; if
+ it's not in this range an error will be returned. Max wait is the amount
+ of time in milliseconds that the call should block for. If 0 is given
+ then no timeout is set.
+
+ If the mt_call feature has not been initialised, then the attempt to use this
+ function will fail with RMR_ERR_NOTSUPP
+
+ If no matching message is received before the max_wait period expires, a
+ nil pointer is returned, and errno is set to ETIMEOUT. If any other error
+ occurs after the message has been sent, then a nil pointer is returned
+ with errno set to some other value.
+
+ This is now just an outward facing wrapper so we can support wormhole calls.
+*/
+extern rmr_mbuf_t* rmr_mt_call( void* vctx, rmr_mbuf_t* mbuf, int call_id, int max_wait ) {
+
+ // must vet call_id here, all others vetted by workhorse mt_call() function
+ if( call_id > MAX_CALL_ID || call_id < 2 ) { // 0 and 1 are reserved; user app cannot supply them
+ if( mbuf != NULL ) {
+ mbuf->state = RMR_ERR_BADARG;
+ mbuf->tp_state = EINVAL;
+ }
+ return mbuf;
+ }
+
+ return mt_call( vctx, mbuf, call_id, max_wait, NULL );
+}
+
+
/*
Given an existing message buffer, reallocate the payload portion to
be at least new_len bytes. The message header will remain such that