#include <string.h>
#include <rmr/rmr.h>
+extern rmr_mbuf_t* rmr_realloc_payload( rmr_mbuf_t* old_msg, int new_len, int copy, int clone );
+
#define HDR_SIZE 64 // the size of the header we place into the message
#define MSG_SIZE 1024 // the size of the message that will be sent (hdr+payload)
int main( int argc, char** argv ) {
void* mrc; // msg router context
rmr_mbuf_t* msg = NULL; // message received
+ rmr_mbuf_t* omsg = NULL; // original message if cloning
int i;
int j;
int state;
char ack_data[DATA_SIZE]; // data randomly generated for each response
int need; // amount of something that we need
int sv; // checksum valu
+ int clone = 0;
+ int copy = 0;
+ char osrc[128]; // src strings to test when cloning
+ char nsrc[128];
+ int verbose = 0;
data = getenv( "RMR_RTG_SVC" );
if( data == NULL ) {
setenv( "RMR_RTG_SVC", "19289", 1 ); // set one that won't collide with the sender if on same host
}
- if( argc > 1 ) {
- listen_port = argv[1];
+ i = 1;
+ for( i=1; i < argc && argv[i] != NULL && *(argv[i]) == '-'; i++ ) {
+ data = argv[i];
+ switch( *(data+1) ) {
+ case 'c': // copy clone values as xy value: 00 01 10 11
+ if( i < argc-1 ) {
+ j = atoi( argv[i+1] );
+ clone = j % 10 != 0;
+ copy = (j/10) != 0;
+
+ i++;
+ }
+ break;
+
+ case 'p':
+ if( i < argc-1 ) {
+ listen_port = argv[i+1];
+ i++;
+ }
+ break;
+
+ case 'v':
+ verbose = 1;
+ break;
+ }
}
memset( count_bins, 0, sizeof( count_bins ) );
- fprintf( stderr, "<RCVR> listening on port: %s for a max of %d messages\n", listen_port, nmsgs );
+ fprintf( stderr, "<EXRCVR> listening on port: %s for a max of %d messages\n", listen_port, nmsgs );
+ fprintf( stderr, "<EXRCVR> copy=%d clone=%d\n", copy, clone );
#ifdef MTC
- fprintf( stderr, "<RCVR> starting in multi-threaded mode\n" );
+ fprintf( stderr, "<EXRCVR> starting in multi-threaded mode\n" );
mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_MTCALL ); // start RMr in mt-receive mode
#else
- fprintf( stderr, "<RCVR> starting in direct receive mode\n" );
+ fprintf( stderr, "<EXRCVR> starting in direct receive mode\n" );
mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_NONE ); // start your engines!
#endif
if( mrc == NULL ) {
- fprintf( stderr, "<RCVR> ABORT: unable to initialise RMr\n" );
+ fprintf( stderr, "<EXRCVR> ABORT: unable to initialise RMr\n" );
exit( 1 );
}
timeout = time( NULL ) + 20;
while( ! rmr_ready( mrc ) ) { // wait for RMr to load a route table
- fprintf( stderr, "<RCVR> waiting for RMr to show ready\n" );
+ fprintf( stderr, "<EXRCVR> waiting for RMr to show ready\n" );
sleep( 1 );
if( time( NULL ) > timeout ) {
- fprintf( stderr, "<RCVR> giving up\n" );
+ fprintf( stderr, "<EXRCVR> giving up\n" );
exit( 1 );
}
}
- fprintf( stderr, "<RCVR> rmr now shows ready, listening begins\n" );
+ fprintf( stderr, "<EXRCVR> rmr now shows ready, listening begins\n" );
timeout = time( NULL ) + 20;
while( 1 ) {
count_bins[msg->mtype]++;
}
- need = generate_payload( ack_header, ack_data, 0, 0 ); // create an ack w/ random payload in payload, and set data in header
- if( rmr_payload_size( msg ) < need ) { // received message too small
+ if( clone ) {
+ need = rmr_payload_size( msg );
+ } else {
+ need = generate_payload( ack_header, ack_data, 0, 0 ); // create an ack w/ random payload in payload, and set data in header
+ }
+
+ if( clone || rmr_payload_size( msg ) < need ) { // received message too small or we want to clone the original for test
resized++;
- msg = rmr_realloc_payload( msg, need, 0, 0 ); // reallocate the message with a payload big enough
+ omsg = msg;
+ rmr_get_src( omsg, osrc );
+ msg = rmr_realloc_payload( msg, need, copy, clone ); // reallocate the message with a payload big enough or clone if set
+ rmr_get_src( msg, nsrc );
+
+ if( strcmp( osrc, nsrc ) != 0 ) {
+ fprintf( stderr, "[ERR] realloc source strings don't match (%s) new=(%s)\n", osrc, nsrc );
+ } else {
+ if( verbose ) {
+ fprintf( stderr, "[OK] realloc source strings match (%s) new=(%s)\n", osrc, nsrc );
+ }
+ }
+
+ if( copy ) {
+ if( memcmp( omsg->payload, msg->payload, rmr_payload_size( omsg ) ) != 0 ) {
+ fprintf( stderr, "[ERR] realloc payload contents don't match %d bytes\n", rmr_payload_size( omsg ) );
+ //spew( omsg->payload, rmr_payload_size( omsg ) );
+ //fprintf( stderr, "\n---\n\n" );
+ //spew( msg->payload, rmr_payload_size( omsg ) );
+ exit( 1 );
+ }
+ }
+
+ if( clone ) {
+ rmr_free_msg( omsg ); // we're done with the old msg, and must free b/c we cloned it
+ }
if( msg == NULL ) {
fprintf( stderr, "[ERR] realloc returned a nil pointer\n" );
continue;
}
}
- fill_payload( msg, ack_header, 0, ack_data, 0 ); // push headers (with default lengths) into message
+ if( ! clone ) {
+ fill_payload( msg, ack_header, 0, ack_data, 0 ); // push headers (with default lengths) into message
+ }
msg->mtype = 99;
msg->sub_id = -1;
msg->len = need;
-
msg = rmr_rts_msg( mrc, msg ); // return our ack message
rt_count = 1000;
while( rt_count > 0 && msg != NULL && msg->state == RMR_ERR_RETRY ) { // to work right in nano we need this :(
if( ack_count < 1 ) { // 1st ack, so we need to connect, and we'll wait for that
sleep( 1 );
}
+ if( rt_count > 5 ) { // but only pause for max 5sec not 1000s!
+ rt_count = 5;
+ }
rt_count--;
msg = rmr_rts_msg( mrc, msg ); // we don't try to resend if this returns retry
}
}
if( time( NULL ) > timeout ) {
- fprintf( stderr, "<RCVR> stopping, no recent messages received\n" );
+ fprintf( stderr, "<EXRCVR> stopping, no recent messages received\n" );
break;
} else {
if( time( NULL ) > rpt_timeout ) {
- fprintf( stderr, "<RCVR> %ld msgs=%ld good=%ld acked=%ld bad=%ld resized=%ld\n", (long) time( NULL ), count, good, ack_count, bad, resized );
+ fprintf( stderr, "<EXRCVR> %ld msgs=%ld good=%ld acked=%ld bad=%ld resized=%ld\n", (long) time( NULL ), count, good, ack_count, bad, resized );
rpt_timeout = time( NULL ) + 5;
}
strcat( wbuf, sbuf );
}
- fprintf( stderr, "<RCVR> mtype histogram: %s\n", wbuf );
- fprintf( stderr, "<RCVR> [%s] %ld messages; good=%ld acked=%ld bad=%ld resized=%ld bad-sub_id=%ld\n",
+ fprintf( stderr, "<EXRCVR> mtype histogram: %s\n", wbuf );
+ fprintf( stderr, "<EXRCVR> [%s] %ld messages; good=%ld acked=%ld bad=%ld resized=%ld bad-sub_id=%ld\n",
!!(errors + bad) ? "FAIL" : "PASS", count, good, ack_count, bad, resized, bad_sid );
sleep( 2 ); // let any outbound acks flow before closing