enhance(API): Add multi-threaded call
[ric-plt/lib/rmr.git] / test / app_test / caller.c
diff --git a/test/app_test/caller.c b/test/app_test/caller.c
new file mode 100644 (file)
index 0000000..7b7336a
--- /dev/null
@@ -0,0 +1,326 @@
+// :vim ts=4 sw=4 noet:
+/*
+==================================================================================
+       Copyright (c) 2019 Nokia
+       Copyright (c) 2018-2019 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+          http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+/*
+       Mnemonic:       caller.c
+       Abstract:       This is a simple sender which will send a series of messages using
+                               rmr_call().  N threads are started each sending the desired number
+                               of messages and expecting an 'ack' for each. Each ack is examined
+                               to verify that the thread id placed into the message matches (meaning
+                               that the ack was delivered by RMr to the correct thread's chute.
+
+                               In addition, the main thread listens for messages in order to verify
+                               that a main or receiving thread can receive messages concurrently
+                               while call acks are pending and being processed.
+
+                               Message format is:
+                                       ck1 ck2|<msg-txt> @ tid<nil>
+
+                               Ck1 is the simple check sum of the msg-text (NOT includeing <nil>)
+                               Ck2 is the simple check sum of the trace data which is a nil terminated
+                               series of bytes.
+                               tid is the thread id assigned by the main thread.
+
+                               Parms:  argv[1] == number of msgs to send (10)
+                                               argv[2] == delay                (mu-seconds, 1000000 default)
+                                               argv[3] == number of threads (3)
+                                               argv[4] == listen port
+
+                               Sender will send for at most 20 seconds, so if nmsgs and delay extend
+                               beyond that period the total number of messages sent will be less
+                               than n.
+
+       Date:           18 April 2019
+       Author:         E. Scott Daniels
+*/
+
+#include <unistd.h>
+#include <errno.h>
+#include <string.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <sys/epoll.h>
+#include <time.h>
+#include <pthread.h>
+
+
+#include <rmr/rmr.h>
+
+#define TRACE_SIZE 40          // bytes in header to provide for trace junk
+
+/*
+       Thread data
+*/
+typedef struct tdata {
+       int     id;                                     // the id we'll pass to RMr mt-call function NOT the thread id
+       int n2send;                             // number of messages to send
+       int delay;                              // ms delay between messages
+       void* mrc;                              // RMr context
+       int     state;
+} tdata_t;
+
+
+
+// --------------------------------------------------------------------------------
+
+
+static int sum( char* str ) {
+       int sum = 0;
+       int     i = 0;
+
+       while( *str ) {
+               sum += *(str++) + i++;
+       }
+
+       return sum % 255;
+}
+
+
+
+/*
+       Executed as a thread, this puppy will generate calls to ensure that we get the
+       response back to the right thread, that we can handle threads, etc.
+*/
+static void* mk_calls( void* data ) {
+       tdata_t*        control;
+       rmr_mbuf_t*             sbuf;                                   // send buffer
+       int             count = 0;
+       int             rt_count = 0;                                   // number of messages requiring a spin retry
+       int             ok_msg = 0;                                             // received messages that were sent by us
+       int             bad_msg = 0;                                    // received messages that were sent by a different thread
+       int             drops = 0;
+       int             fail_count = 0;                                 // # of failure sends after first successful send
+       int             successful = 0;                                 // set to true after we have a successful send
+       char    wbuf[1024];
+       char    xbuf[1024];                                             // build transaction string here
+       char    trace[1024];
+       int             xaction_id = 1;
+       char*   tok;
+       int             state = 0;
+
+       if( (control  = (tdata_t *) data) == NULL ) {
+               fprintf( stderr, "thread data was nil; bailing out\n" );
+       }
+       //fprintf( stderr, "<THRD> thread started with parallel call id=%d sending=%d delay=%d\n", control->id, control->n2send, control->delay );
+
+       sbuf = rmr_alloc_msg( control->mrc, 512 );      // alloc first send buffer; subsequent buffers allcoated on send
+
+       memset( trace, 0, sizeof( trace ) );    
+       while( count < control->n2send ) {                                                              // we send n messages after the first message is successful
+               snprintf( trace, 100, "%lld", (long long) time( NULL ) );
+               rmr_set_trace( sbuf, trace, TRACE_SIZE );                                       // fully populate so we dont cause a buffer realloc
+
+               snprintf( wbuf, 200, "count=%d tr=%s %d stand up and cheer! @ %d", count, trace, rand(), control->id );
+               snprintf( sbuf->payload, 300, "%d %d|%s", sum( wbuf ), sum( trace ), wbuf );
+               snprintf( xbuf, 200, "%31d", xaction_id );
+               rmr_bytes2xact( sbuf, xbuf, 32 );
+
+               sbuf->mtype = 5;                                                                // mtype is always 5 as the test receiver acks just mtype 5 messages
+               sbuf->len =  strlen( sbuf->payload ) + 1;               // our receiver likely wants a nice acsii-z string
+               sbuf->state = 0;
+               sbuf = rmr_mt_call( control->mrc, sbuf, control->id, 1000 );    // send it (send returns an empty payload on success, or the original payload on fail/retry)
+
+               if( sbuf && sbuf->state == RMR_ERR_RETRY ) {                                    // number of times we had to spin to send
+                       rt_count++;
+               }
+               while( sbuf != NULL && sbuf->state == RMR_ERR_RETRY ) {                         // send blocked; keep trying
+                       sbuf = rmr_mt_call( control->mrc, sbuf, control->id, 100 );             // call and wait up to 100ms for a response
+               }
+
+               if( sbuf != NULL ) {
+                       switch( sbuf->state ) {
+                               case RMR_OK:                                                    // we should have a buffer back from the sender here
+                                       successful = 1;
+                                       if( (tok = strchr( sbuf->payload, '@' )) != NULL ) {
+                                               if( atoi( tok+1 ) == control->id ) {
+                                                       //fprintf( stderr, "<THRD> tid=%-2d ok  ack\n", control->id );
+                                                       ok_msg++;
+                                               } else {
+                                                       bad_msg++;
+                                                       //fprintf( stderr, "<THRD> tid=%-2d bad ack %s\n", control->id, sbuf->payload );
+                                               }
+                                       }
+                                       //fprintf( stderr, "<THRD> tid=%-2d call returned valid msg: %s\n", control->id, sbuf->payload );
+                                       // future -- verify that we see our ID at the end of the message
+                                       count++;
+                                       break;
+
+                               default:
+                                       fprintf( stderr, "unexpected error: tid=%d rmr-state=%d ernro=%d\n", control->id, sbuf->state, errno );
+                                       sbuf = rmr_alloc_msg( control->mrc, 512 );                      // allocate a sendable buffer
+                                       if( successful ) {
+                                               fail_count++;                                                   // count failures after first successful message
+                                       } else {
+                                               // some error (not connected likely), don't count this
+                                               sleep( 1 );
+                                       }
+                                       break;
+                       }
+               } else {
+                       //fprintf( stderr, "<THRD> tid=%-2d call finished, no sbuf\n", control->id );
+                       sbuf = rmr_alloc_msg( control->mrc, 512 );                              // loop expects an subf
+                       drops++;
+                       count++;
+               }
+
+               if( control->delay > 0 ) {
+                       usleep( control->delay );
+               }
+       }
+
+       state = 1;
+       if( ok_msg < (control->n2send-1) || bad_msg > 0 ) {             // allow one drop to pass
+               state = 0;
+       }
+       if( count < control->n2send ) {
+               state = 0;
+       }
+
+       control->state = -state;                                // signal inactive to main thread; -1 == pass, 0 == fail
+       fprintf( stderr, "<THRD> [%s]  tid=%-2d sent=%d  ok-acks=%d bad-acks=%d  drops=%d failures=%d retries=%d\n", 
+               state ? "PASS" : "FAIL",  control->id, count, ok_msg, bad_msg, drops, fail_count, rt_count );
+
+
+       return NULL;
+}
+
+int main( int argc, char** argv ) {
+       void* mrc;                                                      // msg router context
+       rmr_mbuf_t*     rbuf = NULL;                            // received on 'normal' flow
+       struct  epoll_event events[1];                  // list of events to give to epoll
+       struct  epoll_event epe;                                // event definition for event to listen to
+       int     ep_fd = -1;                                             // epoll's file des (given to epoll_wait)
+       int             rcv_fd;                                                 // file des that NNG tickles -- give this to epoll to listen on
+       int             nready;                                                 // number of events ready for receive
+       char*   listen_port = "43086";
+       long    timeout = 0;
+       int             delay = 100000;                                 // usec between send attempts
+       int             nmsgs = 10;                                             // number of messages to send
+       int             nthreads = 3;
+       tdata_t*        cvs;                                            // vector of control blocks
+       int                     i;
+       pthread_t*      pt_info;                                        // thread stuff
+       int     failures = 0;
+       int             pings = 0;                                              // number of messages received on normal channel
+
+       if( argc > 1 ) {
+               nmsgs = atoi( argv[1] );
+       }
+       if( argc > 2 ) {
+               delay = atoi( argv[2] );
+       }
+       if( argc > 3 ) {
+               nthreads = atoi( argv[3] );
+       }
+       if( argc > 4 ) {
+               listen_port = argv[4];
+       }
+
+       fprintf( stderr, "<CALL> listen port: %s; sending %d messages; delay=%d\n", listen_port, nmsgs, delay );
+
+       if( (mrc = rmr_init( listen_port, 1400, RMRFL_MTCALL )) == NULL ) {             // initialise with multi-threaded call enabled
+               fprintf( stderr, "<CALL> unable to initialise RMr\n" );
+               exit( 1 );
+       }
+
+       rmr_init_trace( mrc, TRACE_SIZE );
+
+       if( (rcv_fd = rmr_get_rcvfd( mrc )) >= 0 ) {                                                    // epoll only available from NNG -- skip receive later if not NNG
+               if( rcv_fd < 0 ) {
+                       fprintf( stderr, "<CALL> unable to set up polling fd\n" );
+                       exit( 1 );
+               }
+               if( (ep_fd = epoll_create1( 0 )) < 0 ) {
+                       fprintf( stderr, "<CALL> [FAIL] unable to create epoll fd: %d\n", errno );
+                       exit( 1 );
+               }
+               epe.events = EPOLLIN;
+               epe.data.fd = rcv_fd;
+
+               if( epoll_ctl( ep_fd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 )  {
+                       fprintf( stderr, "<CALL> [FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
+                       exit( 1 );
+               }
+       } else {
+               rmr_set_rtimeout( mrc, 0 );                     // for nano we must set the receive timeout to 0; non-blocking receive
+       }
+
+
+       cvs = malloc( sizeof( tdata_t ) * nthreads );
+       pt_info = malloc( sizeof( pthread_t ) * nthreads );
+       if( cvs == NULL ) {
+               fprintf( stderr, "<CALL> unable to allocate control vector\n" );
+               exit( 1 );      
+       }
+
+
+       timeout = time( NULL ) + 20;            // give rmr 20s to find the route table (shouldn't need that much)
+       while( ! rmr_ready( mrc ) ) {           // must have a route table before we can send; wait til RMr says it has one
+               fprintf( stderr, "<CALL> waiting for rmr to show ready\n" );
+               sleep( 1 );
+
+               if( time( NULL ) > timeout ) {
+                       fprintf( stderr, "<CALL> giving up\n" );
+                       exit( 1 );
+               }
+       }
+       fprintf( stderr, "<CALL> rmr is ready; starting threads\n" );
+
+       for( i = 0; i < nthreads; i++ ) {
+               cvs[i].mrc = mrc;
+               cvs[i].id = i + 2;                              // we pass this as the call-id to rmr, so must be >1
+               cvs[i].delay = delay;
+               cvs[i].n2send = nmsgs;
+               cvs[i].state = 1;
+
+               pthread_create( &pt_info[i], NULL, mk_calls, &cvs[i] );         // kick a thread
+       }
+
+       timeout = time( NULL ) + 20;
+       i = 0;
+       while( nthreads > 0 ) {
+               if( cvs[i].state < 1 ) {                        // states 0 or below indicate done. 0 == failure, -n == success
+                       nthreads--;
+                       if( cvs[i].state == 0 ) {
+                               failures++;
+                       }
+                       i++;
+               } else {
+               //      sleep( 1 );
+                       rbuf = rmr_torcv_msg( mrc, rbuf, 1000 );
+                       if( rbuf != NULL && rbuf->state != RMR_ERR_RETRY ) {
+                               pings++;
+                               rmr_free_msg( rbuf );
+                               rbuf = NULL;
+                       }
+               }
+               if( time( NULL ) > timeout ) {
+                       failures += nthreads;
+                       fprintf( stderr, "<CALL> timeout waiting for threads to finish; %d were not finished\n", nthreads );
+                       break;
+               }
+       }
+
+       fprintf( stderr, "<CALL> [%s] failing threads=%d  pings reeived=%d\n", failures == 0 ? "PASS" : "FAIL",  failures, pings );
+       rmr_close( mrc );
+
+       return failures > 0;
+}
+