The allocated len stored in the msg is:
transport header length +
- message header +
- user requested payload
+ message header +
+ user requested payload
The msg header is a combination of the fixed RMR header and the variable
trace data and d2 fields which may vary for each message.
if( !mbuf->ring || ! uta_ring_insert( mbuf->ring, mbuf ) ) { // just queue, free if ring is full
if( mbuf->tp_buf ) {
free( mbuf->tp_buf );
+ mbuf->tp_buf = NULL; // just in case user tries to reuse this mbuf; this will be an NPE
}
+
+ mbuf->cookie = 0; // should signal a bad mbuf (if not reallocated)
free( mbuf );
}
}
d1 = DATA1_ADDR( msg->header );
d1[D1_CALLID_IDX] = NO_CALL_ID; // must blot out so it doesn't queue on a chute at the other end
- }
+ }
return mtosend_msg( vctx, msg, max_to );
}
d1 = DATA1_ADDR( msg->header );
d1[D1_CALLID_IDX] = NO_CALL_ID; // must blot out so it doesn't queue on a chute at the other end
- }
+ }
return rmr_mtosend_msg( vctx, msg, -1 ); // retries < 0 uses default from ctx
}
/*
Return to sender allows a message to be sent back to the endpoint where it originated.
- In the SI world the file descriptor that was the source of the message is captured in
- the mbuffer and thus can be used to quickly find the target for an RTS call.
+ With SI95 it was thought that the return to sender would be along the same open conneciton
+ and thus no table lookup would be needed to open a 'reverse direction' path. However, for
+ applications sending at high message rates, returning responses on the same connection
+ causes major strife. Thus the decision was made to use the same method as NNG and just
+ open a second connection for reverse path.
- The source information in the message is used to select the socket on which to write
- the message rather than using the message type and round-robin selection. This
- should return a message buffer with the state of the send operation set. On success
- (state is RMR_OK, the caller may use the buffer for another receive operation), and on
- error it can be passed back to this function to retry the send if desired. On error,
- errno will liklely have the failure reason set by the nng send processing.
- The following are possible values for the state in the message buffer:
+ We will attempt to use the name in the received message to look up the endpoint. If
+ that failes, then we will write on the connection that the message arrived on as a
+ falback.
+
+ On success (state is RMR_OK, the caller may use the buffer for another receive operation),
+ and on error it can be passed back to this function to retry the send if desired. On error,
+ errno will liklely have the failure reason set by the nng send processing. The following
+ are possible values for the state in the message buffer:
Message states returned:
RMR_ERR_BADARG - argument (context or msg) was nil or invalid
failure. The value of errno might give a clue as to what is wrong.
CAUTION:
- Like send_msg(), this is non-blocking and will return the msg if there is an errror.
+ Like send_msg(), this is non-blocking and will return the msg if there is an error.
The caller must check for this and handle it properly.
*/
extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG; // must ensure call flag is off
-/*
- sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock, &ep, ctx->si_ctx ); // src is always used first for rts
+ sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock, &ep ); // always try src first
if( ! sock_ok ) {
-*/
- if( (nn_sock = msg->rts_fd) < 0 ) {
- if( HDR_VERSION( msg->header ) > 2 ) { // with ver2 the ip is there, try if src name not known
- //sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep, ctx->si_ctx );
- sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep );
- }
- if( ! sock_ok ) {
- msg->state = RMR_ERR_NOENDPT;
- return msg; // preallocated msg can be reused since not given back to nn
+ if( (nn_sock = msg->rts_fd) < 0 ) {
+ if( HDR_VERSION( msg->header ) > 2 ) { // with ver2 the ip is there, try if src name not known
+ sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep );
+ }
+ if( ! sock_ok ) {
+ msg->state = RMR_ERR_NOENDPT;
+ return msg;
+ }
}
}
+
msg->state = RMR_OK; // ensure it is clear before send
hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src ); // the dest where we're returning the message to
hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip ); // both the src host and src ip
case RMR_OK:
ep->scounts[EPSC_GOOD]++;
break;
-
+
case RMR_ERR_RETRY:
ep->scounts[EPSC_TRANS]++;
break;
If multi-threading call is turned on, this invokes that mechanism with the special call
id of 1 and a max wait of 1 second. If multi threaded call is not on, then the original
behavour (described below) is carried out. This is safe to use when mt is enabled, but
- the user app is invoking rmr_call() from only one thread, and the caller doesn't need
+ the user app is invoking rmr_call() from only one thread, and the caller doesn't need
a flexible timeout.
On timeout this function will return a nil pointer. If the original message could not
rmr_set_vlevel( RMR_VL_INFO ); // we WILL announce our version etc
if( ! announced ) {
- rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95/e mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
+ rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95/g mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
announced = 1;
}
ctx->send_retries = 1; // default is not to sleep at all; RMr will retry about 10K times before returning
ctx->d1_len = 4; // data1 space in header -- 4 bytes for now
ctx->max_ibm = max_msg_size < 1024 ? 1024 : max_msg_size; // larger than their request doesn't hurt
- ctx->max_ibm += sizeof( uta_mhdr_t ) + ctx->d1_len + ctx->d2_len + 64; // add in our header size and a bit of fudge
+ ctx->max_ibm += sizeof( uta_mhdr_t ) + ctx->d1_len + ctx->d2_len + TP_HDR_LEN + 64; // add in header size, transport hdr, and a bit of fudge
ctx->mring = uta_mk_ring( 4096 ); // message ring is always on for si
ctx->zcb_mring = uta_mk_ring( 128 ); // zero copy buffer mbuf ring to reduce malloc/free calls
if( (interface = getenv( ENV_BIND_IF )) == NULL ) {
interface = "0.0.0.0";
}
-
+
snprintf( bind_info, sizeof( bind_info ), "%s:%s", interface, port ); // FIXME -- si only supports 0.0.0.0 by default
if( (state = SIlistener( ctx->si_ctx, TCP_DEVICE, bind_info )) < 0 ) {
rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to start si listener for %s: %s\n", bind_info, strerror( errno ) );
return NULL;
}
- if( flags & FL_NOTHREAD ) { // thread set to off; no rout table collector started (could be called by the rtc thread itself)
+ // finish all flag setting before threads to keep helgrind quiet
+ ctx->flags |= CFL_MTC_ENABLED; // for SI threaded receiver is the only way
+
+ if( flags & RMRFL_NOTHREAD ) { // thread set to off; no route table collector started (could be called by the rtc thread itself)
ctx->rtable = rt_clone_space( NULL, NULL, 0 ); // creates an empty route table so that wormholes still can be used
} else {
if( static_rtc ) {
}
}
- ctx->flags |= CFL_MTC_ENABLED; // for SI threaded receiver is the only way
if( pthread_create( &ctx->mtc_th, NULL, mt_receive, (void *) ctx ) ) { // so kick it
rmr_vlog( RMR_VL_WARN, "rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
}
long nano_sec; // max wait xlated to nano seconds
int state;
rmr_mbuf_t* ombuf; // mbuf user passed; if we timeout we return state here
-
+
if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
errno = EINVAL;
if( mbuf ) {
if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) { // pop if queued
if( ombuf ) {
rmr_free_msg( ombuf ); // can't reuse, caller's must be trashed now
- }
+ }
} else {
mbuf = ombuf; // return original if it was given with timeout status
if( ombuf != NULL ) {
}
}
+ mbuf->flags |= MFL_ADDSRC; // turn on so if user app tries to send this buffer we reset src
return mbuf;
}
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, " mt_rcv extracting from normal ring\n" );
if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) { // pop if queued
mbuf->state = RMR_OK;
+ mbuf->flags |= MFL_ADDSRC; // turn on so if user app tries to send this buffer we reset src
if( ombuf ) {
rmr_free_msg( ombuf ); // we cannot reuse as mbufs are queued on the ring
long seconds = 0; // max wait seconds
long nano_sec; // max wait xlated to nano seconds
int state;
-
+
errno = EINVAL;
if( (ctx = (uta_ctx_t *) vctx) == NULL || mbuf == NULL ) {
if( mbuf ) {
rmr_free_msg( chute->mbuf );
chute->mbuf = NULL;
}
-
+
hdr = (uta_mhdr_t *) mbuf->header;
hdr->flags |= HFL_CALL_MSG; // must signal this sent with a call
memcpy( chute->expect, mbuf->xaction, RMR_MAX_XID ); // xaction that we will wait for
mbuf->flags |= MFL_NOALLOC; // send message without allocating a new one (expect nil from mtosend
if( max_wait >= 0 ) {
- clock_gettime( CLOCK_REALTIME, &ts );
+ clock_gettime( CLOCK_REALTIME, &ts );
if( max_wait > 999 ) {
seconds = max_wait / 1000;
Given an existing message buffer, reallocate the payload portion to
be at least new_len bytes. The message header will remain such that
the caller may use the rmr_rts_msg() function to return a payload
- to the sender.
+ to the sender.
The mbuf passed in may or may not be reallocated and the caller must
- use the returned pointer and should NOT assume that it can use the
+ use the returned pointer and should NOT assume that it can use the
pointer passed in with the exceptions based on the clone flag.
If the clone flag is set, then a duplicated message, with larger payload
size, is allocated and returned. The old_msg pointer in this situation is
- still valid and must be explicitly freed by the application. If the clone
+ still valid and must be explicitly freed by the application. If the clone
message is not set (0), then any memory management of the old message is
handled by the function.
- If the copy flag is set, the contents of the old message's payload is
- copied to the reallocated payload. If the flag is not set, then the
+ If the copy flag is set, the contents of the old message's payload is
+ copied to the reallocated payload. If the flag is not set, then the
contents of the payload is undetermined.
*/
extern rmr_mbuf_t* rmr_realloc_payload( rmr_mbuf_t* old_msg, int new_len, int copy, int clone ) {