enhance(API): Add multi-threaded call
[ric-plt/lib/rmr.git] / test / app_test / caller.c
1 // :vim ts=4 sw=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       caller.c
23         Abstract:       This is a simple sender which will send a series of messages using
24                                 rmr_call().  N threads are started each sending the desired number
25                                 of messages and expecting an 'ack' for each. Each ack is examined
26                                 to verify that the thread id placed into the message matches (meaning
27                                 that the ack was delivered by RMr to the correct thread's chute.
28
29                                 In addition, the main thread listens for messages in order to verify
30                                 that a main or receiving thread can receive messages concurrently
31                                 while call acks are pending and being processed.
32
33                                 Message format is:
34                                         ck1 ck2|<msg-txt> @ tid<nil>
35
36                                 Ck1 is the simple check sum of the msg-text (NOT includeing <nil>)
37                                 Ck2 is the simple check sum of the trace data which is a nil terminated
38                                 series of bytes.
39                                 tid is the thread id assigned by the main thread.
40
41                                 Parms:  argv[1] == number of msgs to send (10)
42                                                 argv[2] == delay                (mu-seconds, 1000000 default)
43                                                 argv[3] == number of threads (3)
44                                                 argv[4] == listen port
45
46                                 Sender will send for at most 20 seconds, so if nmsgs and delay extend
47                                 beyond that period the total number of messages sent will be less
48                                 than n.
49
50         Date:           18 April 2019
51         Author:         E. Scott Daniels
52 */
53
54 #include <unistd.h>
55 #include <errno.h>
56 #include <string.h>
57 #include <stdio.h>
58 #include <stdlib.h>
59 #include <sys/epoll.h>
60 #include <time.h>
61 #include <pthread.h>
62
63
64 #include <rmr/rmr.h>
65
66 #define TRACE_SIZE 40           // bytes in header to provide for trace junk
67
68 /*
69         Thread data
70 */
71 typedef struct tdata {
72         int     id;                                     // the id we'll pass to RMr mt-call function NOT the thread id
73         int n2send;                             // number of messages to send
74         int delay;                              // ms delay between messages
75         void* mrc;                              // RMr context
76         int     state;
77 } tdata_t;
78
79
80
81 // --------------------------------------------------------------------------------
82
83
84 static int sum( char* str ) {
85         int sum = 0;
86         int     i = 0;
87
88         while( *str ) {
89                 sum += *(str++) + i++;
90         }
91
92         return sum % 255;
93 }
94
95
96
97 /*
98         Executed as a thread, this puppy will generate calls to ensure that we get the
99         response back to the right thread, that we can handle threads, etc.
100 */
101 static void* mk_calls( void* data ) {
102         tdata_t*        control;
103         rmr_mbuf_t*             sbuf;                                   // send buffer
104         int             count = 0;
105         int             rt_count = 0;                                   // number of messages requiring a spin retry
106         int             ok_msg = 0;                                             // received messages that were sent by us
107         int             bad_msg = 0;                                    // received messages that were sent by a different thread
108         int             drops = 0;
109         int             fail_count = 0;                                 // # of failure sends after first successful send
110         int             successful = 0;                                 // set to true after we have a successful send
111         char    wbuf[1024];
112         char    xbuf[1024];                                             // build transaction string here
113         char    trace[1024];
114         int             xaction_id = 1;
115         char*   tok;
116         int             state = 0;
117
118         if( (control  = (tdata_t *) data) == NULL ) {
119                 fprintf( stderr, "thread data was nil; bailing out\n" );
120         }
121         //fprintf( stderr, "<THRD> thread started with parallel call id=%d sending=%d delay=%d\n", control->id, control->n2send, control->delay );
122
123         sbuf = rmr_alloc_msg( control->mrc, 512 );      // alloc first send buffer; subsequent buffers allcoated on send
124
125         memset( trace, 0, sizeof( trace ) );    
126         while( count < control->n2send ) {                                                              // we send n messages after the first message is successful
127                 snprintf( trace, 100, "%lld", (long long) time( NULL ) );
128                 rmr_set_trace( sbuf, trace, TRACE_SIZE );                                       // fully populate so we dont cause a buffer realloc
129
130                 snprintf( wbuf, 200, "count=%d tr=%s %d stand up and cheer! @ %d", count, trace, rand(), control->id );
131                 snprintf( sbuf->payload, 300, "%d %d|%s", sum( wbuf ), sum( trace ), wbuf );
132                 snprintf( xbuf, 200, "%31d", xaction_id );
133                 rmr_bytes2xact( sbuf, xbuf, 32 );
134
135                 sbuf->mtype = 5;                                                                // mtype is always 5 as the test receiver acks just mtype 5 messages
136                 sbuf->len =  strlen( sbuf->payload ) + 1;               // our receiver likely wants a nice acsii-z string
137                 sbuf->state = 0;
138                 sbuf = rmr_mt_call( control->mrc, sbuf, control->id, 1000 );    // send it (send returns an empty payload on success, or the original payload on fail/retry)
139
140                 if( sbuf && sbuf->state == RMR_ERR_RETRY ) {                                    // number of times we had to spin to send
141                         rt_count++;
142                 }
143                 while( sbuf != NULL && sbuf->state == RMR_ERR_RETRY ) {                         // send blocked; keep trying
144                         sbuf = rmr_mt_call( control->mrc, sbuf, control->id, 100 );             // call and wait up to 100ms for a response
145                 }
146
147                 if( sbuf != NULL ) {
148                         switch( sbuf->state ) {
149                                 case RMR_OK:                                                    // we should have a buffer back from the sender here
150                                         successful = 1;
151                                         if( (tok = strchr( sbuf->payload, '@' )) != NULL ) {
152                                                 if( atoi( tok+1 ) == control->id ) {
153                                                         //fprintf( stderr, "<THRD> tid=%-2d ok  ack\n", control->id );
154                                                         ok_msg++;
155                                                 } else {
156                                                         bad_msg++;
157                                                         //fprintf( stderr, "<THRD> tid=%-2d bad ack %s\n", control->id, sbuf->payload );
158                                                 }
159                                         }
160                                         //fprintf( stderr, "<THRD> tid=%-2d call returned valid msg: %s\n", control->id, sbuf->payload );
161                                         // future -- verify that we see our ID at the end of the message
162                                         count++;
163                                         break;
164
165                                 default:
166                                         fprintf( stderr, "unexpected error: tid=%d rmr-state=%d ernro=%d\n", control->id, sbuf->state, errno );
167                                         sbuf = rmr_alloc_msg( control->mrc, 512 );                      // allocate a sendable buffer
168                                         if( successful ) {
169                                                 fail_count++;                                                   // count failures after first successful message
170                                         } else {
171                                                 // some error (not connected likely), don't count this
172                                                 sleep( 1 );
173                                         }
174                                         break;
175                         }
176                 } else {
177                         //fprintf( stderr, "<THRD> tid=%-2d call finished, no sbuf\n", control->id );
178                         sbuf = rmr_alloc_msg( control->mrc, 512 );                              // loop expects an subf
179                         drops++;
180                         count++;
181                 }
182
183                 if( control->delay > 0 ) {
184                         usleep( control->delay );
185                 }
186         }
187
188         state = 1;
189         if( ok_msg < (control->n2send-1) || bad_msg > 0 ) {             // allow one drop to pass
190                 state = 0;
191         }
192         if( count < control->n2send ) {
193                 state = 0;
194         }
195
196         control->state = -state;                                // signal inactive to main thread; -1 == pass, 0 == fail
197         fprintf( stderr, "<THRD> [%s]  tid=%-2d sent=%d  ok-acks=%d bad-acks=%d  drops=%d failures=%d retries=%d\n", 
198                 state ? "PASS" : "FAIL",  control->id, count, ok_msg, bad_msg, drops, fail_count, rt_count );
199
200
201         return NULL;
202 }
203
204 int main( int argc, char** argv ) {
205         void* mrc;                                                      // msg router context
206         rmr_mbuf_t*     rbuf = NULL;                            // received on 'normal' flow
207         struct  epoll_event events[1];                  // list of events to give to epoll
208         struct  epoll_event epe;                                // event definition for event to listen to
209         int     ep_fd = -1;                                             // epoll's file des (given to epoll_wait)
210         int             rcv_fd;                                                 // file des that NNG tickles -- give this to epoll to listen on
211         int             nready;                                                 // number of events ready for receive
212         char*   listen_port = "43086";
213         long    timeout = 0;
214         int             delay = 100000;                                 // usec between send attempts
215         int             nmsgs = 10;                                             // number of messages to send
216         int             nthreads = 3;
217         tdata_t*        cvs;                                            // vector of control blocks
218         int                     i;
219         pthread_t*      pt_info;                                        // thread stuff
220         int     failures = 0;
221         int             pings = 0;                                              // number of messages received on normal channel
222
223         if( argc > 1 ) {
224                 nmsgs = atoi( argv[1] );
225         }
226         if( argc > 2 ) {
227                 delay = atoi( argv[2] );
228         }
229         if( argc > 3 ) {
230                 nthreads = atoi( argv[3] );
231         }
232         if( argc > 4 ) {
233                 listen_port = argv[4];
234         }
235
236         fprintf( stderr, "<CALL> listen port: %s; sending %d messages; delay=%d\n", listen_port, nmsgs, delay );
237
238         if( (mrc = rmr_init( listen_port, 1400, RMRFL_MTCALL )) == NULL ) {             // initialise with multi-threaded call enabled
239                 fprintf( stderr, "<CALL> unable to initialise RMr\n" );
240                 exit( 1 );
241         }
242
243         rmr_init_trace( mrc, TRACE_SIZE );
244
245         if( (rcv_fd = rmr_get_rcvfd( mrc )) >= 0 ) {                                                    // epoll only available from NNG -- skip receive later if not NNG
246                 if( rcv_fd < 0 ) {
247                         fprintf( stderr, "<CALL> unable to set up polling fd\n" );
248                         exit( 1 );
249                 }
250                 if( (ep_fd = epoll_create1( 0 )) < 0 ) {
251                         fprintf( stderr, "<CALL> [FAIL] unable to create epoll fd: %d\n", errno );
252                         exit( 1 );
253                 }
254                 epe.events = EPOLLIN;
255                 epe.data.fd = rcv_fd;
256
257                 if( epoll_ctl( ep_fd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 )  {
258                         fprintf( stderr, "<CALL> [FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
259                         exit( 1 );
260                 }
261         } else {
262                 rmr_set_rtimeout( mrc, 0 );                     // for nano we must set the receive timeout to 0; non-blocking receive
263         }
264
265
266         cvs = malloc( sizeof( tdata_t ) * nthreads );
267         pt_info = malloc( sizeof( pthread_t ) * nthreads );
268         if( cvs == NULL ) {
269                 fprintf( stderr, "<CALL> unable to allocate control vector\n" );
270                 exit( 1 );      
271         }
272
273
274         timeout = time( NULL ) + 20;            // give rmr 20s to find the route table (shouldn't need that much)
275         while( ! rmr_ready( mrc ) ) {           // must have a route table before we can send; wait til RMr says it has one
276                 fprintf( stderr, "<CALL> waiting for rmr to show ready\n" );
277                 sleep( 1 );
278
279                 if( time( NULL ) > timeout ) {
280                         fprintf( stderr, "<CALL> giving up\n" );
281                         exit( 1 );
282                 }
283         }
284         fprintf( stderr, "<CALL> rmr is ready; starting threads\n" );
285
286         for( i = 0; i < nthreads; i++ ) {
287                 cvs[i].mrc = mrc;
288                 cvs[i].id = i + 2;                              // we pass this as the call-id to rmr, so must be >1
289                 cvs[i].delay = delay;
290                 cvs[i].n2send = nmsgs;
291                 cvs[i].state = 1;
292
293                 pthread_create( &pt_info[i], NULL, mk_calls, &cvs[i] );         // kick a thread
294         }
295
296         timeout = time( NULL ) + 20;
297         i = 0;
298         while( nthreads > 0 ) {
299                 if( cvs[i].state < 1 ) {                        // states 0 or below indicate done. 0 == failure, -n == success
300                         nthreads--;
301                         if( cvs[i].state == 0 ) {
302                                 failures++;
303                         }
304                         i++;
305                 } else {
306                 //      sleep( 1 );
307                         rbuf = rmr_torcv_msg( mrc, rbuf, 1000 );
308                         if( rbuf != NULL && rbuf->state != RMR_ERR_RETRY ) {
309                                 pings++;
310                                 rmr_free_msg( rbuf );
311                                 rbuf = NULL;
312                         }
313                 }
314                 if( time( NULL ) > timeout ) {
315                         failures += nthreads;
316                         fprintf( stderr, "<CALL> timeout waiting for threads to finish; %d were not finished\n", nthreads );
317                         break;
318                 }
319         }
320
321         fprintf( stderr, "<CALL> [%s] failing threads=%d  pings reeived=%d\n", failures == 0 ? "PASS" : "FAIL",  failures, pings );
322         rmr_close( mrc );
323
324         return failures > 0;
325 }
326