API and build change and fix summaries. Doc correctsions
and/or changes are not mentioned here; see the commit messages.
+2020 February 20; version 3.2.2
+ Fix receive thread related core dump (ring early unlock).
+
2020 February 19; version 3.2.1
Added missing message types (E2-Setup)
set( major_version "3" ) # should be automatically populated from git tag later, but until CI process sets a tag we use this
set( minor_version "2" )
-set( patch_level "1" )
+set( patch_level "2" )
set( install_root "${CMAKE_INSTALL_PREFIX}" )
set( install_inc "include/rmr" )
void* ring; // ring this buffer should be queued back to
int rts_fd; // SI fd for return to sender
+
+ int cookie; // cookie to detect user misuse of free'd msg
} rmr_mbuf_t;
ring_t* r;
uint16_t ti; // real index in data
int64_t ctr; // pfd counter
+ void* data;
if( !RING_FAST ) { // compiler should drop the conditional when always false
if( (r = (ring_t*) vr) == NULL ) {
}
*/
+ data = r->data[ti]; // secure data and clear before letting go of the lock
+ r->data[ti] = NULL;
+
if( r->rgate != NULL ) { // if locked above...
pthread_mutex_unlock( r->rgate );
}
- return r->data[ti];
+
+ return data;
}
/*
return 0;
}
+ r->data[r->head] = new_data;
+ r->head++;
+ if( r->head >= r->nelements ) {
+ r->head = 0;
+ }
+
write( r->pfd, &inc, sizeof( inc ) );
/*
future -- investigate if it's possible only to set/clear when empty or going to empty
}
*/
- r->data[r->head] = new_data;
- r->head++;
- if( r->head >= r->nelements ) {
- r->head = 0;
- }
-
if( r->wgate != NULL ) { // if lock exists we must unlock before going
pthread_mutex_unlock( r->wgate );
}
if( river->state == RS_NEW ) {
memset( river, 0, sizeof( *river ) );
//river->nbytes = sizeof( char ) * (8 * 1024);
- river->nbytes = sizeof( char ) * ctx->max_ibm; // max inbound message size
+ river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024); // max inbound message size
river->accum = (char *) malloc( river->nbytes );
river->ipt = 0;
} else {
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback enough in the buffer size=%d need=%d remain=%d\n", river->msg_size, need, remain );
if( (river->flags & RF_DROP) == 0 ) {
memcpy( &river->accum[river->ipt], buf+bidx, need ); // grab just what is needed (might be more)
- buf2mbuf( ctx, river->accum, river->msg_size, fd ); // build an RMR mbuf and queue
+ buf2mbuf( ctx, river->accum, river->nbytes, fd ); // build an RMR mbuf and queue
river->accum = (char *) malloc( sizeof( char ) * river->nbytes ); // fresh accumulator
} else {
if( !(river->flags & RF_NOTIFIED) ) {
if( !mbuf->ring || ! uta_ring_insert( mbuf->ring, mbuf ) ) { // just queue, free if ring is full
if( mbuf->tp_buf ) {
free( mbuf->tp_buf );
+ mbuf->tp_buf = NULL; // just in case user tries to reuse this mbuf; this will be an NPE
}
+
+ mbuf->cookie = 0; // should signal a bad mbuf (if not reallocated)
free( mbuf );
}
}
rmr_set_vlevel( RMR_VL_INFO ); // we WILL announce our version etc
if( ! announced ) {
- rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95/e mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
+ rmr_vlog( RMR_VL_INFO, "ric message routing library on SI95/f mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
announced = 1;
}
ctx->send_retries = 1; // default is not to sleep at all; RMr will retry about 10K times before returning
ctx->d1_len = 4; // data1 space in header -- 4 bytes for now
ctx->max_ibm = max_msg_size < 1024 ? 1024 : max_msg_size; // larger than their request doesn't hurt
- ctx->max_ibm += sizeof( uta_mhdr_t ) + ctx->d1_len + ctx->d2_len + 64; // add in our header size and a bit of fudge
+ ctx->max_ibm += sizeof( uta_mhdr_t ) + ctx->d1_len + ctx->d2_len + TP_HDR_LEN + 64; // add in header size, transport hdr, and a bit of fudge
ctx->mring = uta_mk_ring( 4096 ); // message ring is always on for si
ctx->zcb_mring = uta_mk_ring( 128 ); // zero copy buffer mbuf ring to reduce malloc/free calls
/*
memset( msg->tp_buf, 0, mlen ); // NOT for production (debug only) valgrind will complain about uninitalised use if we don't set
- memcpy( msg->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!**", 34 ); // NOT for production -- debugging eyecatcher
+ memcpy( msg->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!**", TPHDR_LEN ); // NOT for production -- debugging eyecatcher
*/
alen = (int *) msg->tp_buf;
*alen = mlen; // FIX ME: need a stuct to go in these first bytes, not just dummy len
//SET_HDR_D2_LEN( hdr, ctx->d2_len ); // future
}
msg->len = 0; // length of data in the payload
+ msg->cookie = 0x4942;
msg->alloc_len = mlen; // length of allocated transport buffer (caller size + rmr header)
msg->sub_id = UNSET_SUBID;
msg->mtype = UNSET_MSGTYPE;
memset( msg, 0, sizeof( *msg ) );
+ msg->cookie = 0x4942;
msg->sub_id = UNSET_SUBID;
msg->mtype = UNSET_MSGTYPE;
msg->tp_buf = NULL;
msg->state = RMR_OK;
do {
tot_len = msg->len + PAYLOAD_OFFSET( hdr ) + TP_HDR_LEN; // we only send what was used + header lengths
+ if( tot_len > msg->alloc_len ) {
+ tot_len = msg->alloc_len; // likely bad length from user :(
+ }
*((int*) msg->tp_buf) = tot_len;
if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "send_msg: ending %d (%x) bytes usr_len=%d alloc=%d retries=%d\n", tot_len, tot_len, msg->len, msg->alloc_len, retries );