#ifndef _sr_si_static_c
#define _sr_si_static_c
-static void dump_40( char *p, char* label ) {
+static void dump_n( char *p, char* label, int n ) { // hex dump of n bytes starting at p, written to stderr; label may be NULL
int i;
+ int j; // row counter
+ int t = 0; // total bytes written so far, across all rows
+ int rows; // number of output rows needed
- if( label )
- fprintf( stderr, ">>>>> %s p=%p\n", label, p );
- for( i = 0; i < 40; i++ ) {
- fprintf( stderr, "%02x ", (unsigned char) *(p+i) );
+ if( label ) { // heading line is written only when a label is supplied
+ fprintf( stderr, "[DUMP] %s p=%p %d bytes\n", label, p, n );
}
- fprintf( stderr, "\n" );
+ // 16 bytes per row; a partial final row still gets its own line
+ rows = (n/16) + ((n % 16) ? 1 : 0);
+ // each row starts with the hex offset of its first byte
+ for( j = 0; j < rows; j++ ) {
+ fprintf( stderr, "[DUMP] %04x: ", j * 16 );
+ // t < n stops the final row early when n is not a multiple of 16
+ for( i = 0; t < n && i < 16; i++, t++ ) {
+ fprintf( stderr, "%02x ", (unsigned char) *p ); // cast keeps bytes >= 0x80 from sign extending
+ p++;
+ }
+ fprintf( stderr, "\n" );
+ }
+}
+
+/*
+ Backwards compatibility: the original fixed size dump, now a 40 byte call to dump_n().
+*/
+static void dump_40( char *p, char* label ) {
+ dump_n( p, label, 40 );
}
/*
The addition of the connection shut error code to the switch requires
that the NNG version at commit e618abf8f3db2a94269a (or after) be
- used for compiling RMR.
+ used for compiling RMR.
*/
static inline int xlate_si_state( int state, int def_state ) {
return state;
}
+/*
+ Given a message size and a buffer (assumed to be TP_SZFIELD_LEN or larger)
+ this will put in the size such that it is compatible with old versions
+ of RMR (that expect the message size to not be in network byte order)
+ and with new versions that do. See extract function in mt_call_si_static.c
+ for details on what ends up in the buffer.
+
+ What is written: the first sizeof( uint32_t ) bytes get len in host byte
+ order, the next sizeof( uint32_t ) bytes get len in network byte order, and
+ the byte at TP_SZFIELD_LEN-1 gets TP_SZ_MARKER.
+
+ The values are copied in with memcpy() rather than stored through a
+ uint32_t pointer so that buf is not required to be 4-byte aligned.
+*/
+static inline void insert_mlen( uint32_t len, char* buf ) {
+ uint32_t nlen; // len converted to network byte order
+
+ memcpy( buf, &len, sizeof( len ) ); // old systems expect an unconverted integer
+
+ nlen = htonl( len );
+ memcpy( buf + sizeof( len ), &nlen, sizeof( nlen ) ); // new systems want a converted integer
+
+ buf[TP_SZFIELD_LEN-1] = TP_SZ_MARKER; // marker to flag this is generated by a new message
+}
+
/*
Alloc a new nano zero copy buffer and put into msg. If msg is nil, then we will alloc
a new message struct as well. Size is the size of the zc buffer to allocate (not
msg->alloc_len = 0; // force tp_buffer realloc below
if( msg->tp_buf ) {
free( msg->tp_buf );
+ msg->tp_buf = NULL;
}
} else {
mlen = msg->alloc_len; // msg given, allocate the same size as before
abort( ); // toss out a core file for this
}
-/*
- memset( msg->tp_buf, 0, mlen ); // NOT for production (debug only) valgrind will complain about uninitalised use if we don't set
- memcpy( msg->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!**", 34 ); // NOT for production -- debugging eyecatcher
-*/
- alen = (int *) msg->tp_buf;
- *alen = mlen; // FIX ME: need a stuct to go in these first bytes, not just dummy len
+ if( DEBUG ) {
+ // for speed we don't do this in production; for testing valgrind will complain about uninitialised use if not set
+ memset( msg->tp_buf, 0, mlen );
+ memcpy( msg->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!==", 34 ); // do NOT use a $ in this string!
+ }
+
+ insert_mlen( (uint32_t) mlen, msg->tp_buf ); // this will likely be overwriten on send to shirnk
msg->header = ((char *) msg->tp_buf) + TP_HDR_LEN;
memset( msg->header, 0, sizeof( uta_mhdr_t ) ); // ensure no junk in the header area
//SET_HDR_D2_LEN( hdr, ctx->d2_len ); // future
}
msg->len = 0; // length of data in the payload
+ msg->cookie = 0x4942;
msg->alloc_len = mlen; // length of allocated transport buffer (caller size + rmr header)
msg->sub_id = UNSET_SUBID;
msg->mtype = UNSET_MSGTYPE;
msg->payload = PAYLOAD_ADDR( hdr ); // point to payload (past all header junk)
msg->xaction = ((uta_mhdr_t *)msg->header)->xid; // point at transaction id in header area
msg->state = state; // fill in caller's state (likely the state of the last operation)
- msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
+ msg->flags = MFL_ZEROCOPY; // this is a zerocopy sendable message
msg->ring = ctx->zcb_mring; // original msg_free() api doesn't get context so must dup on eaach :(
strncpy( (char *) ((uta_mhdr_t *)msg->header)->src, ctx->my_name, RMR_MAX_SRC );
strncpy( (char *) ((uta_mhdr_t *)msg->header)->srcip, ctx->my_ip, RMR_MAX_SRC );
memset( msg, 0, sizeof( *msg ) );
+ msg->cookie = 0x4942;
msg->sub_id = UNSET_SUBID;
msg->mtype = UNSET_MSGTYPE;
msg->tp_buf = NULL;
nm->len = old_msg->len; // length of data in the payload
nm->alloc_len = mlen; // length of allocated payload
- nm->xaction = hdr->xid; // reference xaction
+ nm->xaction = &hdr->xid[0]; // reference xaction
nm->state = old_msg->state; // fill in caller's state (likely the state of the last operation)
nm->flags = old_msg->flags | MFL_ZEROCOPY; // this is a zerocopy sendable message
memcpy( nm->payload, old_msg->payload, old_msg->len );
rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
exit( 1 );
}
- memset( nm->tp_buf, 0, tpb_len );
- memcpy( nm->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!**", 34 ); // DEBUGGING
- alen = (int *) nm->tp_buf;
- *alen = tpb_len; // FIX ME: need a stuct to go in these first bytes, not just dummy len
+ if( DEBUG ) {
+ memset( nm->tp_buf, 0, tpb_len );
+ memcpy( nm->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!==", 34 ); // DEBUGGING do NOT use $ in this string!!
+ }
+
+ insert_mlen( (uint32_t) tpb_len, nm->tp_buf ); // this len will likely be reset on send to shrink
nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;
nm->len = old_msg->len; // length of data in the payload
nm->alloc_len = mlen; // length of allocated payload
- nm->xaction = hdr->xid; // reference xaction
+ nm->xaction = &hdr->xid[0]; // reference xaction
nm->state = old_msg->state; // fill in caller's state (likely the state of the last operation)
nm->flags = old_msg->flags | MFL_ZEROCOPY; // this is a zerocopy sendable message
memcpy( nm->payload, old_msg->payload, old_msg->len );
}
/*
- Realloc the message such that the payload is at least payload_len bytes.
+ Realloc the message such that the payload is at least payload_len bytes.
The clone and copy options affect what portion of the original payload is copied to
the reallocated message, and whether or not the original payload is lost after the
reallocation process has finished.
clone == false
The old payload will be lost after reallocation. The message buffer pointer which
is returned will likely reference the same structure (don't depend on that).
-
+
CAUTION:
If the message is not a message which was received, the mtype, sub-id, length values in the
omhdr = old_msg->header;
mlen = hdr_len + (payload_len > old_psize ? payload_len : old_psize); // must have larger in case copy is true
- if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "reallocate for payload increase. new message size: %d\n", (int) mlen );
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "reallocate for payload increase. new message size: %d\n", (int) mlen );
if( (nm->tp_buf = (char *) malloc( sizeof( char ) * mlen )) == NULL ) {
rmr_vlog( RMR_VL_CRIT, "rmr_realloc_payload: cannot get memory for zero copy buffer. bytes requested: %d\n", (int) mlen );
+ free( nm );
return NULL;
}
return NULL;
}
-/*
- Receives a 'raw' message from a non-RMr sender (no header expected). The returned
- message buffer cannot be used to send, and the length information may or may
- not be correct (it is set to the length received which might be more than the
- bytes actually in the payload).
-
- Mostly this supports the route table collector, but could be extended with an
- API external function.
-*/
-static void* rcv_payload( uta_ctx_t* ctx, rmr_mbuf_t* old_msg ) {
- return NULL;
-/*
-FIXME: do we need this in the SI world? The only user was the route table collector
- int state;
- rmr_mbuf_t* msg = NULL; // msg received
- size_t rsize; // nng needs to write back the size received... grrr
-
- if( old_msg ) {
- msg = old_msg;
- } else {
- msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK, DEF_TR_LEN ); // will abort on failure, no need to check
- }
-
- //msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS ); // blocks hard until received
- if( (msg->state = xlate_si_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) {
- return msg;
- }
- rsize = nng_msg_len( msg->tp_buf );
-
- // do NOT use ref_tpbuf() here! Must fill these in manually.
- msg->header = nng_msg_body( msg->tp_buf );
- msg->len = rsize; // len is the number of bytes received
- msg->alloc_len = rsize;
- msg->mtype = UNSET_MSGTYPE; // raw message has no type
- msg->sub_id = UNSET_SUBID; // nor a subscription id
- msg->state = RMR_OK;
- msg->flags = MFL_RAW;
- msg->payload = msg->header; // payload is the whole thing; no header
- msg->xaction = NULL;
-
- if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "rcv_payload: got something: type=%d state=%d len=%d\n", msg->mtype, msg->state, msg->len );
-
- return msg;
-*/
-}
-
/*
This does the hard work of actually sending the message to the given socket. On success,
a new message struct is returned. On error, the original msg is returned with the state
Called by rmr_send_msg() and rmr_rts_msg(), etc. and thus we assume that all pointer
validation has been done prior.
- When msg->state is not ok, this function must set tp_state in the message as some API
+ When msg->state is not ok, this function must set tp_state in the message as some API
fucntions return the message directly and do not propigate errno into the message.
*/
static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int retries ) {
msg->state = RMR_OK;
do {
tot_len = msg->len + PAYLOAD_OFFSET( hdr ) + TP_HDR_LEN; // we only send what was used + header lengths
- *((int*) msg->tp_buf) = tot_len;
+ if( tot_len > msg->alloc_len ) {
+ tot_len = msg->alloc_len; // likely bad length from user :(
+ }
+ insert_mlen( tot_len, msg->tp_buf ); // shrink to fit
if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "send_msg: ending %d (%x) bytes usr_len=%d alloc=%d retries=%d\n", tot_len, tot_len, msg->len, msg->alloc_len, retries );
if( DEBUG > 2 ) dump_40( msg->tp_buf, "sending" );
message type is used. If the initial lookup, with a subid, fails, then a
second lookup using just the mtype is tried.
- When msg->state is not OK, this function must set tp_state in the message as
- some API fucntions return the message directly and do not propigate errno into
+ When msg->state is not OK, this function must set tp_state in the message as
+ some API functions return the message directly and do not propagate errno into
the message.
CAUTION: this is a non-blocking send. If the message cannot be sent, then
send_again = 1; // force loop entry
group = 0; // always start with group 0
while( send_again ) {
- sock_ok = uta_epsock_rr( ctx, rte, group, &send_again, &nn_sock, &ep ); // select endpt from rr group and set again if more groups
+ if( rte->nrrgroups > 0 ) { // this is a round robin entry if groups are listed
+ sock_ok = uta_epsock_rr( ctx, rte, group, &send_again, &nn_sock, &ep ); // select endpt from rr group and set again if more groups
+ } else {
+ sock_ok = epsock_meid( ctx, ctx->rtable, msg, &nn_sock, &ep );
+ send_again = 0;
+ }
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mtosend_msg: flgs=0x%04x type=%d again=%d group=%d len=%d sock_ok=%d\n",
msg->flags, msg->mtype, send_again, group, msg->len, sock_ok );
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
msg->flags |= MFL_NOALLOC; // keep send from allocating a new message; we have a clone to use
msg = send_msg( ctx, msg, nn_sock, max_to ); // do the hard work, msg should be nil on success
-
+
if( msg != NULL ) { // returned message indicates send error of some sort
rmr_free_msg( msg ); // must ditchone; pick msg so we don't have to unfiddle flags
msg = clone_m;
} else {
ok_sends++;
msg = clone_m; // clone will be the next to send
+ msg->state = RMR_OK;
}
} else {
msg = send_msg( ctx, msg, nn_sock, max_to ); // send the last, and allocate a new buffer; drops the clone if it was
if( DEBUG ) {
if( msg == NULL ) {
- rmr_vlog( RMR_VL_DEBUG, "mtosend_msg: send returned nil message!\n" );
+ rmr_vlog( RMR_VL_DEBUG, "mtosend_msg: send returned nil message!\n" );
}
}
}
case RMR_OK:
ep->scounts[EPSC_GOOD]++;
break;
-
+
case RMR_ERR_RETRY:
ep->scounts[EPSC_TRANS]++;
break;
if( ok_sends ) { // multiple rr-groups and one was successful; report ok
msg->state = RMR_OK;
}
-
+
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "final send stats: ok=%d group=%d state=%d\n", ok_sends, group, msg->state );
-
+
msg->tp_state = errno;
}