hdr->sub_id = htonl( UNSET_SUBID );
SET_HDR_LEN( hdr ); // ensure these are converted to net byte order
SET_HDR_TR_LEN( hdr, ctx->trace_data_len );
- //SET_HDR_D1_LEN( hdr, ctx->d1_len ); // no need until we start using them
- //SET_HDR_D2_LEN( hdr, ctx->d2_len );
+ SET_HDR_D1_LEN( hdr, ctx->d1_len );
+ //SET_HDR_D2_LEN( hdr, ctx->d2_len ); // future
}
msg->len = 0; // length of data in the payload
msg->alloc_len = mlen; // length of allocated transport buffer
msg->xaction = ((uta_mhdr_t *)msg->header)->xid; // point at transaction id in header area
msg->state = state; // fill in caller's state (likely the state of the last operation)
msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
- strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID );
+ strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );
+ strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
if( DEBUG > 1 ) fprintf( stderr, "[DBUG] alloc_zcmsg mlen=%ld size=%d mpl=%d flags=%02x\n", (long) mlen, size, ctx->max_plen, msg->flags );
is somewhat based on header version.
*/
static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen ) {
- uta_mhdr_t* hdr; // current header
+ uta_mhdr_t* hdr = NULL; // current header
uta_v1mhdr_t* v1hdr; // version 1 header
int ver;
int hlen; // header len to use for a truncation check
break;
}
- if( msg->len > (msg->alloc_len - hlen ) ) { // more than we should have had room for; error
+ if( msg->len > (msg->alloc_len - hlen ) ) {
msg->state = RMR_ERR_TRUNC;
msg->len = msg->alloc_len - hlen; // adjust len down so user app doesn't overrun
} else {
uta_mhdr_t* hdr;
uta_v1mhdr_t* v1hdr;
int tr_old_len; // tr size in new buffer
- int coffset; // an offset to something in the header for copy
nm = (rmr_mbuf_t *) malloc( sizeof *nm );
if( nm == NULL ) {
nm->payload = (void *) v1hdr + sizeof( *v1hdr );
break;
- default: // current message always caught here
+ default: // current message version always caught here
hdr = nm->header;
- memcpy( hdr, old_msg->header, sizeof( uta_mhdr_t ) ); // ONLY copy the header portion; trace and data might have changed
- if( RMR_D1_LEN( hdr ) ) {
- coffset = DATA1_OFFSET( hdr ); // offset to d1
- memcpy( hdr + coffset, old_msg->header + coffset, RMR_D1_LEN( hdr ) ); // copy data1 and data2 if necessary
+ memcpy( hdr, old_msg->header, sizeof( uta_mhdr_t ) ); // ONLY copy the header portion; trace and data offsets might have changed
+ SET_HDR_TR_LEN( hdr, tr_len ); // must adjust trace len in new message before copy
+ if( RMR_D1_LEN( hdr ) ) {
+ memcpy( DATA1_ADDR( hdr ), DATA1_ADDR( old_msg->header), RMR_D1_LEN( hdr ) ); // copy data1 and data2 if necessary
}
if( RMR_D2_LEN( hdr ) ) {
- coffset = DATA2_OFFSET( hdr ); // offset to d2
- memcpy( hdr + coffset, old_msg->header + coffset, RMR_D2_LEN( hdr ) ); // copy data2 and data2 if necessary
+ memcpy( DATA2_ADDR( hdr ), DATA2_ADDR( old_msg->header), RMR_D2_LEN( hdr ) );
}
- SET_HDR_TR_LEN( hdr, tr_len ); // MUST set before pointing payload
nm->payload = PAYLOAD_ADDR( hdr ); // directly at the payload
- SET_HDR_TR_LEN( hdr, tr_len ); // do NOT copy old trace data, just set the new header
break;
}
msg->len = 0;
msg->payload = NULL;
msg->xaction = NULL;
+ msg->tp_buf = NULL;
msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS ); // blocks hard until received
if( (msg->state = xlate_nng_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) {
int state;
uta_mhdr_t* hdr;
int nng_flags = NNG_FLAG_NONBLOCK; // if we need to set any nng flags (zc buffer) add it to this
- int spin_retries = 1000; // if eagain/timeout we'll spin this many times before giving up the CPU
+ int spin_retries = 1000; // if eagain/timeout we'll spin, at max, this many times before giving up the CPU
int tr_len; // trace len in sending message so we alloc new message with same trace size
// future: ensure that application did not overrun the XID buffer; last byte must be 0
tr_len = RMR_TR_LEN( hdr ); // snarf trace len before sending as hdr is invalid after send
if( msg->flags & MFL_ADDSRC ) { // buffer was allocated as a receive buffer; must add our source
- strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SID ); // must overlay the source to be ours
+ strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC ); // must overlay the source to be ours
+ strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
}
errno = 0;
if( retries > 0 && (state == NNG_EAGAIN || state == NNG_ETIMEDOUT) ) {
if( --spin_retries <= 0 ) { // don't give up the processor if we don't have to
retries--;
- usleep( 1 ); // sigh, give up the cpu and hope it's just 1 miscrosec
+ if( retries > 0 ) { // only if we'll loop through again
+ usleep( 1 ); // sigh, give up the cpu and hope it's just 1 microsec
+ }
spin_retries = 1000;
}
} else {
return msg;
}
+/*
+ send message with maximum timeout.
+ Accept a message and send it to an endpoint based on message type.
+ If NNG reports that the send attempt timed out, or should be retried,
+ RMr will retry for approximately max_to microseconds; rounded to the next
+ higher value of 10.
+
+ Allocates a new message buffer for the next send. If a message type has
+ more than one group of endpoints defined, then the message will be sent
+ in round robin fashion to one endpoint in each group.
+
+ An endpoint will be looked up in the route table using the message type and
+ the subscription id. If the subscription id is "UNSET_SUBID", then only the
+ message type is used. If the initial lookup, with a subid, fails, then a
+ second lookup using just the mtype is tried.
+
+ CAUTION: this is a non-blocking send. If the message cannot be sent, then
+ it will return with an error and errno set to eagain. If the send is
+ a limited fanout, then the returned status is the status of the last
+ send attempt.
+
+*/
+static rmr_mbuf_t* mtosend_msg( void* vctx, rmr_mbuf_t* msg, int max_to ) {
+ nng_socket nn_sock; // endpoint socket for send
+ uta_ctx_t* ctx;
+ int group; // selected group to get socket for
+ int send_again; // true if the message must be sent again
+ rmr_mbuf_t* clone_m; // cloned message for an nth send
+ int sock_ok; // got a valid socket from round robin select
+ uint64_t key; // mtype or sub-id/mtype sym table key
+ int altk_ok = 0; // set true if we can lookup on alternate key if mt/sid lookup fails
+ char* d1;
+
+ if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
+ errno = EINVAL; // if msg is null, this is their clue
+ if( msg != NULL ) {
+ msg->state = RMR_ERR_BADARG;
+ errno = EINVAL; // must ensure it's not eagain
+ }
+ return msg;
+ }
+
+ errno = 0; // clear; nano might set, but ensure it's not left over if it doesn't
+ if( msg->header == NULL ) {
+ fprintf( stderr, "rmr_send_msg: ERROR: message had no header\n" );
+ msg->state = RMR_ERR_NOHDR;
+ errno = EBADMSG; // must ensure it's not eagain
+ return msg;
+ }
+
+ if( max_to < 0 ) {
+ max_to = ctx->send_retries; // convert to retries
+ }
+
+ send_again = 1; // force loop entry
+ group = 0; // always start with group 0
+
+ key = build_rt_key( msg->sub_id, msg->mtype ); // route table key to find the entry
+ if( msg->sub_id != UNSET_SUBID ) {
+ altk_ok = 1; // if caller's sub-id doesn't hit with mtype, allow mtype only key for retry
+ }
+ while( send_again ) {
+ sock_ok = uta_epsock_rr( ctx->rtable, key, group, &send_again, &nn_sock ); // round robin sel epoint; again set if mult groups
+ if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d len=%d sock_ok=%d ak_ok=%d\n",
+ msg->mtype, send_again, group, msg->len, sock_ok, altk_ok );
+
+ if( ! sock_ok ) {
+ if( altk_ok ) { // we can try with the alternate (no sub-id) key
+ altk_ok = 0;
+ key = build_rt_key( UNSET_SUBID, msg->mtype ); // build with just the mtype and try again
+ send_again = 1; // ensure we don't exit the while
+ continue;
+ }
+
+ msg->state = RMR_ERR_NOENDPT;
+ errno = ENXIO; // must ensure it's not eagain
+ return msg; // caller can resend (maybe) or free
+ }
+
+ group++;
+
+ if( send_again ) {
+ clone_m = clone_msg( msg ); // must make a copy as once we send this message is not available
+ if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
+ msg->flags |= MFL_NOALLOC; // send should not allocate a new buffer
+ msg = send_msg( ctx, msg, nn_sock, max_to ); // do the hard work, msg should be nil on success
+ /*
+ if( msg ) {
+ // error do we need to count successes/errors, how to report some success, esp if last fails?
+ }
+ */
+
+ msg = clone_m; // clone will be the next to send
+ } else {
+ msg = send_msg( ctx, msg, nn_sock, max_to ); // send the last, and allocate a new buffer; drops the clone if it was
+ }
+ }
+
+ return msg; // last message caries the status of last/only send attempt
+}
+
+
/*
A generic wrapper to the real send to keep wormhole stuff agnostic.
We assume the wormhole function vetted the buffer so we don't have to.