X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=test%2Fapp_test%2Fex_rts_receiver.c;h=e2a73e68aa57a27ac569e44c4760cd51661ccaf9;hb=338fe5e46223df8145b53cdf52dab358f18c7f26;hp=14ae26c78fef5b396434f7ffec73086b6fb41535;hpb=d9de79acd9c205dc4f795e90a98331628ed6c85b;p=ric-plt%2Flib%2Frmr.git diff --git a/test/app_test/ex_rts_receiver.c b/test/app_test/ex_rts_receiver.c index 14ae26c..e2a73e6 100644 --- a/test/app_test/ex_rts_receiver.c +++ b/test/app_test/ex_rts_receiver.c @@ -54,6 +54,8 @@ #include #include +extern rmr_mbuf_t* rmr_realloc_payload( rmr_mbuf_t* old_msg, int new_len, int copy, int clone ); + #define HDR_SIZE 64 // the size of the header we place into the message #define MSG_SIZE 1024 // the size of the message that will be sent (hdr+payload) @@ -69,6 +71,7 @@ int main( int argc, char** argv ) { void* mrc; // msg router context rmr_mbuf_t* msg = NULL; // message received + rmr_mbuf_t* omsg = NULL; // original message if cloning int i; int j; int state; @@ -92,43 +95,72 @@ int main( int argc, char** argv ) { char ack_data[DATA_SIZE]; // data randomly generated for each response int need; // amount of something that we need int sv; // checksum valu + int clone = 0; + int copy = 0; + char osrc[128]; // src strings to test when cloning + char nsrc[128]; + int verbose = 0; data = getenv( "RMR_RTG_SVC" ); if( data == NULL ) { setenv( "RMR_RTG_SVC", "19289", 1 ); // set one that won't collide with the sender if on same host } - if( argc > 1 ) { - listen_port = argv[1]; + i = 1; + for( i=1; i < argc && argv[i] != NULL && *(argv[i]) == '-'; i++ ) { + data = argv[i]; + switch( *(data+1) ) { + case 'c': // copy clone values as xy value: 00 01 10 11 + if( i < argc-1 ) { + j = atoi( argv[i+1] ); + clone = j % 10 != 0; + copy = (j/10) != 0; + + i++; + } + break; + + case 'p': + if( i < argc-1 ) { + listen_port = argv[i+1]; + i++; + } + break; + + case 'v': + verbose = 1; + break; + } } memset( count_bins, 0, sizeof( count_bins ) ); - fprintf( stderr, " listening on port: %s for a max of %d messages\n", listen_port, nmsgs ); + fprintf( stderr, " listening on port: %s for a max of %d messages\n", listen_port, nmsgs ); + fprintf( stderr, " copy=%d clone=%d\n", copy, clone ); #ifdef MTC - fprintf( stderr, " starting in multi-threaded mode\n" ); + fprintf( stderr, " starting in multi-threaded mode\n" ); mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_MTCALL ); // start RMr in mt-receive mode #else - fprintf( stderr, " starting in direct receive mode\n" ); + fprintf( stderr, " starting in direct receive mode\n" ); mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_NONE ); // start your engines! #endif if( mrc == NULL ) { - fprintf( stderr, " ABORT: unable to initialise RMr\n" ); + fprintf( stderr, " ABORT: unable to initialise RMr\n" ); exit( 1 ); } timeout = time( NULL ) + 20; while( ! rmr_ready( mrc ) ) { // wait for RMr to load a route table - fprintf( stderr, " waiting for RMr to show ready\n" ); + fprintf( stderr, " waiting for RMr to show ready\n" ); sleep( 1 ); if( time( NULL ) > timeout ) { - fprintf( stderr, " giving up\n" ); + fprintf( stderr, " giving up\n" ); exit( 1 ); } } - fprintf( stderr, " rmr now shows ready, listening begins\n" ); + fprintf( stderr, " rmr now shows ready, listening begins\n" ); timeout = time( NULL ) + 20; while( 1 ) { @@ -147,27 +179,61 @@ int main( int argc, char** argv ) { count_bins[msg->mtype]++; } - need = generate_payload( ack_header, ack_data, 0, 0 ); // create an ack w/ random payload in payload, and set data in header - if( rmr_payload_size( msg ) < need ) { // received message too small + if( clone ) { + need = rmr_payload_size( msg ); + } else { + need = generate_payload( ack_header, ack_data, 0, 0 ); // create an ack w/ random payload in payload, and set data in header + } + + if( clone || rmr_payload_size( msg ) < need ) { // received message too small or we want to clone the original for test resized++; - msg = rmr_realloc_payload( msg, need, 0, 0 ); // reallocate the message with a payload big enough + omsg = msg; + rmr_get_src( omsg, osrc ); + msg = rmr_realloc_payload( msg, need, copy, clone ); // reallocate the message with a payload big enough or clone if set + rmr_get_src( msg, nsrc ); + + if( strcmp( osrc, nsrc ) != 0 ) { + fprintf( stderr, "[ERR] realloc source strings don't match (%s) new=(%s)\n", osrc, nsrc ); + } else { + if( verbose ) { + fprintf( stderr, "[OK] realloc source strings match (%s) new=(%s)\n", osrc, nsrc ); + } + } + + if( copy ) { + if( memcmp( omsg->payload, msg->payload, rmr_payload_size( omsg ) ) != 0 ) { + fprintf( stderr, "[ERR] realloc payload contents don't match %d bytes\n", rmr_payload_size( omsg ) ); + //spew( omsg->payload, rmr_payload_size( omsg ) ); + //fprintf( stderr, "\n---\n\n" ); + //spew( msg->payload, rmr_payload_size( omsg ) ); + exit( 1 ); + } + } + + if( clone ) { + rmr_free_msg( omsg ); // we're done with the old msg, and must free b/c we cloned it + } if( msg == NULL ) { fprintf( stderr, "[ERR] realloc returned a nil pointer\n" ); continue; } } - fill_payload( msg, ack_header, 0, ack_data, 0 ); // push headers (with default lengths) into message + if( ! clone ) { + fill_payload( msg, ack_header, 0, ack_data, 0 ); // push headers (with default lengths) into message + } msg->mtype = 99; msg->sub_id = -1; msg->len = need; - msg = rmr_rts_msg( mrc, msg ); // return our ack message rt_count = 1000; while( rt_count > 0 && msg != NULL && msg->state == RMR_ERR_RETRY ) { // to work right in nano we need this :( if( ack_count < 1 ) { // 1st ack, so we need to connect, and we'll wait for that sleep( 1 ); } + if( rt_count > 5 ) { // but only pause for max 5sec not 1000s! + rt_count = 5; + } rt_count--; msg = rmr_rts_msg( mrc, msg ); // we don't try to resend if this returns retry } @@ -180,11 +246,11 @@ int main( int argc, char** argv ) { } if( time( NULL ) > timeout ) { - fprintf( stderr, " stopping, no recent messages received\n" ); + fprintf( stderr, " stopping, no recent messages received\n" ); break; } else { if( time( NULL ) > rpt_timeout ) { - fprintf( stderr, " %ld msgs=%ld good=%ld acked=%ld bad=%ld resized=%ld\n", (long) time( NULL ), count, good, ack_count, bad, resized ); + fprintf( stderr, " %ld msgs=%ld good=%ld acked=%ld bad=%ld resized=%ld\n", (long) time( NULL ), count, good, ack_count, bad, resized ); rpt_timeout = time( NULL ) + 5; } @@ -197,8 +263,8 @@ int main( int argc, char** argv ) { strcat( wbuf, sbuf ); } - fprintf( stderr, " mtype histogram: %s\n", wbuf ); - fprintf( stderr, " [%s] %ld messages; good=%ld acked=%ld bad=%ld resized=%ld bad-sub_id=%ld\n", + fprintf( stderr, " mtype histogram: %s\n", wbuf ); + fprintf( stderr, " [%s] %ld messages; good=%ld acked=%ld bad=%ld resized=%ld bad-sub_id=%ld\n", !!(errors + bad) ? "FAIL" : "PASS", count, good, ack_count, bad, resized, bad_sid ); sleep( 2 ); // let any outbound acks flow before closing