#include <rmr/rmr.h>
#define TRACE_SIZE 40 // bytes in header to provide for trace junk
+#define WBUF_SIZE 2048
/*
Thread data
int drops = 0;
int fail_count = 0; // # of failure sends after first successful send
int successful = 0; // set to true after we have a successful send
- char wbuf[1024];
+ char* wbuf = NULL;
char xbuf[1024]; // build transaction string here
char trace[1024];
int xaction_id = 1;
char* tok;
int state = 0;
+ wbuf = (char *) malloc( sizeof( char ) * WBUF_SIZE );
+
if( (control = (tdata_t *) data) == NULL ) {
fprintf( stderr, "thread data was nil; bailing out\n" );
}
sbuf = rmr_alloc_msg( control->mrc, 512 ); // alloc first send buffer; subsequent buffers allcoated on send
- memset( trace, 0, sizeof( trace ) );
+ memset( trace, 0, sizeof( trace ) );
while( count < control->n2send ) { // we send n messages after the first message is successful
snprintf( trace, 100, "%lld", (long long) time( NULL ) );
rmr_set_trace( sbuf, trace, TRACE_SIZE ); // fully populate so we dont cause a buffer realloc
- snprintf( wbuf, 200, "count=%d tr=%s %d stand up and cheer! @ %d", count, trace, rand(), control->id );
+ snprintf( wbuf, WBUF_SIZE, "count=%d tr=%s %d stand up and cheer! @ %d", count, trace, rand(), control->id );
snprintf( sbuf->payload, 300, "%d %d|%s", sum( wbuf ), sum( trace ), wbuf );
snprintf( xbuf, 200, "%31d", xaction_id );
rmr_bytes2xact( sbuf, xbuf, 32 );
rt_count++;
}
while( sbuf != NULL && sbuf->state == RMR_ERR_RETRY ) { // send blocked; keep trying
- sbuf = rmr_mt_call( control->mrc, sbuf, control->id, 100 ); // call and wait up to 100ms for a response
+ sbuf = rmr_mt_call( control->mrc, sbuf, control->id, 5000 ); // call and wait up to 5s for a response
}
if( sbuf != NULL ) {
break;
default:
- fprintf( stderr, "unexpected error: tid=%d rmr-state=%d ernro=%d\n", control->id, sbuf->state, errno );
+ fprintf( stderr, "<CALLR> unexpected error: tid=%d rmr-state=%d ernro=%d\n", control->id, sbuf->state, errno );
sbuf = rmr_alloc_msg( control->mrc, 512 ); // allocate a sendable buffer
if( successful ) {
fail_count++; // count failures after first successful message
}
control->state = -state; // signal inactive to main thread; -1 == pass, 0 == fail
- fprintf( stderr, "<THRD> [%s] tid=%-2d sent=%d ok-acks=%d bad-acks=%d drops=%d failures=%d retries=%d\n",
+ fprintf( stderr, "<THRD> [%s] tid=%-2d sent=%d ok-acks=%d bad-acks=%d drops=%d failures=%d retries=%d\n",
state ? "PASS" : "FAIL", control->id, count, ok_msg, bad_msg, drops, fail_count, rt_count );
pt_info = malloc( sizeof( pthread_t ) * nthreads );
if( cvs == NULL ) {
fprintf( stderr, "<CALL> unable to allocate control vector\n" );
- exit( 1 );
+ exit( 1 );
}