#include <string.h>
#include <rmr/rmr.h>
+extern rmr_mbuf_t* rmr_realloc_payload( rmr_mbuf_t* old_msg, int new_len, int copy, int clone );
+
#define HDR_SIZE 64 // the size of the header we place into the message
#define MSG_SIZE 1024 // the size of the message that will be sent (hdr+payload)
int main( int argc, char** argv ) {
void* mrc; // msg router context
rmr_mbuf_t* msg = NULL; // message received
+ rmr_mbuf_t* omsg = NULL; // original message if cloning
int i;
int j;
int state;
char ack_data[DATA_SIZE]; // data randomly generated for each response
int need; // amount of something that we need
int sv; // checksum valu
+ int clone = 0;
+ int copy = 0;
+ char osrc[128]; // src strings to test when cloning
+ char nsrc[128];
+ int verbose = 0;
data = getenv( "RMR_RTG_SVC" );
if( data == NULL ) {
setenv( "RMR_RTG_SVC", "19289", 1 ); // set one that won't collide with the sender if on same host
}
- if( argc > 1 ) {
- listen_port = argv[1];
+ i = 1;
+ for( i=1; i < argc && argv[i] != NULL && *(argv[i]) == '-'; i++ ) {
+ data = argv[i];
+ switch( *(data+1) ) {
+ case 'c': // copy clone values as xy value: 00 01 10 11
+ if( i < argc-1 ) {
+ j = atoi( argv[i+1] );
+ clone = j % 10 != 0;
+ copy = (j/10) != 0;
+
+ i++;
+ }
+ break;
+
+ case 'p':
+ if( i < argc-1 ) {
+ listen_port = argv[i+1];
+ i++;
+ }
+ break;
+
+ case 'v':
+ verbose = 1;
+ break;
+ }
}
memset( count_bins, 0, sizeof( count_bins ) );
- fprintf( stderr, "<RCVR> listening on port: %s for a max of %d messages\n", listen_port, nmsgs );
+ fprintf( stderr, "<EXRCVR> listening on port: %s for a max of %d messages\n", listen_port, nmsgs );
+ fprintf( stderr, "<EXRCVR> copy=%d clone=%d\n", copy, clone );
#ifdef MTC
- fprintf( stderr, "<RCVR> starting in multi-threaded mode\n" );
+ fprintf( stderr, "<EXRCVR> starting in multi-threaded mode\n" );
mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_MTCALL ); // start RMr in mt-receive mode
#else
- fprintf( stderr, "<RCVR> starting in direct receive mode\n" );
+ fprintf( stderr, "<EXRCVR> starting in direct receive mode\n" );
mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_NONE ); // start your engines!
#endif
if( mrc == NULL ) {
- fprintf( stderr, "<RCVR> ABORT: unable to initialise RMr\n" );
+ fprintf( stderr, "<EXRCVR> ABORT: unable to initialise RMr\n" );
exit( 1 );
}
timeout = time( NULL ) + 20;
while( ! rmr_ready( mrc ) ) { // wait for RMr to load a route table
- fprintf( stderr, "<RCVR> waiting for RMr to show ready\n" );
+ fprintf( stderr, "<EXRCVR> waiting for RMr to show ready\n" );
sleep( 1 );
if( time( NULL ) > timeout ) {
- fprintf( stderr, "<RCVR> giving up\n" );
+ fprintf( stderr, "<EXRCVR> giving up\n" );
exit( 1 );
}
}
- fprintf( stderr, "<RCVR> rmr now shows ready, listening begins\n" );
+ fprintf( stderr, "<EXRCVR> rmr now shows ready, listening begins\n" );
timeout = time( NULL ) + 20;
while( 1 ) {
count_bins[msg->mtype]++;
}
- need = generate_payload( ack_header, ack_data, 0, 0 ); // create an ack w/ random payload in payload, and set data in header
- if( rmr_payload_size( msg ) < need ) { // received message too small
+ if( clone ) {
+ need = rmr_payload_size( msg );
+ } else {
+ need = generate_payload( ack_header, ack_data, 0, 0 ); // create an ack w/ random payload in payload, and set data in header
+ }
+
+ if( clone || rmr_payload_size( msg ) < need ) { // received message too small or we want to clone the original for test
resized++;
- msg = rmr_realloc_payload( msg, need, 0, 0 ); // reallocate the message with a payload big enough
+ omsg = msg;
+ rmr_get_src( omsg, osrc );
+ msg = rmr_realloc_payload( msg, need, copy, clone ); // reallocate the message with a payload big enough or clone if set
+ rmr_get_src( msg, nsrc );
+
+ if( strcmp( osrc, nsrc ) != 0 ) {
+ fprintf( stderr, "[ERR] realloc source strings don't match (%s) new=(%s)\n", osrc, nsrc );
+ } else {
+ if( verbose ) {
+ fprintf( stderr, "[OK] realloc source strings match (%s) new=(%s)\n", osrc, nsrc );
+ }
+ }
+
+ if( copy ) {
+ if( memcmp( omsg->payload, msg->payload, rmr_payload_size( omsg ) ) != 0 ) {
+ fprintf( stderr, "[ERR] realloc payload contents don't match %d bytes\n", rmr_payload_size( omsg ) );
+ //spew( omsg->payload, rmr_payload_size( omsg ) );
+ //fprintf( stderr, "\n---\n\n" );
+ //spew( msg->payload, rmr_payload_size( omsg ) );
+ exit( 1 );
+ }
+ }
+
+ if( clone ) {
+ rmr_free_msg( omsg ); // we're done with the old msg, and must free b/c we cloned it
+ }
if( msg == NULL ) {
fprintf( stderr, "[ERR] realloc returned a nil pointer\n" );
continue;
}
}
- fill_payload( msg, ack_header, 0, ack_data, 0 ); // push headers (with default lengths) into message
+ if( ! clone ) {
+ fill_payload( msg, ack_header, 0, ack_data, 0 ); // push headers (with default lengths) into message
+ }
msg->mtype = 99;
msg->sub_id = -1;
msg->len = need;
-
msg = rmr_rts_msg( mrc, msg ); // return our ack message
rt_count = 1000;
while( rt_count > 0 && msg != NULL && msg->state == RMR_ERR_RETRY ) { // to work right in nano we need this :(
}
if( time( NULL ) > timeout ) {
- fprintf( stderr, "<RCVR> stopping, no recent messages received\n" );
+ fprintf( stderr, "<EXRCVR> stopping, no recent messages received\n" );
break;
} else {
if( time( NULL ) > rpt_timeout ) {
- fprintf( stderr, "<RCVR> %ld msgs=%ld good=%ld acked=%ld bad=%ld resized=%ld\n", (long) time( NULL ), count, good, ack_count, bad, resized );
+ fprintf( stderr, "<EXRCVR> %ld msgs=%ld good=%ld acked=%ld bad=%ld resized=%ld\n", (long) time( NULL ), count, good, ack_count, bad, resized );
rpt_timeout = time( NULL ) + 5;
}
strcat( wbuf, sbuf );
}
- fprintf( stderr, "<RCVR> mtype histogram: %s\n", wbuf );
- fprintf( stderr, "<RCVR> [%s] %ld messages; good=%ld acked=%ld bad=%ld resized=%ld bad-sub_id=%ld\n",
+ fprintf( stderr, "<EXRCVR> mtype histogram: %s\n", wbuf );
+ fprintf( stderr, "<EXRCVR> [%s] %ld messages; good=%ld acked=%ld bad=%ld resized=%ld bad-sub_id=%ld\n",
!!(errors + bad) ? "FAIL" : "PASS", count, good, ack_count, bad, resized, bad_sid );
sleep( 2 ); // let any outbound acks flow before closing
#define DATA_SIZE (HDR_SIZE-HDR_SIZE) // the actual 'data' length returned in the ack msg
#endif
-
void spew( char* buf, int len ) {
int i;
char wbuf[1024]; // slower, but buffer so that mult writers to the tty don't jumble (too much)
int good = 0;
if( buf == 0 && rmr_len <= 0 ) {
+ fprintf( stderr, "<TEST> validate msg: nil buffer or no len: %p len=%d\n", buf, rmr_len );
return 0;
}
if( ! isprint( *buf ) ) { // we expect the header to be a zero terminated string
fprintf( stderr, "<TEST> validate msg: header is not completely ASCII i=%d\n", i );
- spew( buf, 64 );
+ spew( buf, rmr_len > 64 ? 64 : rmr_len );
return 0;
}
}
ohdr = strdup( buf ); // must copy so we can trash
- search_start = buf;
+ search_start = ohdr;
for( i = 0; i < 3; i++ ) {
tok = strtok_r( search_start, " ", &tok_mark );
search_start = NULL;
if( ex_mlen != rmr_len ) {
fprintf( stderr, "[FAIL] received message length did not match hdr+data lengths, rmr_len=%d data follows:\n", rmr_len );
- if( ! isprint( ohdr ) ) {
- fprintf( stderr, "[CONT] header isn't printable\n" );
- spew( ohdr, 64 );
+ if( ! isprint( *ohdr ) ) {
+ fprintf( stderr, "<TEST> validate msg: header isn't printable\n" );
+ spew( ohdr, rmr_len > 64 ? 64 : rmr_len );
} else {
- fprintf( stderr, "[CONT] header: (%s)\n", ohdr );
- fprintf( stderr, "[CONT] computed length: %d (expected)\n", ex_mlen );
+ fprintf( stderr, "<TEST> validate msg: header: (%s)\n", ohdr );
+ fprintf( stderr, "<TEST> validate msg: computed length: %d (expected)\n", ex_mlen );
}
free( ohdr );
return 0;
sv = sum( tok, data_len ); // compute checksum of data portion
if( sv != ex_sv ) {
- fprintf( stderr, "[FAIL] data checksum mismatch, got %d, expected %d. header: %s\n", sv, ex_sv, ohdr );
+ fprintf( stderr, "<TEST> validate msg: data checksum mismatch, got %d, expected %d. header: %s\n", sv, ex_sv, ohdr );
free( ohdr );
return 0;
}
default:
if( successful ) {
fail_count++; // count failures after first successful message
+ } else {
+ fail_count++;
+ if( fail_count > 10 ) {
+ fprintf( stderr, "<VSEND> giving up\n" );
+ exit( 1 );
+ }
}
// some error (not connected likely), don't count this
//sleep( 1 );
errno = 0;
rbuf = rmr_rcv_msg( mrc, rbuf );
if( rbuf && rbuf->state == RMR_OK ) {
- if( rmr_payload_size( rbuf ) > HDR_SIZE+DATA_SIZE ) { // verify that response has a larger payload than we should have sent
+ if( rmr_payload_size( rbuf ) >= HDR_SIZE+DATA_SIZE ) { // vet message
rts_ok += validate_msg( rbuf->payload, rbuf->len );
} else {
+ fprintf( stderr, "<VSNDR> received short response: >%d expected, got %d\n", HDR_SIZE+DATA_SIZE, rmr_payload_size( rbuf ) );
short_count++;
}
+
rcvd_count++;
}
}
rbuf = rmr_rcv_msg( mrc, rbuf );
if( rbuf && rbuf->state == RMR_OK ) {
rcvd_count++;
- if( rmr_payload_size( rbuf ) > HDR_SIZE+DATA_SIZE ) { // verify that response has a larger payload than we should have sent
+ if( rmr_payload_size( rbuf ) >= HDR_SIZE+DATA_SIZE ) { // vet message
rts_ok += validate_msg( rbuf->payload, rbuf->len );
}
}
}
- if( rcvd_count != rts_ok || count != nmsgs ) { // we might not receive all back if receiver didn't retry, so that is NOT a failure here
- fprintf( stderr, "<VSNDR> rcvd=%d rts_ok=%d count=%d nmsg=%d\n", rcvd_count, rts_ok, count, nmsgs );
+ if( (rcvd_count != rts_ok) || (count != nmsgs) ) { // we might not receive all back if receiver didn't retry, so that is NOT a failure here
+ fprintf( stderr, "<VSNDR> recvd=%d rts_ok=%d short=%d count=%d nmsg=%d\n", rcvd_count, rts_ok, short_count, count, nmsgs );
pass = 0;
}