default: // current message always caught here
hdr = nm->header;
- memcpy( hdr, old_msg->header, RMR_HDR_LEN( old_msg->header ) + RMR_TR_LEN( old_msg->header ) + RMR_D1_LEN( old_msg->header ) + RMR_D2_LEN( old_msg->header )); // copy complete header, trace and other data
+ memcpy( hdr, old_msg->header, RMR_HDR_LEN( old_msg->header ) ); // copy complete header, trace and other data
nm->payload = PAYLOAD_ADDR( hdr ); // at user payload
break;
}
nm->sub_id = old_msg->sub_id;
nm->len = old_msg->len; // length of data in the payload
nm->alloc_len = mlen; // length of allocated payload
+ if( DEBUG ) fprintf( stderr, "[DBUG] clone values: mty=%d sid=%d len=%d alloc=%d\n", nm->mtype, nm->sub_id, nm->len, nm->alloc_len );
nm->xaction = hdr->xid; // reference xaction
nm->state = old_msg->state; // fill in caller's state (likely the state of the last operation)
return nm;
}
+/*
+ Realloc the message such that the payload is at least payload_len bytes. If the current
+ payload size is large enough, no action is taken. If copy is false, the actual payload
+ bytes are NOT copied. This allows a caller to realloc for a response message (to retain
+ the source information which would be lost on a simple alloc) which has no need for the
+ original message.
+
+ The old message buffer will reference the new underlying transport, and the original payload
+ will be lost unless clone is set to true. If clone is true, the old message buffer will continue
+ to reference the original payload, and a new message buffer will be allocated (even if the
+ payload size in the old message was larger than requested).
+
+ The return value is a pointer to the message with at least payload_len bytes allocated. It
+ will be the same as the old_message if clone is false.
+
+ CAUTION:
+ If the message is not a message which was received, the mtype, sub-id, length values in the
+ RMR header in the allocated transport buffer will NOT be accurate and will cause the resulting
+ mbuffer information for mtype and subid to be reset even when copy is true. To avoid silently
+ resetting information in the mbuffer, this funciton will reset the mbuf values from the current
+ settings and NOT from the copied RMR header in transport buffer.
+*/
+static inline rmr_mbuf_t* realloc_payload( rmr_mbuf_t* old_msg, int payload_len, int copy, int clone ) {
+ rmr_mbuf_t* nm = NULL; // new message buffer when cloning
+ size_t mlen;
+ int state;
+ uta_mhdr_t* omhdr; // old message header
+ uta_v1mhdr_t* v1hdr;
+ int tr_old_len; // tr size in new buffer
+ int old_psize = 0; // current size of message for payload
+ int hdr_len = 0; // length of RMR header in old msg
+ void* old_tp_buf; // pointer to the old tp buffer
+ int free_tp = 1; // free the transport buffer (old) when done (when not cloning)
+ int old_mt; // msg type and sub-id from the message passed in
+ int old_sid;
+ int old_len;
+
+ if( old_msg == NULL || payload_len <= 0 ) {
+ errno = EINVAL;
+ return NULL;
+ }
+
+ old_mt = old_msg->mtype;
+ old_sid = old_msg->sub_id;
+ old_len = old_msg->len;
+ old_psize = old_msg->alloc_len - RMR_HDR_LEN( old_msg->header ); // allocated transport size less the header and other data bits
+ if( !clone && payload_len <= old_psize ) { // old message is large enough, nothing to do
+ if( DEBUG ) fprintf( stderr, "[DBUG] rmr_realloc_payload: old msg payload larger than requested: cur=%d need=%d\n", old_psize, payload_len );
+ return old_msg;
+ }
+
+ hdr_len = RMR_HDR_LEN( old_msg->header );
+ old_tp_buf = old_msg->tp_buf;
+
+ if( clone ) {
+ if( DEBUG ) fprintf( stderr, "[DBUG] rmr_realloc_payload: cloning message\n" );
+ free_tp = 0;
+
+ nm = (rmr_mbuf_t *) malloc( sizeof( *nm ) );
+ if( nm == NULL ) {
+ fprintf( stderr, "[CRI] rmr_realloc_payload: cannot get memory for message buffer. bytes requested: %d\n", (int) sizeof(*nm) );
+ return NULL;
+ }
+ memset( nm, 0, sizeof( *nm ) );
+ } else {
+ nm = old_msg;
+ }
+
+ omhdr = old_msg->header;
+ mlen = hdr_len + (payload_len > old_psize ? payload_len : old_psize); // must have larger in case copy is true
+
+ if( DEBUG ) fprintf( stderr, "[DBUG] reallocate for payload increase. new message size: %d\n", (int) mlen );
+ if( (state = nng_msg_alloc( (nng_msg **) &nm->tp_buf, mlen )) != 0 ) {
+ fprintf( stderr, "[CRI] rmr_realloc_payload: cannot get memory for zero copy buffer. bytes requested: %d\n", (int) mlen );
+ return NULL;
+ }
+
+ nm->header = nng_msg_body( nm->tp_buf ); // set and copy the header from old message
+ SET_HDR_LEN( nm->header );
+
+ if( copy ) { // if we need to copy the old payload too
+ if( DEBUG ) fprintf( stderr, "[DBUG] rmr_realloc_payload: copy payload into new message: %d bytes\n", old_psize );
+ memcpy( nm->header, omhdr, sizeof( char ) * (old_psize + RMR_HDR_LEN( omhdr )) );
+ } else { // just need to copy header
+ if( DEBUG ) fprintf( stderr, "[DBUG] rmr_realloc_payload: copy only header into new message: %d bytes\n", RMR_HDR_LEN( nm->header ) );
+ memcpy( nm->header, omhdr, sizeof( char ) * RMR_HDR_LEN( omhdr ) );
+ }
+
+ ref_tpbuf( nm, mlen ); // set payload and other pointers in the message to the new tp buffer
+
+ if( !copy ) {
+ nm->mtype = -1; // didn't copy payload, so mtype and sub-id are invalid
+ nm->sub_id = -1;
+ nm->len = 0; // and len is 0
+ } else {
+ nm->len = old_len; // we must force these to avoid losing info if msg wasn't a received message
+ nm->mtype = old_mt;
+ nm->sub_id = old_sid;
+ }
+
+ if( free_tp ) {
+ free( old_tp_buf ); // we did not clone, so free b/c no references
+ }
+
+ return nm;
+}
+
/*
This is the receive work horse used by the outer layer receive functions.
It waits for a message to be received on our listen socket. If old msg