X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=test%2Fapp_test%2Fsender.c;h=015b943ffafaf34cf7bf7f0fa76ffbdcd9ad9e59;hb=68d09fa5028e47e763c44c30647da31e77eda64a;hp=89a0afc6beac75bc5ceea80616f2985a722e2edd;hpb=e8a5b2c912d4be9cc93bc52ad7a460b57321c5fd;p=ric-plt%2Flib%2Frmr.git diff --git a/test/app_test/sender.c b/test/app_test/sender.c index 89a0afc..015b943 100644 --- a/test/app_test/sender.c +++ b/test/app_test/sender.c @@ -1,14 +1,14 @@ // :vim ts=4 sw=4 noet: /* ================================================================================== - Copyright (c) 2019 Nokia - Copyright (c) 2018-2019 AT&T Intellectual Property. + Copyright (c) 2019 Nokia + Copyright (c) 2018-2019 AT&T Intellectual Property. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -20,29 +20,35 @@ /* Mnemonic: sender.c - Abstract: This is a simple sender which will send a series of messages. + Abstract: This is a simple sender which will send a series of messages. It is expected that the first attempt(s) will fail if the receiver is not up and this does not start decrementing the number to - send until it has a good send. + send until it has a good send. The process will check the receive queue and list received messages but pass/fail is not dependent on what comes back. If the receiver(s) do not become connectable in 20 sec this process - will give up and fail. + will give up and fail. - Message types will vary between 1 and 10, so the route table must - be set up to support those message types. + Message types will vary between 0 and 9, so the route table must + be set up to support those message types. Further, for message types + 0, 1 and 2, the subscription ID will be set to type x 10, so the route + table must be set to include the sub-id for those types in order for + the messages to reach their destination. Message format is: ck1 ck2| Ck1 is the simple check sum of the msg-text (NOT includeing ) Ck2 is the simple check sum of the trace data which is a nil terminated - series of bytes. + series of bytes. - Parms: argv[1] == nmsgs; argv[2] == delay; argv[3] == listen port + Parms: argv[1] == number of msgs to send (10) + argv[2] == delay (mu-seconds, 1000000 default) + argv[3] == max msg type (not inclusive; default 10) + argv[4] == listen port Sender will send for at most 20 seconds, so if nmsgs and delay extend beyond that period the total number of messages sent will be less @@ -73,27 +79,48 @@ static int sum( char* str ) { return sum % 255; } +/* + See if my id string is in the buffer immediately after the first >. + Return 1 if so, 0 if not. +*/ +static int vet_received( char* me, char* buf ) { + char* ch; + + if( (ch = strchr( buf, '>' )) == NULL ) { + return 0; + } + + return strcmp( me, ch+1 ) == 0; +} + int main( int argc, char** argv ) { - void* mrc; // msg router context - struct epoll_event events[1]; // list of events to give to epoll - struct epoll_event epe; // event definition for event to listen to - int ep_fd = -1; // epoll's file des (given to epoll_wait) - int rcv_fd; // file des that NNG tickles -- give this to epoll to listen on - int nready; // number of events ready for receive + void* mrc; // msg router context + struct epoll_event events[1]; // list of events to give to epoll + struct epoll_event epe; // event definition for event to listen to + int ep_fd = -1; // epoll's file des (given to epoll_wait) + int rcv_fd; // file des that NNG tickles -- give this to epoll to listen on + int nready; // number of events ready for receive rmr_mbuf_t* sbuf; // send buffer rmr_mbuf_t* rbuf; // received buffer - int count = 0; - int rt_count = 0; // number of messages requiring a spin retry - int rcvd_count = 0; + char* ch; + int count = 0; + int rt_count = 0; // number of messages requiring a spin retry + int rcvd_count = 0; + int rts_ok = 0; // number received with our tag + int fail_count = 0; // # of failure sends after first successful send char* listen_port = "43086"; int mtype = 0; int stats_freq = 100; int successful = 0; // set to true after we have a successful send char wbuf[1024]; + char me[128]; // who I am to vet rts was actually from me char trace[1024]; long timeout = 0; int delay = 100000; // usec between send attempts int nmsgs = 10; // number of messages to send + int max_mt = 10; // reset point for message type + int start_mt = 0; + int pass = 1; if( argc > 1 ) { nmsgs = atoi( argv[1] ); @@ -102,17 +129,27 @@ int main( int argc, char** argv ) { delay = atoi( argv[2] ); } if( argc > 3 ) { - listen_port = argv[3]; + if( (ch = strchr( argv[3], ':' )) != NULL ) { + max_mt = atoi( ch+1 ); + start_mt = atoi( argv[3] ); + } else { + max_mt = atoi( argv[3] ); + } + } + if( argc > 4 ) { + listen_port = argv[4]; } + mtype = start_mt; + fprintf( stderr, " listen port: %s; sending %d messages; delay=%d\n", listen_port, nmsgs, delay ); - if( (mrc = rmr_init( listen_port, 1400, RMRFL_NONE )) == NULL ) { + if( (mrc = rmr_init( listen_port, 1400, RMRFL_NONE )) == NULL ) { fprintf( stderr, " unable to initialise RMr\n" ); exit( 1 ); } - if( (rcv_fd = rmr_get_rcvfd( mrc )) >= 0 ) { // epoll only available from NNG -- skip receive later if not NNG + if( (rcv_fd = rmr_get_rcvfd( mrc )) >= 0 ) { // epoll only available from NNG -- skip receive later if not NNG if( rcv_fd < 0 ) { fprintf( stderr, " unable to set up polling fd\n" ); exit( 1 ); @@ -121,10 +158,10 @@ int main( int argc, char** argv ) { fprintf( stderr, " [FAIL] unable to create epoll fd: %d\n", errno ); exit( 1 ); } - epe.events = EPOLLIN; - epe.data.fd = rcv_fd; + epe.events = EPOLLIN; + epe.data.fd = rcv_fd; - if( epoll_ctl( ep_fd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 ) { + if( epoll_ctl( ep_fd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 ) { fprintf( stderr, " [FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) ); exit( 1 ); } @@ -132,8 +169,8 @@ int main( int argc, char** argv ) { rmr_set_rtimeout( mrc, 0 ); // for nano we must set the receive timeout to 0; non-blocking receive } - sbuf = rmr_alloc_msg( mrc, 512 ); // alloc first send buffer; subsequent buffers allcoated on send - //sbuf = rmr_tralloc_msg( mrc, 512, 11, "xxxxxxxxxx" ); // alloc first send buffer; subsequent buffers allcoated on send + sbuf = rmr_alloc_msg( mrc, 1024 ); // alloc first send buffer; subsequent buffers allcoated on send + //sbuf = rmr_tralloc_msg( mrc, 1024, 11, "xxxxxxxxxx" ); // alloc first send buffer; subsequent buffers allcoated on send rbuf = NULL; // don't need to alloc receive buffer timeout = time( NULL ) + 20; // give rmr 20s to find the route table (shouldn't need that much) @@ -147,16 +184,25 @@ int main( int argc, char** argv ) { } } fprintf( stderr, " rmr is ready; starting to send\n" ); - + timeout = time( NULL ) + 20; - while( count < nmsgs ) { // we send 10 messages after the first message is successful + gethostname( wbuf, sizeof( wbuf ) ); + snprintf( me, sizeof( me ), "%s-%d", wbuf, getpid( ) ); + + while( count < nmsgs ) { // we send n messages after the first message is successful snprintf( trace, 100, "%lld", (long long) time( NULL ) ); rmr_set_trace( sbuf, trace, strlen( trace ) + 1 ); - snprintf( wbuf, 200, "count=%d tr=%s %d stand up and cheer!", count, trace, rand() ); - snprintf( sbuf->payload, 300, "%d %d|%s", sum( wbuf ), sum( trace ), wbuf ); + snprintf( wbuf, 512, "count=%d tr=%s %d stand up and cheer!>%s", count, trace, rand(), me ); + snprintf( sbuf->payload, 1024, "%d %d|%s", sum( wbuf ), sum( trace ), wbuf ); sbuf->mtype = mtype; // fill in the message bits + if( mtype < 3 ) { + sbuf->sub_id = mtype * 10; + } else { + sbuf->sub_id = -1; + } + sbuf->len = strlen( sbuf->payload ) + 1; // our receiver likely wants a nice acsii-z string sbuf->state = 0; sbuf = rmr_send_msg( mrc, sbuf ); // send it (send returns an empty payload on success, or the original payload on fail/retry) @@ -164,10 +210,16 @@ int main( int argc, char** argv ) { switch( sbuf->state ) { case RMR_ERR_RETRY: rt_count++; - while( sbuf->state == RMR_ERR_RETRY ) { // soft failure (device busy?) retry + while( time( NULL ) < timeout && sbuf->state == RMR_ERR_RETRY ) { // soft failure (device busy?) retry sbuf = rmr_send_msg( mrc, sbuf ); // retry send until it's good (simple test; real programmes should do better) } - successful = 1; + if( sbuf->state == RMR_OK ) { + successful = 1; // indicates only that we sent one successful message, not the current state + } else { + if( successful ) { + fail_count++; // count failures after first successful message + } + } break; case RMR_OK: @@ -175,15 +227,19 @@ int main( int argc, char** argv ) { break; default: + if( successful ) { + fail_count++; // count failures after first successful message + } // some error (not connected likely), don't count this + //sleep( 1 ); break; } if( successful ) { // once we have a message that was sent, start to increase things count++; mtype++; - if( mtype > 10 ) { // if large number of sends don't require infinite rt entries :) - mtype = 1; + if( mtype >= max_mt ) { // if large number of sends don't require infinite rt entries :) + mtype = start_mt; } } @@ -193,6 +249,7 @@ int main( int argc, char** argv ) { errno = 0; rbuf = rmr_rcv_msg( mrc, rbuf ); if( rbuf ) { + rts_ok += vet_received( me, rbuf->payload ); rcvd_count++; } } @@ -200,6 +257,7 @@ int main( int argc, char** argv ) { } else { // nano, we will only pick up one at a time. if( (rbuf = rmr_rcv_msg( mrc, rbuf ) ) != NULL ) { if( rbuf->state == RMR_OK ) { + rts_ok += vet_received( me, rbuf->payload ); rcvd_count++; } } @@ -213,9 +271,8 @@ int main( int argc, char** argv ) { if( delay > 0 ) { usleep( delay ); } - } + } - timeout = time( NULL ) + 2; // allow 2 seconds for the pipe to drain from the receiver while( time( NULL ) < timeout ); if( rcv_fd >= 0 ) { @@ -225,6 +282,7 @@ int main( int argc, char** argv ) { rbuf = rmr_rcv_msg( mrc, rbuf ); if( rbuf ) { rcvd_count++; + rts_ok += vet_received( me, rbuf->payload ); timeout = time( NULL ) + 2; } } @@ -233,11 +291,17 @@ int main( int argc, char** argv ) { if( (rbuf = rmr_torcv_msg( mrc, rbuf, 100 ) ) != NULL ) { if( rbuf->state == RMR_OK ) { rcvd_count++; + rts_ok += vet_received( me, rbuf->payload ); } } } - fprintf( stderr, " [%s] sent %d messages received %d acks retries=%d\n", count == nmsgs ? "PASS" : "FAIL", count, rcvd_count, rt_count ); + if( rcvd_count != rts_ok || count != nmsgs ) { + pass = 0; + } + + fprintf( stderr, " [%s] sent=%d rcvd=%d rts-ok=%d failures=%d retries=%d\n", + pass ? "PASS" : "FAIL", count, rcvd_count, rts_ok, fail_count, rt_count ); rmr_close( mrc ); return !( count == nmsgs );