Documentation changes needed to support RTD
[ric-plt/lib/rmr.git] / test / app_test / receiver.c
index ade69b3..3cf1498 100644 (file)
@@ -1,14 +1,14 @@
 // :vim ts=4 sw=4 noet:
 /*
 ==================================================================================
-    Copyright (c) 2019 Nokia
-    Copyright (c) 2018-2019 AT&T Intellectual Property.
+       Copyright (c) 2019 Nokia
+       Copyright (c) 2018-2019 AT&T Intellectual Property.
 
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at
 
-       http://www.apache.org/licenses/LICENSE-2.0
+          http://www.apache.org/licenses/LICENSE-2.0
 
    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
                                        ck1 ck2|<msg text><nil>
 
                                ck1 is a simple checksum of the message text (NOT including the
-                               nil at the end of the string. 
+                               nil at the end of the string.
 
                                ck2 is a simple checksum of the trace data which for the purposes
                                of testing is assumed to have a terminating nil to keep this simple.
 
                                Good messages are messages where both computed checksums match
-                               the ck1 and ck2 values. 
+                               the ck1 and ck2 values.
 
                                The receiver will send an 'ack' message back to the sender for
                                all type 5 messages received.
 
                                The sender and receiver can be run on the same host/container
-                               or on different hosts. The route table is the key to setting 
+                               or on different hosts. The route table is the key to setting
                                things up properly.  See the sender code for rt information.
 
                                Define these environment variables to have some control:
                                        RMR_SEED_RT -- path to the static routing table
                                        RMR_RTG_SVC -- port to listen for RTG connections
 
+                               Compile time options
+                               if -DMTC is defined on the compile command, then RMr is initialised
+                               with the multi-threaded receive thread rather than using the same
+                               process receive function. All other functions in the receiver are
+                               the same.
+
        Date:           18 April 2019
        Author:         E. Scott Daniels
 */
@@ -73,7 +79,7 @@ static int sum( char* str ) {
 
 /*
        Split the message at the first sep and return a pointer to the first
-       character after. 
+       character after.
 */
 static char* split( char* str, char sep ) {
        char*   s;
@@ -89,8 +95,8 @@ static char* split( char* str, char sep ) {
 }
 
 int main( int argc, char** argv ) {
-    void* mrc;                                         // msg router context
-    rmr_mbuf_t* msg = NULL;                            // message received
+       void* mrc;                                              // msg router context
+       rmr_mbuf_t* msg = NULL;                         // message received
        int i;
        int             state;
        int             errors = 0;
@@ -99,12 +105,15 @@ int main( int argc, char** argv ) {
        long good = 0;                                          // good palyload buffers
        long bad = 0;                                           // payload buffers which were not correct
        long bad_tr = 0;                                        // trace buffers that were not correct
+       long bad_sid = 0;                                       // bad subscription ids
        long timeout = 0;
        char*   data;
-       char    wbuf[1024];                                     // we'll pull trace data into here
        int             nmsgs = 10;                                     // number of messages to stop after (argv[1] overrides)
        int             rt_count = 0;                           // retry count
        long ack_count = 0;                                     // number of acks sent
+       int             count_bins[11];                         // histogram bins based on msg type (0-10)
+       char    wbuf[1024];                                     // we'll pull trace data into here, and use as general working buffer
+       char    sbuf[128];                                      // short buffer
 
        data = getenv( "RMR_RTG_SVC" );
        if( data == NULL ) {
@@ -117,10 +126,18 @@ int main( int argc, char** argv ) {
        if( argc > 2 ) {
                listen_port = argv[2];
        }
-       
+
+       memset( count_bins, 0, sizeof( count_bins ) );
+
        fprintf( stderr, "<RCVR> listening on port: %s for a max of %d messages\n", listen_port, nmsgs );
 
-    mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_NONE );      // start your engines!
+#ifdef MTC
+       fprintf( stderr, "<RCVR> starting in multi-threaded mode\n" );
+       mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_MTCALL ); // start RMr in mt-receive mode
+#else
+       fprintf( stderr, "<RCVR> starting in direct receive mode\n" );
+       mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_NONE );   // start your engines!
+#endif
        if( mrc == NULL ) {
                fprintf( stderr, "<RCVR> ABORT:  unable to initialise RMr\n" );
                exit( 1 );
@@ -139,9 +156,9 @@ int main( int argc, char** argv ) {
        fprintf( stderr, "<RCVR> rmr now shows ready, listening begins\n" );
 
        timeout = time( NULL ) + 20;
-    while( count < nmsgs ) {
+       while( count < nmsgs ) {
                msg = rmr_torcv_msg( mrc, msg, 1000 );                          // wait for about 1s so that if sender never starts we eventually escape
-               
+
                if( msg ) {
                        if( msg->state == RMR_OK ) {
                                if( (data = split( msg->payload, '|'  )) != NULL ) {
@@ -165,11 +182,21 @@ int main( int argc, char** argv ) {
                                }
                                count++;                                                                        // messages received for stats output
 
-                               if( msg->mtype == 5 ) {                                         // send an ack; sender will count but not process, so data in message is moot
-                                       msg = rmr_rts_msg( mrc, msg );                                                          // we don't try to resend if this returns retry
+                               if( msg->mtype < 3 ) {                                                  // count number of properly set subscription id
+                                       if( msg->sub_id != msg->mtype * 10 ) {
+                                               bad_sid++;
+                                       }
+                               }
+
+                               if( msg->mtype >= 0 && msg->mtype <= 10 ) {
+                                       count_bins[msg->mtype]++;
+                               }
+
+                               if( msg->mtype == 5 ) {                                                                                 // send an ack; sender will count but not process, so data in message is moot
+                                       msg = rmr_rts_msg( mrc, msg );
                                        rt_count = 1000;
                                        while( rt_count > 0 && msg != NULL && msg->state == RMR_ERR_RETRY ) {           // to work right in nano we need this :(
-                                               if( ack_count < 1 ) {                                                                   // need to connect, so hard wait
+                                               if( ack_count < 1 ) {                                                                   // 1st ack, so we need to connect, and we'll wait for that
                                                        sleep( 1 );
                                                }
                                                rt_count--;
@@ -187,9 +214,18 @@ int main( int argc, char** argv ) {
                        errors++;
                        break;
                }
-    }
+       }
+
+       wbuf[0] = 0;
+       for( i = 0; i < 11; i++ ) {
+               snprintf( sbuf, sizeof( sbuf ), "%6d ", count_bins[i] );
+               strcat( wbuf, sbuf );
+       }
+
+       fprintf( stderr, "<RCVR> mtype histogram: %s\n", wbuf );
+       fprintf( stderr, "<RCVR> [%s] %ld messages;  good=%ld  acked=%ld bad=%ld  bad-trace=%ld bad-sub_id=%ld\n", 
+               !!(errors + bad + bad_tr) ? "FAIL" : "PASS", count, good, ack_count, bad, bad_tr, bad_sid );
 
-       fprintf( stderr, "<RCVR> [%s] %ld messages;  good=%ld  acked=%ld bad=%ld  bad-trace=%ld\n", !!(errors + bad + bad_tr) ? "FAIL" : "PASS", count, good, ack_count, bad, bad_tr );
        sleep( 2 );                                                                     // let any outbound acks flow before closing
 
        rmr_close( mrc );