Eliminate the SI receive buffer length requirement
[ric-plt/lib/rmr.git] / test / app_test / ex_rts_receiver.c
index 3873c77..287143f 100644 (file)
@@ -54,6 +54,8 @@
 #include <string.h>
 
 #include <rmr/rmr.h>
+extern rmr_mbuf_t* rmr_realloc_payload( rmr_mbuf_t* old_msg, int new_len, int copy, int clone );
+
 
 #define HDR_SIZE 64                                                    // the size of the header we place into the message
 #define MSG_SIZE 1024                                          // the size of the message that will be sent (hdr+payload)
@@ -69,6 +71,7 @@
 int main( int argc, char** argv ) {
        void* mrc;                                              // msg router context
        rmr_mbuf_t* msg = NULL;                         // message received
+       rmr_mbuf_t* omsg = NULL;                        // original message if cloning
        int     i;
        int             j;
        int             state;
@@ -92,43 +95,79 @@ int main( int argc, char** argv ) {
        char    ack_data[DATA_SIZE];            // data randomly generated for each response
        int             need;                                           // amount of something that we need
        int             sv;                                                     // checksum valu
+       int             clone = 0;
+       int             copy = 0;
+       char    osrc[128];                                      // src strings to test when cloning
+       char    nsrc[128];
+       int             verbose = 0;
+       int             old_len;
+       int             dmb_size = 2048;                        // default message buffer size
 
        data = getenv( "RMR_RTG_SVC" );
        if( data == NULL ) {
                setenv( "RMR_RTG_SVC", "19289", 1 );            // set one that won't collide with the sender if on same host
        }
 
-       if( argc > 1 ) {
-               listen_port = argv[1];
+       i = 1;
+       for( i=1;  i < argc && argv[i] != NULL && *(argv[i]) == '-'; i++ ) {
+               data = argv[i];
+               switch( *(data+1) ) {
+                       case 'c':                                       //  copy clone values as xy value: 00 01 10 11
+                               if( i < argc-1 ) {
+                                       j = atoi( argv[i+1] );
+                                       clone = j % 10 != 0;
+                                       copy = (j/10) != 0;
+
+                                       i++;
+                               }
+                               break;
+
+                       case 'p':
+                               if( i < argc-1 ) {
+                                       listen_port = argv[i+1];
+                                       i++;
+                               }
+                               break;
+
+                       case 'v':
+                               verbose = 1;
+                               break;
+               }
        }
 
        memset( count_bins, 0, sizeof( count_bins ) );
 
-       fprintf( stderr, "<RCVR> listening on port: %s for a max of %d messages\n", listen_port, nmsgs );
+       fprintf( stderr, "<EXRCVR> listening on port %s for a max of 20 seconds\n", listen_port );
+       fprintf( stderr, "<EXRCVR> copy=%d clone=%d\n", copy, clone );
+
+       if( ! clone ) {
+               dmb_size = 128;                 // not cloning, force small buffer to test larger buffer receives
+       }
 
+       fprintf( stderr, "<EXRCVR> default receive message buffer size: %d\n", dmb_size );
 #ifdef MTC
-       fprintf( stderr, "<RCVR> starting in multi-threaded mode\n" );
-       mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_MTCALL ); // start RMr in mt-receive mode
+       fprintf( stderr, "<EXRCVR> starting in multi-threaded mode\n" );
+       mrc = rmr_init( listen_port, dmb_size, RMRFL_MTCALL ); // start RMr in mt-receive mode
 #else
-       fprintf( stderr, "<RCVR> starting in direct receive mode\n" );
-       mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_NONE );   // start your engines!
+       fprintf( stderr, "<EXRCVR> starting in direct receive mode\n" );
+       mrc = rmr_init( listen_port, dmb_size, RMRFL_NONE );    // start your engines!
 #endif
        if( mrc == NULL ) {
-               fprintf( stderr, "<RCVR> ABORT:  unable to initialise RMr\n" );
+               fprintf( stderr, "<EXRCVR> ABORT:  unable to initialise RMr\n" );
                exit( 1 );
        }
 
        timeout = time( NULL ) + 20;
        while( ! rmr_ready( mrc ) ) {                                                           // wait for RMr to load a route table
-               fprintf( stderr, "<RCVR> waiting for RMr to show ready\n" );
+               fprintf( stderr, "<EXRCVR> waiting for RMr to show ready\n" );
                sleep( 1 );
 
                if( time( NULL ) > timeout ) {
-                       fprintf( stderr, "<RCVR> giving up\n" );
+                       fprintf( stderr, "<EXRCVR> giving up\n" );
                        exit( 1 );
                }
        }
-       fprintf( stderr, "<RCVR> rmr now shows ready, listening begins\n" );
+       fprintf( stderr, "<EXRCVR> rmr now shows ready, listening begins\n" );
 
        timeout = time( NULL ) + 20;
        while( 1 ) {
@@ -147,21 +186,56 @@ int main( int argc, char** argv ) {
                                        count_bins[msg->mtype]++;
                                }
 
-                               need = generate_payload( ack_header, ack_data, 0, 0 );          // create an ack w/ random payload in payload, and set data in header
-                               if( rmr_payload_size( msg ) < need ) {                                  // received message too small
+                               if( clone ) {
+                                       need = msg->len;                                                        // clone what was sent, not entire payload
+                               } else {
+                                       need = generate_payload( ack_header, ack_data, 0, 0 );          // create an ack w/ random payload in payload, and set data in header
+                               }
+
+                               if( clone || rmr_payload_size( msg ) < need ) {                                 // received message too small or we want to clone the original for test
+                                       old_len = msg->len;
                                        resized++;
-                                       msg = rmr_realloc_payload( msg, need, 0, 0 );           // reallocate the message with a payload big enough
+                                       omsg = msg;
+                                       rmr_get_src( omsg, osrc );
+                                       msg = rmr_realloc_payload( msg, need, copy, clone );            // reallocate the message with a payload big enough or clone if set
+                                       rmr_get_src( msg, nsrc );
+
+                                       if( msg->len != old_len ) {
+                                               fprintf( stderr, "[ERR] after realloc len=%d didn't match old len=%d\n", msg->len, old_len );
+                                       }
+                                       if( strcmp( osrc, nsrc ) != 0 ) {
+                                               fprintf( stderr, "[ERR] realloc source strings don't match (%s) new=(%s)\n", osrc, nsrc );
+                                       } else {
+                                               if( verbose ) {
+                                                       fprintf( stderr, "[OK]  realloc source strings match (%s) new=(%s)\n", osrc, nsrc );
+                                               }
+                                       }
+
+                                       if( copy ) {
+                                               if( memcmp( omsg->payload, msg->payload, rmr_payload_size( omsg ) ) != 0 ) {
+                                                       fprintf( stderr, "[ERR] realloc payload contents don't match %d bytes\n", rmr_payload_size( omsg ) );
+                                                       //spew( omsg->payload, rmr_payload_size( omsg ) );
+                                                       //fprintf( stderr, "\n---\n\n" );
+                                                       //spew( msg->payload, rmr_payload_size( omsg ) );
+                                                       exit( 1 );
+                                               }
+                                       }
+
+                                       if( clone ) {
+                                               rmr_free_msg( omsg );                           // we're done with the old msg, and must free b/c we cloned it
+                                       }
                                        if( msg == NULL ) {
                                                fprintf( stderr, "[ERR] realloc returned a nil pointer\n" );
                                                continue;
                                        }
                                }
 
-                               fill_payload( msg, ack_header, 0, ack_data, 0 );                // push headers (with default lengths) into message
+                               if( ! clone ) {
+                                       fill_payload( msg, ack_header, 0, ack_data, 0 );                // push headers (with default lengths) into message
+                               }
                                msg->mtype = 99;
                                msg->sub_id = -1;
                                msg->len = need;
-
                                msg = rmr_rts_msg( mrc, msg );                                                  // return our ack message
                                rt_count = 1000;
                                while( rt_count > 0 && msg != NULL && msg->state == RMR_ERR_RETRY ) {           // to work right in nano we need this :(
@@ -183,11 +257,11 @@ int main( int argc, char** argv ) {
                }
 
                if( time( NULL ) > timeout ) {
-                       fprintf( stderr, "<RCVR> stopping, no recent messages received\n" );
+                       fprintf( stderr, "<EXRCVR> stopping, no recent messages received\n" );
                        break;
                } else {
                        if( time( NULL ) > rpt_timeout ) {
-                               fprintf( stderr, "<RCVR> %ld msgs=%ld good=%ld  acked=%ld bad=%ld resized=%ld\n", (long) time( NULL ), count, good, ack_count, bad, resized );
+                               fprintf( stderr, "<EXRCVR> %ld msgs=%ld good=%ld  acked=%ld bad=%ld resized=%ld\n", (long) time( NULL ), count, good, ack_count, bad, resized );
 
                                rpt_timeout = time( NULL ) + 5;
                        }
@@ -200,8 +274,8 @@ int main( int argc, char** argv ) {
                strcat( wbuf, sbuf );
        }
 
-       fprintf( stderr, "<RCVR> mtype histogram: %s\n", wbuf );
-       fprintf( stderr, "<RCVR> [%s] %ld messages;  good=%ld  acked=%ld bad=%ld  resized=%ld bad-sub_id=%ld\n", 
+       fprintf( stderr, "<EXRCVR> mtype histogram: %s\n", wbuf );
+       fprintf( stderr, "<EXRCVR> [%s] %ld messages;  good=%ld  acked=%ld bad=%ld  resized=%ld bad-sub_id=%ld\n", 
                !!(errors + bad) ? "FAIL" : "PASS", count, good, ack_count, bad, resized,  bad_sid );
 
        sleep( 2 );                                                                     // let any outbound acks flow before closing