Release RMR version 4.8.2
[ric-plt/lib/rmr.git] / test / app_test / receiver.c
index dd639ce..6f739a5 100644 (file)
                                        RMR_SEED_RT -- path to the static routing table
                                        RMR_RTG_SVC -- port to listen for RTG connections
 
+                               Compile time options
+                               if -DMTC is defined on the compile command, then RMr is initialised
+                               with the multi-threaded receive thread rather than using the same
+                               process receive function. All other functions in the receiver are
+                               the same.
+
        Date:           18 April 2019
        Author:         E. Scott Daniels
 */
@@ -99,12 +105,15 @@ int main( int argc, char** argv ) {
        long good = 0;                                          // good palyload buffers
        long bad = 0;                                           // payload buffers which were not correct
        long bad_tr = 0;                                        // trace buffers that were not correct
+       long bad_sid = 0;                                       // bad subscription ids
        long timeout = 0;
        char*   data;
-       char    wbuf[1024];                                     // we'll pull trace data into here
        int             nmsgs = 10;                                     // number of messages to stop after (argv[1] overrides)
        int             rt_count = 0;                           // retry count
        long ack_count = 0;                                     // number of acks sent
+       int             count_bins[11];                         // histogram bins based on msg type (0-10)
+       char    wbuf[1024];                                     // we'll pull trace data into here, and use as general working buffer
+       char    sbuf[128];                                      // short buffer
 
        data = getenv( "RMR_RTG_SVC" );
        if( data == NULL ) {
@@ -118,9 +127,17 @@ int main( int argc, char** argv ) {
                listen_port = argv[2];
        }
 
+       memset( count_bins, 0, sizeof( count_bins ) );
+
        fprintf( stderr, "<RCVR> listening on port: %s for a max of %d messages\n", listen_port, nmsgs );
 
+#ifdef MTC
+       fprintf( stderr, "<RCVR> starting in multi-threaded mode\n" );
+       mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_MTCALL ); // start RMr in mt-receive mode
+#else
+       fprintf( stderr, "<RCVR> starting in direct receive mode\n" );
        mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_NONE );   // start your engines!
+#endif
        if( mrc == NULL ) {
                fprintf( stderr, "<RCVR> ABORT:  unable to initialise RMr\n" );
                exit( 1 );
@@ -165,12 +182,25 @@ int main( int argc, char** argv ) {
                                }
                                count++;                                                                        // messages received for stats output
 
-                               if( msg->mtype == 5 ) {                                         // send an ack; sender will count but not process, so data in message is moot
-                                       msg = rmr_rts_msg( mrc, msg );                                                          // we don't try to resend if this returns retry
+                               if( msg->mtype < 3 ) {                                                  // count number of properly set subscription id
+                                       if( msg->sub_id != msg->mtype * 10 ) {
+                                               bad_sid++;
+                                       }
+                               }
+
+                               if( msg->mtype >= 0 && msg->mtype <= 10 ) {
+                                       count_bins[msg->mtype]++;
+                               }
+
+                               if( msg->mtype == 5 ) {                                                                                 // send an ack; sender will count but not process, so data in message is moot
+                                       msg = rmr_rts_msg( mrc, msg );
                                        rt_count = 1000;
                                        while( rt_count > 0 && msg != NULL && msg->state == RMR_ERR_RETRY ) {           // to work right in nano we need this :(
-                                               if( ack_count < 1 ) {                                                                   // need to connect, so hard wait
+                                               if( ack_count < 1 ) {                                                                   // 1st ack, so we need to connect, and we'll wait for that
                                                        sleep( 1 );
+                                                       if( rt_count > 5 ) {
+                                                               rt_count = 5;                                                                   // but only for 5sec; not 1000sec!
+                                                       }
                                                }
                                                rt_count--;
                                                msg = rmr_rts_msg( mrc, msg );                                                  // we don't try to resend if this returns retry
@@ -179,6 +209,8 @@ int main( int argc, char** argv ) {
                                                ack_count++;
                                        }
                                }
+
+                               timeout = time( NULL ) + 10;                                    // extend timeout to 10s past last received message
                        }
                }
 
@@ -189,7 +221,16 @@ int main( int argc, char** argv ) {
                }
        }
 
-       fprintf( stderr, "<RCVR> [%s] %ld messages;  good=%ld  acked=%ld bad=%ld  bad-trace=%ld\n", !!(errors + bad + bad_tr) ? "FAIL" : "PASS", count, good, ack_count, bad, bad_tr );
+       wbuf[0] = 0;
+       for( i = 0; i < 11; i++ ) {
+               snprintf( sbuf, sizeof( sbuf ), "%6d ", count_bins[i] );
+               strcat( wbuf, sbuf );
+       }
+
+       fprintf( stderr, "<RCVR> mtype histogram: %s\n", wbuf );
+       fprintf( stderr, "<RCVR> [%s] %ld messages;  good=%ld  acked=%ld bad=%ld  bad-trace=%ld bad-sub_id=%ld\n", 
+               !!(errors + bad + bad_tr) ? "FAIL" : "PASS", count, good, ack_count, bad, bad_tr, bad_sid );
+
        sleep( 2 );                                                                     // let any outbound acks flow before closing
 
        rmr_close( mrc );