Add support to generate man pages in txt and md
[ric-plt/lib/rmr.git] / test / app_test / receiver.c
index ade69b3..e016f9e 100644 (file)
@@ -1,14 +1,14 @@
 // :vim ts=4 sw=4 noet:
 /*
 ==================================================================================
 // :vim ts=4 sw=4 noet:
 /*
 ==================================================================================
-    Copyright (c) 2019 Nokia
-    Copyright (c) 2018-2019 AT&T Intellectual Property.
+       Copyright (c) 2019 Nokia
+       Copyright (c) 2018-2019 AT&T Intellectual Property.
 
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at
 
 
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at
 
-       http://www.apache.org/licenses/LICENSE-2.0
+          http://www.apache.org/licenses/LICENSE-2.0
 
    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
 
    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
                                        ck1 ck2|<msg text><nil>
 
                                ck1 is a simple checksum of the message text (NOT including the
                                        ck1 ck2|<msg text><nil>
 
                                ck1 is a simple checksum of the message text (NOT including the
-                               nil at the end of the string. 
+                               nil at the end of the string.
 
                                ck2 is a simple checksum of the trace data which for the purposes
                                of testing is assumed to have a terminating nil to keep this simple.
 
                                Good messages are messages where both computed checksums match
 
                                ck2 is a simple checksum of the trace data which for the purposes
                                of testing is assumed to have a terminating nil to keep this simple.
 
                                Good messages are messages where both computed checksums match
-                               the ck1 and ck2 values. 
+                               the ck1 and ck2 values.
 
                                The receiver will send an 'ack' message back to the sender for
                                all type 5 messages received.
 
                                The sender and receiver can be run on the same host/container
 
                                The receiver will send an 'ack' message back to the sender for
                                all type 5 messages received.
 
                                The sender and receiver can be run on the same host/container
-                               or on different hosts. The route table is the key to setting 
+                               or on different hosts. The route table is the key to setting
                                things up properly.  See the sender code for rt information.
 
                                Define these environment variables to have some control:
                                        RMR_SEED_RT -- path to the static routing table
                                        RMR_RTG_SVC -- port to listen for RTG connections
 
                                things up properly.  See the sender code for rt information.
 
                                Define these environment variables to have some control:
                                        RMR_SEED_RT -- path to the static routing table
                                        RMR_RTG_SVC -- port to listen for RTG connections
 
+                               Compile time options
+                               if -DMTC is defined on the compile command, then RMr is initialised
+                               with the multi-threaded receive thread rather than using the same
+                               process receive function. All other functions in the receiver are
+                               the same.
+
        Date:           18 April 2019
        Author:         E. Scott Daniels
 */
        Date:           18 April 2019
        Author:         E. Scott Daniels
 */
