*/
static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state ) {
int mlen;
+ uta_mhdr_t* hdr;
mlen = sizeof( uta_mhdr_t ); // figure size should we not have a msg buffer
mlen += (size > 0 ? size : ctx->max_plen); // add user requested size or size set during init
exit( 1 );
}
- ((uta_mhdr_t *) msg->header)->rmr_ver = RMR_MSG_VER; // version info should we need to recognised old style messages someday
+ hdr = (uta_mhdr_t *) msg->header;
+ hdr->rmr_ver = htonl( RMR_MSG_VER ); // current version
+ SET_HDR_LEN( hdr );
+ SET_HDR_TR_LEN( hdr, ctx->trace_data_len );
+ //SET_HDR_D1_LEN( hdr, ctx->d1_len ); // moot until we actually need these data areas
+ //SET_HDR_D2_LEN( hdr, ctx->d1_len );
+
msg->len = 0; // length of data in the payload
msg->alloc_len = mlen; // length of allocated payload
- msg->payload = msg->header + sizeof( uta_mhdr_t ); // point past header to payload (single buffer allocation above)
+ msg->payload = msg->header + PAYLOAD_OFFSET( hdr ); // point at the payload in transport
msg->xaction = ((uta_mhdr_t *)msg->header)->xid; // point at transaction id in header area
msg->state = state; // fill in caller's state (likely the state of the last operation)
msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
exit( 1 );
}
+ memcpy( nm->header, old_msg->header, RMR_HDR_LEN( old_msg->header ) ); // copy complete header, trace and other data
+
nm->mtype = old_msg->mtype;
nm->len = old_msg->len; // length of data in the payload
nm->alloc_len = mlen; // length of allocated payload
- nm->payload = nm->header + sizeof( uta_mhdr_t ); // point past header to payload (single buffer allocation above)
+ nm->payload = nm->header + PAYLOAD_OFFSET( nm->header );
nm->xaction = ((uta_mhdr_t *)nm->header)->xid; // point at transaction id in header area
nm->state = old_msg->state; // fill in caller's state (likely the state of the last operation)
nm->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
- memcpy( ((uta_mhdr_t *)nm->header)->src, ((uta_mhdr_t *)old_msg->header)->src, RMR_MAX_SID );
memcpy( nm->payload, old_msg->payload, old_msg->len );
return nm;
if( msg->state > (int) sizeof( uta_mhdr_t ) ) { // we need more than just a header here
hdr = (uta_mhdr_t *) msg->header;
msg->len = ntohl( hdr->plen ); // length of data in the payload (likely < payload size)
- if( msg->len > msg->state - sizeof( uta_mhdr_t ) ) {
- fprintf( stderr, "[WARN] rmr_rcv indicated payload length < rcvd payload: expected %d got %ld\n",
- msg->len, msg->state - sizeof( uta_mhdr_t ) );
+ if( msg->len > msg->state - RMR_HDR_LEN( hdr ) ) {
+ msg->state = RMR_ERR_TRUNC;
+ msg->len = msg->state - RMR_HDR_LEN( hdr );
}
msg->mtype = ntohl( hdr->mtype ); // capture and convert from network order to local order
msg->state = RMR_OK;
msg->flags |= MFL_ADDSRC; // turn on so if user app tries to send this buffer we reset src
- msg->payload = msg->header + sizeof( uta_mhdr_t );
+ msg->payload = msg->header + PAYLOAD_OFFSET( msg->header );
msg->xaction = &hdr->xid[0]; // provide user with ref to fixed space xaction id
if( DEBUG > 1 ) fprintf( stderr, "[DBUG] rcv_msg: got something: type=%d state=%d len=%d diff=%ld\n",
msg->mtype, msg->state, msg->len, msg->payload - (unsigned char *) msg->header );