X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=test%2Fapp_test%2Fsender.c;h=994c59140f43359f4d788598b717a04920a156c2;hb=fd4477a9698a46ce5755d614b663c18ceadf43c4;hp=ac7d869fc73c432a19ad847ac841a53e21fc9bb4;hpb=a41c6f5f26b3a44009f4aff3df3f83b9a79ace01;p=ric-plt%2Flib%2Frmr.git diff --git a/test/app_test/sender.c b/test/app_test/sender.c index ac7d869..994c591 100644 --- a/test/app_test/sender.c +++ b/test/app_test/sender.c @@ -68,6 +68,9 @@ #include +#define WBUF_SIZE 1024 +#define TRACE_SIZE 1024 + static int sum( char* str ) { int sum = 0; int i = 0; @@ -79,6 +82,20 @@ static int sum( char* str ) { return sum % 255; } +/* + See if my id string is in the buffer immediately after the first >. + Return 1 if so, 0 if not. +*/ +static int vet_received( char* me, char* buf ) { + char* ch; + + if( (ch = strchr( buf, '>' )) == NULL ) { + return 0; + } + + return strcmp( me, ch+1 ) == 0; +} + int main( int argc, char** argv ) { void* mrc; // msg router context struct epoll_event events[1]; // list of events to give to epoll @@ -88,20 +105,28 @@ int main( int argc, char** argv ) { int nready; // number of events ready for receive rmr_mbuf_t* sbuf; // send buffer rmr_mbuf_t* rbuf; // received buffer + char* ch; int count = 0; int rt_count = 0; // number of messages requiring a spin retry int rcvd_count = 0; + int rts_ok = 0; // number received with our tag int fail_count = 0; // # of failure sends after first successful send char* listen_port = "43086"; int mtype = 0; int stats_freq = 100; int successful = 0; // set to true after we have a successful send - char wbuf[1024]; - char trace[1024]; + char* wbuf = NULL; // working buffer + char me[128]; // who I am to vet rts was actually from me + char* trace = NULL; // area to build trace data in long timeout = 0; int delay = 100000; // usec between send attempts int nmsgs = 10; // number of messages to send int max_mt = 10; // reset point for message type + int start_mt = 0; + int pass = 1; + + wbuf = (char *) malloc( sizeof( char ) * WBUF_SIZE ); + trace = (char *) malloc( sizeof( char ) * TRACE_SIZE ); if( argc > 1 ) { nmsgs = atoi( argv[1] ); @@ -110,12 +135,19 @@ int main( int argc, char** argv ) { delay = atoi( argv[2] ); } if( argc > 3 ) { - max_mt = atoi( argv[3] ); + if( (ch = strchr( argv[3], ':' )) != NULL ) { + max_mt = atoi( ch+1 ); + start_mt = atoi( argv[3] ); + } else { + max_mt = atoi( argv[3] ); + } } if( argc > 4 ) { listen_port = argv[4]; } + mtype = start_mt; + fprintf( stderr, " listen port: %s; sending %d messages; delay=%d\n", listen_port, nmsgs, delay ); if( (mrc = rmr_init( listen_port, 1400, RMRFL_NONE )) == NULL ) { @@ -143,8 +175,8 @@ int main( int argc, char** argv ) { rmr_set_rtimeout( mrc, 0 ); // for nano we must set the receive timeout to 0; non-blocking receive } - sbuf = rmr_alloc_msg( mrc, 512 ); // alloc first send buffer; subsequent buffers allcoated on send - //sbuf = rmr_tralloc_msg( mrc, 512, 11, "xxxxxxxxxx" ); // alloc first send buffer; subsequent buffers allcoated on send + sbuf = rmr_alloc_msg( mrc, 1024 ); // alloc first send buffer; subsequent buffers allcoated on send + //sbuf = rmr_tralloc_msg( mrc, 1024, 11, "xxxxxxxxxx" ); // alloc first send buffer; subsequent buffers allcoated on send rbuf = NULL; // don't need to alloc receive buffer timeout = time( NULL ) + 20; // give rmr 20s to find the route table (shouldn't need that much) @@ -161,11 +193,14 @@ int main( int argc, char** argv ) { timeout = time( NULL ) + 20; + gethostname( wbuf, WBUF_SIZE ); + snprintf( me, sizeof( me ), "%s-%d", wbuf, getpid( ) ); + while( count < nmsgs ) { // we send n messages after the first message is successful snprintf( trace, 100, "%lld", (long long) time( NULL ) ); rmr_set_trace( sbuf, trace, strlen( trace ) + 1 ); - snprintf( wbuf, 200, "count=%d tr=%s %d stand up and cheer!", count, trace, rand() ); - snprintf( sbuf->payload, 300, "%d %d|%s", sum( wbuf ), sum( trace ), wbuf ); + snprintf( wbuf, 512, "count=%d tr=%s %d stand up and cheer!>%s", count, trace, rand(), me ); + snprintf( sbuf->payload, 1024, "%d %d|%s", sum( wbuf ), sum( trace ), wbuf ); sbuf->mtype = mtype; // fill in the message bits if( mtype < 3 ) { @@ -181,14 +216,19 @@ int main( int argc, char** argv ) { switch( sbuf->state ) { case RMR_ERR_RETRY: rt_count++; - while( sbuf->state == RMR_ERR_RETRY ) { // soft failure (device busy?) retry + while( time( NULL ) < timeout && sbuf->state == RMR_ERR_RETRY ) { // soft failure (device busy?) retry sbuf = rmr_send_msg( mrc, sbuf ); // retry send until it's good (simple test; real programmes should do better) } if( sbuf->state == RMR_OK ) { + if( successful == 0 ) { + fail_count = 0; // count only after first message goes through + } successful = 1; // indicates only that we sent one successful message, not the current state } else { - if( successful ) { - fail_count++; // count failures after first successful message + fail_count++; // count failures after first successful message + if( !successful && fail_count > 30 ) { + fprintf( stderr, "[FAIL] too many send errors for this test\n" ); + exit( 1 ); } } break; @@ -210,7 +250,7 @@ int main( int argc, char** argv ) { count++; mtype++; if( mtype >= max_mt ) { // if large number of sends don't require infinite rt entries :) - mtype = 0; + mtype = start_mt; } } @@ -220,6 +260,7 @@ int main( int argc, char** argv ) { errno = 0; rbuf = rmr_rcv_msg( mrc, rbuf ); if( rbuf ) { + rts_ok += vet_received( me, rbuf->payload ); rcvd_count++; } } @@ -227,6 +268,7 @@ int main( int argc, char** argv ) { } else { // nano, we will only pick up one at a time. if( (rbuf = rmr_rcv_msg( mrc, rbuf ) ) != NULL ) { if( rbuf->state == RMR_OK ) { + rts_ok += vet_received( me, rbuf->payload ); rcvd_count++; } } @@ -242,9 +284,9 @@ int main( int argc, char** argv ) { } } - - timeout = time( NULL ) + 2; // allow 2 seconds for the pipe to drain from the receiver - while( time( NULL ) < timeout ); + fprintf( stderr, " draining begins\n" ); + timeout = time( NULL ) + 20; // allow 20 seconds for the pipe to drain from the receiver + while( time( NULL ) < timeout ) { if( rcv_fd >= 0 ) { while( (nready = epoll_wait( ep_fd, events, 1, 100 )) > 0 ) { // if something ready to receive (non-blocking check) if( events[0].data.fd == rcv_fd ) { // we only are waiting on 1 thing, so [0] is ok @@ -252,7 +294,8 @@ int main( int argc, char** argv ) { rbuf = rmr_rcv_msg( mrc, rbuf ); if( rbuf ) { rcvd_count++; - timeout = time( NULL ) + 2; + rts_ok += vet_received( me, rbuf->payload ); + timeout = time( NULL ) + 10; // break 10s after last received message } } } @@ -260,11 +303,19 @@ int main( int argc, char** argv ) { if( (rbuf = rmr_torcv_msg( mrc, rbuf, 100 ) ) != NULL ) { if( rbuf->state == RMR_OK ) { rcvd_count++; + rts_ok += vet_received( me, rbuf->payload ); } } } + } + fprintf( stderr, " draining finishes\n" ); + + if( rcvd_count != rts_ok || count != nmsgs ) { + pass = 0; + } - fprintf( stderr, " [%s] sent=%d rcvd-acks=%d failures=%d retries=%d\n", count == nmsgs ? "PASS" : "FAIL", count, rcvd_count, fail_count, rt_count ); + fprintf( stderr, " [%s] sent=%d rcvd=%d rts-ok=%d failures=%d retries=%d\n", + pass ? "PASS" : "FAIL", count, rcvd_count, rts_ok, fail_count, rt_count ); rmr_close( mrc ); return !( count == nmsgs );