enhance(API): Add source IP support to msg header
[ric-plt/lib/rmr.git] / test / app_test / sender.c
index 89a0afc..015b943 100644 (file)
@@ -1,14 +1,14 @@
 // :vim ts=4 sw=4 noet:
 /*
 ==================================================================================
 // :vim ts=4 sw=4 noet:
 /*
 ==================================================================================
-    Copyright (c) 2019 Nokia
-    Copyright (c) 2018-2019 AT&T Intellectual Property.
+       Copyright (c) 2019 Nokia
+       Copyright (c) 2018-2019 AT&T Intellectual Property.
 
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at
 
 
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at
 
-       http://www.apache.org/licenses/LICENSE-2.0
+          http://www.apache.org/licenses/LICENSE-2.0
 
    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
 
    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
 
 /*
        Mnemonic:       sender.c
 
 /*
        Mnemonic:       sender.c
-       Abstract:       This is a simple sender which will send a series of messages. 
+       Abstract:       This is a simple sender which will send a series of messages.
                                It is expected that the first attempt(s) will fail if the receiver
                                is not up and this does not start decrementing the number to
                                It is expected that the first attempt(s) will fail if the receiver
                                is not up and this does not start decrementing the number to
-                               send until it has a good send.  
+                               send until it has a good send.
 
                                The process will check the receive queue and list received messages
                                but pass/fail is not dependent on what comes back.
 
                                If the receiver(s) do not become connectable in 20 sec this process
 
                                The process will check the receive queue and list received messages
                                but pass/fail is not dependent on what comes back.
 
                                If the receiver(s) do not become connectable in 20 sec this process
-                               will give up and fail. 
+                               will give up and fail.
 
 
 
 
-                               Message types will vary between 1 and 10, so the route table must
-                               be set up to support those message types.  
+                               Message types will vary between 0 and 9, so the route table must
+                               be set up to support those message types. Further, for message types
+                               0, 1 and 2, the subscription ID will be set to type x 10, so the route
+                               table must be set to include the sub-id for those types in order for
+                               the messages to reach their destination.
 
                                Message format is:
                                        ck1 ck2|<msg-txt><nil>
 
                                Ck1 is the simple check sum of the msg-text (NOT includeing <nil>)
                                Ck2 is the simple check sum of the trace data which is a nil terminated
 
                                Message format is:
                                        ck1 ck2|<msg-txt><nil>
 
                                Ck1 is the simple check sum of the msg-text (NOT includeing <nil>)
                                Ck2 is the simple check sum of the trace data which is a nil terminated
-                               series of bytes. 
+                               series of bytes.
 
 
-                               Parms:  argv[1] == nmsgs; argv[2] == delay; argv[3] == listen port
+                               Parms:  argv[1] == number of msgs to send (10)
+                                               argv[2] == delay                (mu-seconds, 1000000 default)
+                                               argv[3] == max msg type (not inclusive; default 10)
+                                               argv[4] == listen port
 
                                Sender will send for at most 20 seconds, so if nmsgs and delay extend
                                beyond that period the total number of messages sent will be less
 
                                Sender will send for at most 20 seconds, so if nmsgs and delay extend
                                beyond that period the total number of messages sent will be less
@@ -73,27 +79,48 @@ static int sum( char* str ) {
        return sum % 255;
 }
 
        return sum % 255;
 }
 
+/*
+       See if my id string is in the buffer immediately after the first >.
+       Return 1 if so, 0 if not.
+*/
+static int vet_received( char* me, char* buf ) {
+       char*   ch;
+
+       if( (ch = strchr( buf, '>' )) == NULL ) {
+               return 0;
+       }
+
+       return strcmp( me, ch+1 ) == 0;
+}
+
 int main( int argc, char** argv ) {
 int main( int argc, char** argv ) {
-    void* mrc;                                                 // msg router context
-    struct epoll_event events[1];                      // list of events to give to epoll
-    struct epoll_event epe;                 // event definition for event to listen to
-    int     ep_fd = -1;                                                // epoll's file des (given to epoll_wait)
-    int rcv_fd;                                                // file des that NNG tickles -- give this to epoll to listen on
-       int nready;                                                             // number of events ready for receive
+       void* mrc;                                                      // msg router context
+       struct  epoll_event events[1];                  // list of events to give to epoll
+       struct  epoll_event epe;                                // event definition for event to listen to
+       int     ep_fd = -1;                                             // epoll's file des (given to epoll_wait)
+       int             rcv_fd;                                                 // file des that NNG tickles -- give this to epoll to listen on
+       int             nready;                                                 // number of events ready for receive
        rmr_mbuf_t*             sbuf;                                   // send buffer
        rmr_mbuf_t*             rbuf;                                   // received buffer
        rmr_mbuf_t*             sbuf;                                   // send buffer
        rmr_mbuf_t*             rbuf;                                   // received buffer
-       int     count = 0;
-       int     rt_count = 0;                                           // number of messages requiring a spin retry
-       int     rcvd_count = 0;
+       char*   ch;
+       int             count = 0;
+       int             rt_count = 0;                                   // number of messages requiring a spin retry
+       int             rcvd_count = 0;
+       int             rts_ok = 0;                                             // number received with our tag
+       int             fail_count = 0;                                 // # of failure sends after first successful send
        char*   listen_port = "43086";
        int             mtype = 0;
        int             stats_freq = 100;
        int             successful = 0;                                 // set to true after we have a successful send
        char    wbuf[1024];
        char*   listen_port = "43086";
        int             mtype = 0;
        int             stats_freq = 100;
        int             successful = 0;                                 // set to true after we have a successful send
        char    wbuf[1024];
+       char    me[128];                                                // who I am to vet rts was actually from me
        char    trace[1024];
        long    timeout = 0;
        int             delay = 100000;                                 // usec between send attempts
        int             nmsgs = 10;                                             // number of messages to send
        char    trace[1024];
        long    timeout = 0;
        int             delay = 100000;                                 // usec between send attempts
        int             nmsgs = 10;                                             // number of messages to send
+       int             max_mt = 10;                                    // reset point for message type
+       int             start_mt = 0;
+       int             pass = 1;
 
        if( argc > 1 ) {
                nmsgs = atoi( argv[1] );
 
        if( argc > 1 ) {
                nmsgs = atoi( argv[1] );
@@ -102,17 +129,27 @@ int main( int argc, char** argv ) {
                delay = atoi( argv[2] );
        }
        if( argc > 3 ) {
                delay = atoi( argv[2] );
        }
        if( argc > 3 ) {
-               listen_port = argv[3];
+               if( (ch = strchr( argv[3], ':' )) != NULL ) {
+                       max_mt = atoi( ch+1 );
+                       start_mt = atoi( argv[3] );
+               } else {
+                       max_mt = atoi( argv[3] );
+               }
+       }
+       if( argc > 4 ) {
+               listen_port = argv[4];
        }
 
        }
 
+       mtype = start_mt;
+
        fprintf( stderr, "<SNDR> listen port: %s; sending %d messages; delay=%d\n", listen_port, nmsgs, delay );
 
        fprintf( stderr, "<SNDR> listen port: %s; sending %d messages; delay=%d\n", listen_port, nmsgs, delay );
 
-    if( (mrc = rmr_init( listen_port, 1400, RMRFL_NONE )) == NULL ) {
+       if( (mrc = rmr_init( listen_port, 1400, RMRFL_NONE )) == NULL ) {
                fprintf( stderr, "<SNDR> unable to initialise RMr\n" );
                exit( 1 );
        }
 
                fprintf( stderr, "<SNDR> unable to initialise RMr\n" );
                exit( 1 );
        }
 
-    if( (rcv_fd = rmr_get_rcvfd( mrc )) >= 0 ) {                       // epoll only available from NNG -- skip receive later if not NNG
+       if( (rcv_fd = rmr_get_rcvfd( mrc )) >= 0 ) {                    // epoll only available from NNG -- skip receive later if not NNG
                if( rcv_fd < 0 ) {
                        fprintf( stderr, "<SNDR> unable to set up polling fd\n" );
                        exit( 1 );
                if( rcv_fd < 0 ) {
                        fprintf( stderr, "<SNDR> unable to set up polling fd\n" );
                        exit( 1 );
@@ -121,10 +158,10 @@ int main( int argc, char** argv ) {
                        fprintf( stderr, "<SNDR> [FAIL] unable to create epoll fd: %d\n", errno );
                        exit( 1 );
                }
                        fprintf( stderr, "<SNDR> [FAIL] unable to create epoll fd: %d\n", errno );
                        exit( 1 );
                }
-       epe.events = EPOLLIN;
-       epe.data.fd = rcv_fd;
+               epe.events = EPOLLIN;
+               epe.data.fd = rcv_fd;
 
 
-       if( epoll_ctl( ep_fd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 )  {
+               if( epoll_ctl( ep_fd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 )  {
                        fprintf( stderr, "<SNDR> [FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
                        exit( 1 );
                }
                        fprintf( stderr, "<SNDR> [FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
                        exit( 1 );
                }
@@ -132,8 +169,8 @@ int main( int argc, char** argv ) {
                rmr_set_rtimeout( mrc, 0 );                     // for nano we must set the receive timeout to 0; non-blocking receive
        }
 
                rmr_set_rtimeout( mrc, 0 );                     // for nano we must set the receive timeout to 0; non-blocking receive
        }
 
-       sbuf = rmr_alloc_msg( mrc, 512 );       // alloc first send buffer; subsequent buffers allcoated on send
-       //sbuf = rmr_tralloc_msg( mrc, 512, 11, "xxxxxxxxxx" ); // alloc first send buffer; subsequent buffers allcoated on send
+       sbuf = rmr_alloc_msg( mrc, 1024 );      // alloc first send buffer; subsequent buffers allcoated on send
+       //sbuf = rmr_tralloc_msg( mrc, 1024, 11, "xxxxxxxxxx" );        // alloc first send buffer; subsequent buffers allcoated on send
        rbuf = NULL;                                            // don't need to alloc receive buffer
 
        timeout = time( NULL ) + 20;            // give rmr 20s to find the route table (shouldn't need that much)
        rbuf = NULL;                                            // don't need to alloc receive buffer
 
        timeout = time( NULL ) + 20;            // give rmr 20s to find the route table (shouldn't need that much)
@@ -147,16 +184,25 @@ int main( int argc, char** argv ) {
                }
        }
        fprintf( stderr, "<SNDR> rmr is ready; starting to send\n" );
                }
        }
        fprintf( stderr, "<SNDR> rmr is ready; starting to send\n" );
-       
+
        timeout = time( NULL ) + 20;
 
        timeout = time( NULL ) + 20;
 
-    while( count < nmsgs ) {                                                           // we send 10 messages after the first message is successful
+       gethostname( wbuf, sizeof( wbuf ) );
+       snprintf( me, sizeof( me ), "%s-%d", wbuf, getpid( ) );
+
+       while( count < nmsgs ) {                                                                // we send n messages after the first message is successful
                snprintf( trace, 100, "%lld", (long long) time( NULL ) );
                rmr_set_trace( sbuf, trace, strlen( trace ) + 1 );
                snprintf( trace, 100, "%lld", (long long) time( NULL ) );
                rmr_set_trace( sbuf, trace, strlen( trace ) + 1 );
-               snprintf( wbuf, 200, "count=%d tr=%s %d stand up and cheer!", count, trace, rand() );
-               snprintf( sbuf->payload, 300, "%d %d|%s", sum( wbuf ), sum( trace ), wbuf );
+               snprintf( wbuf, 512, "count=%d tr=%s %d stand up and cheer!>%s", count, trace, rand(), me );
+               snprintf( sbuf->payload, 1024, "%d %d|%s", sum( wbuf ), sum( trace ), wbuf );
 
                sbuf->mtype = mtype;                                                    // fill in the message bits
 
                sbuf->mtype = mtype;                                                    // fill in the message bits
+               if( mtype < 3 ) {
+                       sbuf->sub_id = mtype * 10;
+               } else {
+                       sbuf->sub_id = -1;
+               }
+
                sbuf->len =  strlen( sbuf->payload ) + 1;               // our receiver likely wants a nice acsii-z string
                sbuf->state = 0;
                sbuf = rmr_send_msg( mrc, sbuf );                               // send it (send returns an empty payload on success, or the original payload on fail/retry)
                sbuf->len =  strlen( sbuf->payload ) + 1;               // our receiver likely wants a nice acsii-z string
                sbuf->state = 0;
                sbuf = rmr_send_msg( mrc, sbuf );                               // send it (send returns an empty payload on success, or the original payload on fail/retry)
@@ -164,10 +210,16 @@ int main( int argc, char** argv ) {
                switch( sbuf->state ) {
                        case RMR_ERR_RETRY:
                                rt_count++;
                switch( sbuf->state ) {
                        case RMR_ERR_RETRY:
                                rt_count++;
-                               while( sbuf->state == RMR_ERR_RETRY ) {                 // soft failure (device busy?) retry
+                               while( time( NULL ) < timeout && sbuf->state == RMR_ERR_RETRY ) {                       // soft failure (device busy?) retry
                                        sbuf = rmr_send_msg( mrc, sbuf );                       // retry send until it's good (simple test; real programmes should do better)
                                }
                                        sbuf = rmr_send_msg( mrc, sbuf );                       // retry send until it's good (simple test; real programmes should do better)
                                }
-                               successful = 1;
+                               if( sbuf->state == RMR_OK ) {
+                                       successful = 1;                                                         // indicates only that we sent one successful message, not the current state
+                               } else {
+                                       if( successful ) {
+                                               fail_count++;                                                   // count failures after first successful message
+                                       }
+                               }
                                break;
 
                        case RMR_OK:
                                break;
 
                        case RMR_OK:
@@ -175,15 +227,19 @@ int main( int argc, char** argv ) {
                                break;
 
                        default:
                                break;
 
                        default:
+                               if( successful ) {
+                                       fail_count++;                                                   // count failures after first successful message
+                               }
                                // some error (not connected likely), don't count this
                                // some error (not connected likely), don't count this
+                               //sleep( 1 );
                                break;
                }
 
                if( successful ) {                              // once we have a message that was sent, start to increase things
                        count++;
                        mtype++;
                                break;
                }
 
                if( successful ) {                              // once we have a message that was sent, start to increase things
                        count++;
                        mtype++;
-                       if( mtype > 10 ) {                      // if large number of sends don't require infinite rt entries :)
-                               mtype = 1;
+                       if( mtype >= max_mt ) {                 // if large number of sends don't require infinite rt entries :)
+                               mtype = start_mt;
                        }
                }
 
                        }
                }
 
@@ -193,6 +249,7 @@ int main( int argc, char** argv ) {
                                        errno = 0;
                                        rbuf = rmr_rcv_msg( mrc, rbuf );
                                        if( rbuf ) {
                                        errno = 0;
                                        rbuf = rmr_rcv_msg( mrc, rbuf );
                                        if( rbuf ) {
+                                               rts_ok += vet_received( me, rbuf->payload );
                                                rcvd_count++;
                                        }
                                }
                                                rcvd_count++;
                                        }
                                }
@@ -200,6 +257,7 @@ int main( int argc, char** argv ) {
                } else {                                // nano, we will only pick up one at a time.
                        if(     (rbuf = rmr_rcv_msg( mrc, rbuf ) ) != NULL ) {
                                if( rbuf->state == RMR_OK ) {
                } else {                                // nano, we will only pick up one at a time.
                        if(     (rbuf = rmr_rcv_msg( mrc, rbuf ) ) != NULL ) {
                                if( rbuf->state == RMR_OK ) {
+                                       rts_ok += vet_received( me, rbuf->payload );
                                        rcvd_count++;
                                }
                        }
                                        rcvd_count++;
                                }
                        }
@@ -213,9 +271,8 @@ int main( int argc, char** argv ) {
                if( delay > 0 ) {
                        usleep( delay );
                }
                if( delay > 0 ) {
                        usleep( delay );
                }
-    }
+       }
 
 
-       
        timeout = time( NULL ) + 2;                             // allow 2 seconds for the pipe to drain from the receiver
        while( time( NULL ) < timeout );
                if( rcv_fd >= 0 ) {
        timeout = time( NULL ) + 2;                             // allow 2 seconds for the pipe to drain from the receiver
        while( time( NULL ) < timeout );
                if( rcv_fd >= 0 ) {
@@ -225,6 +282,7 @@ int main( int argc, char** argv ) {
                                        rbuf = rmr_rcv_msg( mrc, rbuf );
                                        if( rbuf ) {
                                                rcvd_count++;
                                        rbuf = rmr_rcv_msg( mrc, rbuf );
                                        if( rbuf ) {
                                                rcvd_count++;
+                                               rts_ok += vet_received( me, rbuf->payload );
                                                timeout = time( NULL ) + 2;
                                        }
                                }
                                                timeout = time( NULL ) + 2;
                                        }
                                }
@@ -233,11 +291,17 @@ int main( int argc, char** argv ) {
                        if(     (rbuf = rmr_torcv_msg( mrc, rbuf, 100 ) ) != NULL ) {
                                if( rbuf->state == RMR_OK ) {
                                        rcvd_count++;
                        if(     (rbuf = rmr_torcv_msg( mrc, rbuf, 100 ) ) != NULL ) {
                                if( rbuf->state == RMR_OK ) {
                                        rcvd_count++;
+                                       rts_ok += vet_received( me, rbuf->payload );
                                }
                        }
                }
 
                                }
                        }
                }
 
-       fprintf( stderr, "<SNDR> [%s] sent %d messages   received %d acks retries=%d\n", count == nmsgs ? "PASS" : "FAIL",  count, rcvd_count, rt_count );
+       if( rcvd_count != rts_ok || count != nmsgs ) {
+               pass = 0;
+       }
+
+       fprintf( stderr, "<SNDR> [%s] sent=%d  rcvd=%d  rts-ok=%d failures=%d retries=%d\n", 
+               pass ? "PASS" : "FAIL",  count, rcvd_count, rts_ok, fail_count, rt_count );
        rmr_close( mrc );
 
        return !( count == nmsgs );
        rmr_close( mrc );
 
        return !( count == nmsgs );