CI: Add silent cmake SonarCloud scan
[ric-plt/lib/rmr.git] / test / app_test / lcaller.c
1 // :vim ts=4 sw=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       lcaller.c
23         Abstract:       This is a simple sender which will send a series of messages using
24                                 rmr_call().   Similar to caller.c the major difference is that
25                                 a timestamp is placed into the message and the receiver is expected
26                                 to add a timestamp before executing an rts call.  We can then
27                                 compute the total round trip latency as well as the forward send
28                                 latency.
29
30                                 Overall, N threads are started each sending the desired number
31                                 of messages and expecting an 'ack' for each. Each ack is examined
32                                 to verify that the thread id placed into the message matches (meaning
33                                 that the ack was delivered by RMr to the correct thread's chute.
34
35                                 The message format is 'binary' defined by the lc_msg struct.
36
37                                 Parms:  argv[1] == number of msgs to send (10)
38                                                 argv[2] == delay                (mu-seconds, 1000000 default)
39                                                 argv[3] == number of threads (3)
40                                                 argv[4] == listen port
41
42                                 Sender will send for at most 20 seconds, so if nmsgs and delay extend
43                                 beyond that period the total number of messages sent will be less
44                                 than n.
45
46         Date:           18 April 2019
47         Author:         E. Scott Daniels
48 */
49
50 #include <unistd.h>
51 #include <errno.h>
52 #include <string.h>
53 #include <stdio.h>
54 #include <stdlib.h>
55 #include <sys/epoll.h>
56 #include <time.h>
57 #include <pthread.h>
58
59
60 #include <rmr/rmr.h>
61
62 #define TRACE_SIZE 40           // bytes in header to provide for trace junk
63 #define SUCCESS         (-1)
64
65 /*
66         Thread data
67 */
68 typedef struct tdata {
69         int             id;                                     // the id we'll pass to RMr mt-call function NOT the thread id
70         int             n2send;                         // number of messages to send
71         int             delay;                          // ms delay between messages
72         void*   mrc;                            // RMr context
73         int             state;
74         int*    in_bins;                        // latency count bins
75         int*    out_bins;
76         int             nbins;                          // number of bins allocated
77         long long in_max;
78         long long out_max;
79         int             out_oor;                        // out of range count
80         int             in_oor;
81         int             in_bcount;                              // total messages tracked in bins
82         int             out_bcount;                             // total messages tracked in bins
83 } tdata_t;
84
85
86 /*
87         The message type placed into the payload.
88 */
89 typedef struct lc_msg {
90         struct timespec out_ts;                 // time just before call executed
91         struct timespec turn_ts;                // time at the receiver,  on receipt
92         struct timespec in_ts;                  // time received back by the caller
93         int             out_retries;                    // number of retries required to send
94         int             turn_retries;                   // number of retries required to send
95 } lc_msg_t;
96
97 // --------------------------------------------------------------------------------
98
99
100 static int sum( char* str ) {
101         int sum = 0;
102         int     i = 0;
103
104         while( *str ) {
105                 sum += *(str++) + i++;
106         }
107
108         return sum % 255;
109 }
110
111 static void print_stats( tdata_t* td, int out, int hist ) {
112         int sum;                                        // sum of latencies
113         int csum = 0;                           // cutoff sum
114         int i95 = 0;                            // bin for the 95th count
115         int i99 = 0;                            // bin for the 99th count
116         int mean = -1;
117         int cutoff_95;                          // 95% of total messages
118         int cutoff_99;                          // 99% of total messages
119         int     oor;
120         int max;
121         int j;
122
123         if( out ) {
124                 cutoff_95 = .95 * (td->out_oor + td->out_bcount);
125                 cutoff_99 = .95 * (td->out_oor + td->out_bcount);
126                 oor = td->out_oor;
127                 max = td->out_max;
128         } else {
129                 cutoff_95 = .95 * (td->in_oor + td->in_bcount);
130                 cutoff_99 = .95 * (td->in_oor + td->in_bcount);
131                 oor = td->in_oor;
132                 max = td->in_max;
133         }
134
135         sum = 0;
136         for( j = 0; j < td->nbins; j++ ) {
137                 if( csum < cutoff_95 ) {
138                         i95++;
139                 }
140                 if( csum < cutoff_99 ) {
141                         i99++;
142                 }
143
144                 if( out ) {
145                         csum += td->out_bins[j];
146                         sum += td->out_bins[j] * j;
147                 } else {
148                         csum += td->in_bins[j];
149                         sum += td->in_bins[j] * j;
150                 }
151         }
152
153         if( out ) {
154                 if( td->out_bcount ) {
155                         mean = sum/(td->out_bcount);
156                 }
157         } else {
158                 if( td->in_bcount ) {
159                         mean = sum/(td->in_bcount);
160                 }
161         }
162
163         if( hist ) {
164                 for( j = 0; j < td->nbins; j++ ) {
165                         fprintf( stderr, "<LCALLER> hist: bin[%03d] %d\n", j, out ? td->out_bins[j] : td->in_bins[j] );
166                 }
167         }
168
169         fprintf( stderr, "<LCALLER> %s: oor=%d max=%.2fms  mean=%.2fms  95th=%.2fms 99th=%.2f\n", 
170                 out ? "out" : " in", oor, (double)max/1000000.0, (double)mean/100.0, (double) i95/100.0, i99/100.0 );
171 }
172
173 /*
174         Given a message, compute the in/out and round trip latencies.
175 */
176 static void compute_latency( tdata_t* td, lc_msg_t* lcm ) {
177         long long out;
178         long long turn;
179         long long in;
180         double rtl;             // round trip latency
181         double outl;    // caller to receiver latency (out)
182         double inl;             // receiver to caller latency (in)
183         int bin;
184
185         if( lcm == NULL || td == NULL ) {
186                 return;
187         }
188
189         out = (lcm->out_ts.tv_sec * 1000000000) + lcm->out_ts.tv_nsec;
190         in = (lcm->in_ts.tv_sec * 1000000000) + lcm->in_ts.tv_nsec;
191         turn = (lcm->turn_ts.tv_sec * 1000000000) + lcm->turn_ts.tv_nsec;
192
193         if( in - turn > td->in_max ) {
194                 td->in_max = in - turn;
195         }
196         if( turn - out > td->out_max ) {
197                 td->out_max = turn-out;
198         }
199         
200         bin = (turn-out) / 10000;                       // 100ths of ms
201
202 #ifdef PRINT
203         outl = ((double) turn - out) / 1000000.0;               // convert to ms
204         inl = ((double) in - turn) / 1000000.0;
205         rtl = ((double) in - out) / 1000000.0;
206
207         fprintf( stderr, "outl = %5.3fms   inl = %5.3fms  rtl = %5.3fms bin=%d\n", outl, inl, rtl, bin );
208 #else
209
210         bin = (turn - out) / 10000;                     // 100ths of ms
211         if( bin < td->nbins ) {
212                 td->out_bins[bin]++;
213                 td->out_bcount++;
214         } else {
215                 td->out_oor++;
216         }
217
218         bin = (in - turn) / 10000;                      // 100ths of ms
219         if( bin < td->nbins ) {
220                 td->in_bins[bin]++;
221                 td->in_bcount++;
222         } else {
223                 td->in_oor++;
224         }
225
226 #endif
227 }
228
229 /*
230         Executed as a thread, this puppy will generate calls to ensure that we get the
231         response back to the right thread, that we can handle threads, etc.
232 */
233 static void* mk_calls( void* data ) {
234         lc_msg_t*       lcm;                                            // pointer at the payload as a struct
235         tdata_t*        control;
236         rmr_mbuf_t*             sbuf;                                   // send buffer
237         int             count = 0;
238         int             ack_count = 0;
239         int             rt_count = 0;                                   // number of messages requiring a spin retry
240         int             ok_msg = 0;                                             // received messages that were sent by us
241         int             bad_msg = 0;                                    // received messages that were sent by a different thread
242         int             drops = 0;
243         int             fail_count = 0;                                 // # of failure sends after first successful send
244         int             successful = 0;                                 // set to true after we have a successful send
245         char    xbuf[1024];                                             // build transaction string here
246         int             xaction_id = 1;
247         char*   tok;
248         int             state = 0;
249
250         if( (control  = (tdata_t *) data) == NULL ) {
251                 fprintf( stderr, "thread data was nil; bailing out\n" );
252         }
253         //fprintf( stderr, "<THRD> thread started with parallel call id=%d sending=%d delay=%d\n", control->id, control->n2send, control->delay );
254
255         sbuf = rmr_alloc_msg( control->mrc, 512 );      // alloc first send buffer; subsequent buffers allcoated on send
256
257         usleep( rand() % 777 );                                 // stagger starts a bit so that they all don't pile up on the first connections
258
259         while( count < control->n2send ) {                                                              // we send n messages after the first message is successful
260                 lcm = (lc_msg_t *) sbuf->payload;
261
262                 snprintf( xbuf, 200, "%31d", xaction_id );
263                 xaction_id += control->id;
264                 rmr_bytes2xact( sbuf, xbuf, 32 );
265
266                 sbuf->mtype = 5;                                                                                                // all go with the same type
267                 sbuf->len =  sizeof( *lcm );
268                 sbuf->state = RMR_OK;
269                 lcm->out_retries = 0;
270                 lcm->turn_retries = 0;
271                 clock_gettime( CLOCK_REALTIME, &lcm->out_ts );                                  // mark time out
272                 sbuf = rmr_mt_call( control->mrc, sbuf, control->id, 1000 );    // send it (send returns an empty payload on success, or the original payload on fail/retry)
273
274                 if( sbuf && sbuf->state == RMR_ERR_RETRY ) {                                    // number of times we had to spin to send
275                         rt_count++;
276                 }
277                 while( sbuf != NULL && sbuf->state == RMR_ERR_RETRY ) {                         // send blocked; keep trying
278                         lcm->out_retries++;
279                         sbuf = rmr_mt_call( control->mrc, sbuf, control->id, 100 );             // call and wait up to 100ms for a response
280                 }
281
282                 count++;
283                 if( sbuf != NULL ) {
284                         switch( sbuf->state ) {
285                                 case RMR_OK:                                                                                                            // we should have a buffer back from the sender here
286                                         lcm = (lc_msg_t *) sbuf->payload;
287                                         clock_gettime( CLOCK_REALTIME, &lcm->in_ts );                                   // mark time back
288                                         successful = 1;
289                                         compute_latency( control, lcm );
290                                         
291                                         ack_count++;
292                                         //fprintf( stderr, "%d  have received %d\n", control->id, count );
293                                         break;
294
295                                 default:
296                                         fprintf( stderr, "<LCALLER> unexpected error: tid=%d rmr-state=%d ernro=%d\n", control->id, sbuf->state, errno );
297                                         sbuf = rmr_alloc_msg( control->mrc, 512 );                      // allocate a sendable buffer
298                                         if( successful ) {
299                                                 fail_count++;                                                   // count failures after first successful message
300                                         } else {
301                                                 // some error (not connected likely), don't count this
302                                                 sleep( 1 );
303                                         }
304                                         break;
305                         }
306                 } else {
307                         //fprintf( stderr, "<THRD> tid=%-2d call finished, no sbuf\n", control->id );
308                         sbuf = rmr_alloc_msg( control->mrc, 512 );                              // loop expects an subf
309                         drops++;
310                 }
311
312                 if( control->delay > 0 ) {
313                         usleep( control->delay );
314                 }
315         }
316
317         control->state = SUCCESS;
318         fprintf( stderr, "<THRD> %d finished sent %d, received %d  messages\n", control->id, count, ack_count );
319         return NULL;
320 }
321
322 int main( int argc, char** argv ) {
323         void* mrc;                                                      // msg router context
324         rmr_mbuf_t*     rbuf = NULL;                            // received on 'normal' flow
325         struct  epoll_event events[1];                  // list of events to give to epoll
326         struct  epoll_event epe;                                // event definition for event to listen to
327         int     ep_fd = -1;                                             // epoll's file des (given to epoll_wait)
328         int             rcv_fd;                                                 // file des that NNG tickles -- give this to epoll to listen on
329         int             nready;                                                 // number of events ready for receive
330         char*   listen_port = "43086";
331         long    timeout = 0;
332         int             delay = 100000;                                 // usec between send attempts
333         int             nmsgs = 10;                                             // number of messages to send
334         int             nthreads = 3;
335         int             cutoff;
336         int             sum;
337         tdata_t*        cvs;                                            // vector of control blocks
338         int                     i;
339         int                     j;
340         pthread_t*      pt_info;                                        // thread stuff
341         int     failures = 0;
342         int             pings = 0;                                              // number of messages received on normal channel
343
344         if( argc > 1 ) {
345                 nmsgs = atoi( argv[1] );
346         }
347         if( argc > 2 ) {
348                 delay = atoi( argv[2] );
349         }
350         if( argc > 3 ) {
351                 nthreads = atoi( argv[3] );
352         }
353         if( argc > 4 ) {
354                 listen_port = argv[4];
355         }
356
357         fprintf( stderr, "<CALL> listen port: %s; sending %d messages; delay=%d\n", listen_port, nmsgs, delay );
358
359         if( (mrc = rmr_init( listen_port, 1400, RMRFL_MTCALL )) == NULL ) {             // initialise with multi-threaded call enabled
360                 fprintf( stderr, "<CALL> unable to initialise RMr\n" );
361                 exit( 1 );
362         }
363
364         //rmr_init_trace( mrc, TRACE_SIZE );
365
366         if( (rcv_fd = rmr_get_rcvfd( mrc )) >= 0 ) {                                                    // epoll only available from NNG -- skip receive later if not NNG
367                 if( rcv_fd < 0 ) {
368                         fprintf( stderr, "<CALL> unable to set up polling fd\n" );
369                         exit( 1 );
370                 }
371                 if( (ep_fd = epoll_create1( 0 )) < 0 ) {
372                         fprintf( stderr, "<CALL> [FAIL] unable to create epoll fd: %d\n", errno );
373                         exit( 1 );
374                 }
375                 epe.events = EPOLLIN;
376                 epe.data.fd = rcv_fd;
377
378                 if( epoll_ctl( ep_fd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 )  {
379                         fprintf( stderr, "<CALL> [FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
380                         exit( 1 );
381                 }
382         } else {
383                 rmr_set_rtimeout( mrc, 0 );                     // for nano we must set the receive timeout to 0; non-blocking receive
384         }
385
386
387         cvs = malloc( sizeof( tdata_t ) * nthreads );
388         pt_info = malloc( sizeof( pthread_t ) * nthreads );
389         if( cvs == NULL ) {
390                 fprintf( stderr, "<CALL> unable to allocate control vector\n" );
391                 exit( 1 );      
392         }
393
394
395         timeout = time( NULL ) + 20;            // give rmr 20s to find the route table (shouldn't need that much)
396         while( ! rmr_ready( mrc ) ) {           // must have a route table before we can send; wait til RMr says it has one
397                 fprintf( stderr, "<CALL> waiting for rmr to show ready\n" );
398                 sleep( 1 );
399
400                 if( time( NULL ) > timeout ) {
401                         fprintf( stderr, "<CALL> giving up\n" );
402                         exit( 1 );
403                 }
404         }
405         fprintf( stderr, "<CALL> rmr is ready; starting threads\n" );
406
407         for( i = 0; i < nthreads; i++ ) {
408                 cvs[i].mrc = mrc;
409                 cvs[i].id = i + 2;                              // we pass this as the call-id to rmr, so must be >1
410                 cvs[i].delay = delay;
411                 cvs[i].n2send = nmsgs;
412                 cvs[i].state = 1;
413
414                 cvs[i].nbins = 100;
415                 cvs[i].out_bins = (int *) malloc( sizeof( int ) * cvs[i].nbins );
416                 cvs[i].in_bins = (int *) malloc( sizeof( int ) * cvs[i].nbins );
417                 memset( cvs[i].out_bins, 0, sizeof( int ) * cvs[i].nbins );
418                 memset( cvs[i].in_bins, 0, sizeof( int ) * cvs[i].nbins );
419
420                 pthread_create( &pt_info[i], NULL, mk_calls, &cvs[i] );         // kick a thread
421         }
422
423         timeout = time( NULL ) + 20;
424         i = 0;
425         while( nthreads > 0 ) {
426                 if( cvs[i].state < 1 ) {                        // states 0 or below indicate done. 0 == failure, -n == success
427                         //print_stats( &cvs[i], 1, i == 0 );
428                         print_stats( &cvs[i], 1, 0 );
429                         print_stats( &cvs[i], 0, 0 );
430
431                         nthreads--;
432                         if( cvs[i].state == 0 ) {
433                                 failures++;
434                         }
435                         i++;
436                 } else {
437                 //      sleep( 1 );
438                         rbuf = rmr_torcv_msg( mrc, rbuf, 1000 );
439                         if( rbuf != NULL && rbuf->state != RMR_ERR_RETRY ) {
440                                 pings++;
441                                 rmr_free_msg( rbuf );
442                                 rbuf = NULL;
443                         }
444                 }
445                 if( time( NULL ) > timeout ) {
446                         failures += nthreads;
447                         fprintf( stderr, "<CALL> timeout waiting for threads to finish; %d were not finished\n", nthreads );
448                         break;
449                 }
450         }
451
452         fprintf( stderr, "<CALL> [%s] failing threads=%d  pings reeived=%d\n", failures == 0 ? "PASS" : "FAIL",  failures, pings );
453         sleep( 2 );
454         rmr_close( mrc );
455
456         return failures > 0;
457 }
458