@@ -73,7 +79,7 @@ static int sum( char* str ) {
 
 /*
        Split the message at the first sep and return a pointer to the first
 
 /*
        Split the message at the first sep and return a pointer to the first
-       character after. 
+       character after.
 */
 static char* split( char* str, char sep ) {
        char*   s;
 */
 static char* split( char* str, char sep ) {
        char*   s;
@@ -89,8 +95,8 @@ static char* split( char* str, char sep ) {
 }
 
 int main( int argc, char** argv ) {
 }
 
 int main( int argc, char** argv ) {
-    void* mrc;                                         // msg router context
-    rmr_mbuf_t* msg = NULL;                            // message received
+       void* mrc;                                              // msg router context
+       rmr_mbuf_t* msg = NULL;                         // message received
        int i;
        int             state;
        int             errors = 0;
        int i;
        int             state;
        int             errors = 0;
@@ -99,12 +105,15 @@ int main( int argc, char** argv ) {
        long good = 0;                                          // good palyload buffers
        long bad = 0;                                           // payload buffers which were not correct
        long bad_tr = 0;                                        // trace buffers that were not correct
        long good = 0;                                          // good palyload buffers
        long bad = 0;                                           // payload buffers which were not correct
        long bad_tr = 0;                                        // trace buffers that were not correct
+       long bad_sid = 0;                                       // bad subscription ids
        long timeout = 0;
        char*   data;
        long timeout = 0;
        char*   data;
-       char    wbuf[1024];                                     // we'll pull trace data into here
        int             nmsgs = 10;                                     // number of messages to stop after (argv[1] overrides)
        int             rt_count = 0;                           // retry count
        long ack_count = 0;                                     // number of acks sent
        int             nmsgs = 10;                                     // number of messages to stop after (argv[1] overrides)
        int             rt_count = 0;                           // retry count
        long ack_count = 0;                                     // number of acks sent
+       int             count_bins[11];                         // histogram bins based on msg type (0-10)
+       char    wbuf[1024];                                     // we'll pull trace data into here, and use as general working buffer
+       char    sbuf[128];                                      // short buffer
 
        data = getenv( "RMR_RTG_SVC" );
        if( data == NULL ) {
 
        data = getenv( "RMR_RTG_SVC" );
        if( data == NULL ) {
@@ -117,10 +126,17 @@ int main( int argc, char** argv ) {
        if( argc > 2 ) {
                listen_port = argv[2];
        }
        if( argc > 2 ) {
                listen_port = argv[2];
        }
-       
+
+       memset( count_bins, 0, sizeof( count_bins ) );
+
        fprintf( stderr, "<RCVR> listening on port: %s for a max of %d messages\n", listen_port, nmsgs );
 
        fprintf( stderr, "<RCVR> listening on port: %s for a max of %d messages\n", listen_port, nmsgs );
 
-    mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_NONE );      // start your engines!
+#ifdef MTC
+       mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_MTCALL ); // start RMr in mt-receive mode
+
+#else
+       mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_NONE );   // start your engines!
+#endif
        if( mrc == NULL ) {
                fprintf( stderr, "<RCVR> ABORT:  unable to initialise RMr\n" );
                exit( 1 );
        if( mrc == NULL ) {
                fprintf( stderr, "<RCVR> ABORT:  unable to initialise RMr\n" );
                exit( 1 );
@@ -139,9 +155,9 @@ int main( int argc, char** argv ) {
        fprintf( stderr, "<RCVR> rmr now shows ready, listening begins\n" );
 
        timeout = time( NULL ) + 20;
        fprintf( stderr, "<RCVR> rmr now shows ready, listening begins\n" );
 
        timeout = time( NULL ) + 20;
-    while( count < nmsgs ) {
+       while( count < nmsgs ) {
                msg = rmr_torcv_msg( mrc, msg, 1000 );                          // wait for about 1s so that if sender never starts we eventually escape
                msg = rmr_torcv_msg( mrc, msg, 1000 );                          // wait for about 1s so that if sender never starts we eventually escape
-               
+
                if( msg ) {
                        if( msg->state == RMR_OK ) {
                                if( (data = split( msg->payload, '|'  )) != NULL ) {
                if( msg ) {
                        if( msg->state == RMR_OK ) {
                                if( (data = split( msg->payload, '|'  )) != NULL ) {
@@ -165,11 +181,21 @@ int main( int argc, char** argv ) {
                                }
                                count++;                                                                        // messages received for stats output
 
                                }
                                count++;                                                                        // messages received for stats output
 
-                               if( msg->mtype == 5 ) {                                         // send an ack; sender will count but not process, so data in message is moot
-                                       msg = rmr_rts_msg( mrc, msg );                                                          // we don't try to resend if this returns retry
+                               if( msg->mtype < 3 ) {                                                  // count number of properly set subscription id
+                                       if( msg->sub_id != msg->mtype * 10 ) {
+                                               bad_sid++;
+                                       }
+                               }
+
+                               if( msg->mtype >= 0 && msg->mtype <= 10 ) {
+                                       count_bins[msg->mtype]++;
+                               }
+
+                               if( msg->mtype == 5 ) {                                                                                 // send an ack; sender will count but not process, so data in message is moot
+                                       msg = rmr_rts_msg( mrc, msg );
                                        rt_count = 1000;
                                        while( rt_count > 0 && msg != NULL && msg->state == RMR_ERR_RETRY ) {           // to work right in nano we need this :(
                                        rt_count = 1000;
                                        while( rt_count > 0 && msg != NULL && msg->state == RMR_ERR_RETRY ) {           // to work right in nano we need this :(
-                                               if( ack_count < 1 ) {                                                                   // need to connect, so hard wait
+                                               if( ack_count < 1 ) {                                                                   // 1st ack, so we need to connect, and we'll wait for that
                                                        sleep( 1 );
                                                }
                                                rt_count--;
                                                        sleep( 1 );
                                                }
                                                rt_count--;
@@ -187,9 +213,18 @@ int main( int argc, char** argv ) {
                        errors++;
                        break;
                }
                        errors++;
                        break;
                }
-    }
+       }
+
+       wbuf[0] = 0;
+       for( i = 0; i < 11; i++ ) {
+               snprintf( sbuf, sizeof( sbuf ), "%6d ", count_bins[i] );
+               strcat( wbuf, sbuf );
+       }
+
+       fprintf( stderr, "<RCVR> mtype histogram: %s\n", wbuf );
+       fprintf( stderr, "<RCVR> [%s] %ld messages;  good=%ld  acked=%ld bad=%ld  bad-trace=%ld bad-sub_id=%ld\n", 
+               !!(errors + bad + bad_tr) ? "FAIL" : "PASS", count, good, ack_count, bad, bad_tr, bad_sid );
 
 
-       fprintf( stderr, "<RCVR> [%s] %ld messages;  good=%ld  acked=%ld bad=%ld  bad-trace=%ld\n", !!(errors + bad + bad_tr) ? "FAIL" : "PASS", count, good, ack_count, bad, bad_tr );
        sleep( 2 );                                                                     // let any outbound acks flow before closing
 
        rmr_close( mrc );
        sleep( 2 );                                                                     // let any outbound acks flow before closing
 
        rmr_close( mrc );