nng messages.
*/
static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state ) {
- size_t mlen;
+ size_t mlen; // size of the transport buffer that we'll allocate
uta_mhdr_t* hdr; // convenience pointer
- mlen = sizeof( uta_mhdr_t ); // figure size should we not have a msg buffer
+ mlen = sizeof( uta_mhdr_t ) + ctx->trace_data_len + ctx->d1_len + ctx->d2_len; // start with header and trace/data lengths
mlen += (size > 0 ? size : ctx->max_plen); // add user requested size or size set during init
if( msg == NULL ) {
}
msg->header = nng_msg_body( msg->tp_buf );
+ memset( msg->header, 0, sizeof( uta_mhdr_t ) ); // ensure no junk in the header area
if( (hdr = (uta_mhdr_t *) msg->header) != NULL ) {
- hdr->rmr_ver = RMR_MSG_VER; // version info should we need to recognised old style messages someday
+ hdr->rmr_ver = htonl( RMR_MSG_VER ); // set current version
+ SET_HDR_LEN( hdr ); // ensure these are converted to net byte order
+ SET_HDR_TR_LEN( hdr, ctx->trace_data_len );
+ //SET_HDR_D1_LEN( hdr, ctx->d1_len ); // no need until we start using them
+ //SET_HDR_D2_LEN( hdr, ctx->d2_len );
}
msg->len = 0; // length of data in the payload
- msg->alloc_len = mlen; // length of allocated payload
- msg->payload = msg->header + sizeof( uta_mhdr_t ); // point past header to payload (single buffer allocation above)
+ msg->alloc_len = mlen; // length of allocated transport buffer
+ msg->payload = msg->header + PAYLOAD_OFFSET( hdr ); // past header, trace and other data bits
msg->xaction = ((uta_mhdr_t *)msg->header)->xid; // point at transaction id in header area
msg->state = state; // fill in caller's state (likely the state of the last operation)
msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
The alen parm is the assumed allocated length; assumed because it's a value likely
to have come from nng receive and the actual alloc len might be larger, but we
can only assume this is the total usable space.
+
+ This function returns the message with an error state set if it detects that the
+ received message might have been truncated. Check is done here as the calculation
+ is somewhat based on header version.
*/
static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen ) {
- uta_mhdr_t* hdr;
+ uta_mhdr_t* hdr; // current header
+ uta_v1mhdr_t* v1hdr; // version 1 header
+ int ver;
+ int hlen; // header len to use for a truncation check
msg->header = nng_msg_body( msg->tp_buf ); // header is the start of the transport buffer
+ v1hdr = (uta_v1mhdr_t *) msg->header; // v1 will always allow us to suss out the version
- hdr = (uta_mhdr_t *) msg->header;
- hdr->rmr_ver = RMR_MSG_VER; // version info should we need to recognised old style messages someday
- msg->len = ntohl( hdr->plen ); // length sender says is in the payload (received length could be larger)
- msg->alloc_len = alen; // length of whole tp buffer (including header)
- msg->payload = msg->header + sizeof( uta_mhdr_t ); // point past header to payload (single buffer allocation above)
- msg->xaction = ((uta_mhdr_t *)msg->header)->xid; // point at transaction id in header area
- msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
- msg->mtype = ntohl( hdr->mtype ); // capture and convert from network order to local order
- msg->state = RMR_OK;
+ if( v1hdr->rmr_ver == 1 ) { // bug in verion 1 didn't encode the version in network byte order
+ ver = 1;
+ v1hdr->rmr_ver = htonl( 1 ); // save it correctly in case we clone the message
+ } else {
+ ver = ntohl( v1hdr->rmr_ver );
+ }
+
+ switch( ver ) {
+ case 1:
+ msg->len = ntohl( v1hdr->plen ); // length sender says is in the payload (received length could be larger)
+ msg->alloc_len = alen; // length of whole tp buffer (including header, trace and data bits)
+ msg->payload = msg->header + sizeof( uta_v1mhdr_t ); // point past header to payload (single buffer allocation above)
+
+ msg->xaction = &v1hdr->xid[0]; // point at transaction id in header area
+ msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
+ msg->mtype = ntohl( v1hdr->mtype ); // capture and convert from network order to local order
+ msg->state = RMR_OK;
+ hlen = sizeof( uta_v1mhdr_t );
+ break;
+
+ default: // current version always lands here
+ hdr = (uta_mhdr_t *) msg->header;
+ msg->len = ntohl( hdr->plen ); // length sender says is in the payload (received length could be larger)
+ msg->alloc_len = alen; // length of whole tp buffer (including header, trace and data bits)
+
+ msg->payload = msg->header + PAYLOAD_OFFSET( hdr ); // past header, trace and other data bits
+ msg->xaction = &hdr->xid[0]; // point at transaction id in header area
+ msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
+ msg->mtype = ntohl( hdr->mtype ); // capture and convert from network order to local order
+ hlen = RMR_HDR_LEN( hdr ); // len to use for truncated check later
+ break;
+ }
+
+ if( msg->len > (msg->alloc_len - hlen ) ) { // more than we should have had room for; error
+ msg->state = RMR_ERR_TRUNC;
+ msg->len = msg->alloc_len - hlen; // adjust len down so user app doesn't overrun
+ } else {
+ msg->state = RMR_OK;
+ }
}
/*
rmr_mbuf_t* nm; // new message buffer
size_t mlen;
int state;
+ uta_mhdr_t* hdr;
+ uta_v1mhdr_t* v1hdr;
nm = (rmr_mbuf_t *) malloc( sizeof *nm );
if( nm == NULL ) {
exit( 1 );
}
- nm->header = nng_msg_body( nm->tp_buf );
+ nm->header = nng_msg_body( nm->tp_buf ); // set and copy the header from old message
+ v1hdr = (uta_v1mhdr_t *) old_msg->header; // v1 will work to dig header out of any version
+ switch( ntohl( v1hdr->rmr_ver ) ) {
+ case 1:
+ memcpy( v1hdr, old_msg->header, sizeof( *v1hdr ) ); // copy complete header
+ nm->payload = (void *) v1hdr + sizeof( *v1hdr );
+ break;
+
+ default: // current message always caught here
+ hdr = nm->header;
+ memcpy( hdr, old_msg->header, RMR_HDR_LEN( old_msg->header ) ); // copy complete header, trace and other data
+ nm->payload = nm->header + PAYLOAD_OFFSET( hdr ); // point at the payload
+ break;
+ }
+
+ // --- these are all version agnostic -----------------------------------
nm->mtype = old_msg->mtype;
nm->len = old_msg->len; // length of data in the payload
nm->alloc_len = mlen; // length of allocated payload
- nm->payload = nm->header + sizeof( uta_mhdr_t ); // point past header to payload (single buffer allocation above)
- nm->xaction = ((uta_mhdr_t *)nm->header)->xid; // point at transaction id in header area
+
+ nm->xaction = hdr->xid; // reference xaction
nm->state = old_msg->state; // fill in caller's state (likely the state of the last operation)
nm->flags = old_msg->flags | MFL_ZEROCOPY; // this is a zerocopy sendable message
-
- memcpy( ((uta_mhdr_t *)nm->header)->src, ((uta_mhdr_t *)old_msg->header)->src, RMR_MAX_SID );
memcpy( nm->payload, old_msg->payload, old_msg->len );
return nm;
msg->tp_buf = NULL;
} else {
- //msg = alloc_zcmsg( ctx, NULL, RMR_MAX_RCV_BYTES, RMR_OK ); // will abort on failure, no need to check
msg = alloc_mbuf( ctx, RMR_OK ); // msg without a transport buffer
}
+ msg->alloc_len = 0;
msg->len = 0;
msg->payload = NULL;
msg->xaction = NULL;
- //rsize = msg->alloc_len; // set to max, and we'll get len back here too
- //msg->state = nng_recv( ctx->nn_sock, msg->header, &rsize, NO_FLAGS ); // total space (header + payload len) allocated
msg->state = nng_recvmsg( ctx->nn_sock, (nng_msg **) &msg->tp_buf, NO_FLAGS ); // blocks hard until received
if( (msg->state = xlate_nng_state( msg->state, RMR_ERR_RCVFAILED )) != RMR_OK ) {
return msg;
}
rsize = nng_msg_len( msg->tp_buf );
- if( rsize >= sizeof( uta_mhdr_t ) ) { // we need at least a full header here
-
- ref_tpbuf( msg, rsize ); // point payload, header etc to the just received tp buffer
+ if( rsize >= sizeof( uta_v1mhdr_t ) ) { // we need at least a full type 1 (smallest) header here
+ ref_tpbuf( msg, rsize ); // point payload, header etc to the data and set trunc error if needed
hdr = (uta_mhdr_t *) msg->header;
- msg->flags |= MFL_ADDSRC; // turn on so if user app tries to send this buffer we reset src
- if( msg->len > (msg->alloc_len - sizeof( uta_mhdr_t )) ) { // way more than we should have had room for; error
- msg->state = RMR_ERR_TRUNC;
- }
+ msg->flags |= MFL_ADDSRC; // turn on so if user app tries to send this buffer we reset src
+
if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_msg: got something: type=%d state=%d len=%d diff=%ld\n",
msg->mtype, msg->state, msg->len, msg->payload - (unsigned char *) msg->header );
} else {
- msg->len = 0;
msg->state = RMR_ERR_EMPTY;
+ msg->len = 0;
+ msg->alloc_len = rsize;
+ msg->payload = NULL;
+ msg->xaction = NULL;
+ msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
+ msg->mtype = -1;
}
return msg;
errno = 0;
msg->state = RMR_OK;
if( msg->flags & MFL_ZEROCOPY ) { // faster sending with zcopy buffer
- //nng_flags |= NNG_FLAG_ALLOC; // indicate a zc buffer that nng is expected to free
-
do {
if( (state = nng_sendmsg( nn_sock, (nng_msg *) msg->tp_buf, nng_flags )) != 0 ) { // must check and retry some if transient failure
msg->state = state;
}
} while( state && retries > 0 );
} else {
+ // future: this should not happen as all buffers we deal with are zc buffers; might make sense to remove the test and else
msg->state = RMR_ERR_SENDFAILED;
errno = ENOTSUP;
return msg;
msg->state = RMR_ERR_RETRY; // errno will have nano reason
} else {
msg->state = xlate_nng_state( msg->state, RMR_ERR_SENDFAILED ); // xlate to our state and set errno
- //errno = -msg->state;
- //msg->state = RMR_ERR_SENDFAILED; // errno will have nano reason
}
if( DEBUG ) fprintf( stderr, "[DBUG] send failed: %d %s\n", (int) msg->state, strerror( msg->state ) );