Added new message type constants
[ric-plt/lib/rmr.git] / test / app_test / lsender.c
1 // :vim ts=4 sw=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       lsender.c
23         Abstract:       This is a simple sender, slimiar to sender.c, except that a timestamp
24                                 is placed into the messages such that latency measurements can be 
25                                 made.
26                                 The message format is 'binary' defined by the lc_msg struct.
27
28                                 Parms:  argv[1] == number of msgs to send (10)
29                                                 argv[2] == delay                (mu-seconds, 1000000 default)
30                                                 argv[3] == listen port
31
32                                 Sender will send for at most 20 seconds, so if nmsgs and delay extend
33                                 beyond that period the total number of messages sent will be less
34                                 than n.
35
36         Date:           18 April 2019
37         Author:         E. Scott Daniels
38 */
39
40 #include <unistd.h>
41 #include <errno.h>
42 #include <string.h>
43 #include <stdio.h>
44 #include <stdlib.h>
45 #include <sys/epoll.h>
46 #include <time.h>
47 #include <pthread.h>
48
49
50 #include <rmr/rmr.h>
51
52 #define TRACE_SIZE 40           // bytes in header to provide for trace junk
53 #define SUCCESS         (-1)
54
55 /*
56         Thread data
57 */
58 typedef struct tdata {
59         int             id;                                     // the id we'll pass to RMr mt-call function NOT the thread id
60         int             n2send;                         // number of messages to send
61         int             delay;                          // ms delay between messages
62         void*   mrc;                            // RMr context
63         int             state;
64         int*    in_bins;                        // latency count bins
65         int*    out_bins;
66         int             nbins;                          // number of bins allocated
67         long long in_max;
68         long long out_max;
69         int             out_oor;                        // out of range count
70         int             in_oor;
71         int             in_bcount;                              // total messages tracked in bins
72         int             out_bcount;                             // total messages tracked in bins
73 } tdata_t;
74
75
76 /*
77         The message type placed into the payload.
78 */
79 typedef struct lc_msg {
80         struct timespec out_ts;                 // time just before call executed
81         struct timespec turn_ts;                // time at the receiver,  on receipt
82         struct timespec in_ts;                  // time received back by the caller
83         int             out_retries;                    // number of retries required to send
84         int             turn_retries;                   // number of retries required to send
85 } lc_msg_t;
86
87 // --------------------------------------------------------------------------------
88
89
90 static int sum( char* str ) {
91         int sum = 0;
92         int     i = 0;
93
94         while( *str ) {
95                 sum += *(str++) + i++;
96         }
97
98         return sum % 255;
99 }
100
101 static void print_stats( tdata_t* td, int out, int hist ) {
102         int sum;                                        // sum of latencies
103         int csum = 0;                           // cutoff sum
104         int i95 = 0;                            // bin for the 95th count
105         int i99 = 0;                            // bin for the 99th count
106         int mean = -1;
107         int cutoff_95;                          // 95% of total messages
108         int cutoff_99;                          // 99% of total messages
109         int     oor;
110         int max;
111         int j;
112
113         if( out ) {
114                 cutoff_95 = .95 * (td->out_oor + td->out_bcount);
115                 cutoff_99 = .95 * (td->out_oor + td->out_bcount);
116                 oor = td->out_oor;
117                 max = td->out_max;
118         } else {
119                 cutoff_95 = .95 * (td->in_oor + td->in_bcount);
120                 cutoff_99 = .95 * (td->in_oor + td->in_bcount);
121                 oor = td->in_oor;
122                 max = td->in_max;
123         }
124
125         sum = 0;
126         for( j = 0; j < td->nbins; j++ ) {
127                 if( csum < cutoff_95 ) {
128                         i95++;
129                 }
130                 if( csum < cutoff_99 ) {
131                         i99++;
132                 }
133
134                 if( out ) {
135                         csum += td->out_bins[j];
136                         sum += td->out_bins[j] * j;
137                 } else {
138                         csum += td->in_bins[j];
139                         sum += td->in_bins[j] * j;
140                 }
141         }
142
143         if( out ) {
144                 if( td->out_bcount ) {
145                         mean = sum/(td->out_bcount);
146                 }
147         } else {
148                 if( td->in_bcount ) {
149                         mean = sum/(td->in_bcount);
150                 }
151         }
152
153         if( hist ) {
154                 for( j = 0; j < td->nbins; j++ ) {
155                         fprintf( stderr, "%3d %d\n", j, out ? td->out_bins[j] : td->in_bins[j] );
156                 }
157         }
158
159         fprintf( stderr, "%s: oor=%d max=%.2fms  mean=%.2fms  95th=%.2fms 99th=%.2f\n", 
160                 out ? "out" : " in", oor, (double)max/1000000.0, (double)mean/100.0, (double) i95/100.0, i99/100.0 );
161 }
162
163 /*
164         Given a message, compute the in/out and round trip latencies.
165 */
166 static void compute_latency( tdata_t* td, lc_msg_t* lcm ) {
167         long long out;
168         long long turn;
169         long long in;
170         double rtl;             // round trip latency
171         double outl;    // caller to receiver latency (out)
172         double inl;             // receiver to caller latency (in)
173         int bin;
174
175         if( lcm == NULL || td == NULL ) {
176                 return;
177         }
178
179         out = (lcm->out_ts.tv_sec * 1000000000) + lcm->out_ts.tv_nsec;
180         in = (lcm->in_ts.tv_sec * 1000000000) + lcm->in_ts.tv_nsec;
181         turn = (lcm->turn_ts.tv_sec * 1000000000) + lcm->turn_ts.tv_nsec;
182
183         if( in - turn > td->in_max ) {
184                 td->in_max = in - turn;
185         }
186         if( turn - out > td->out_max ) {
187                 td->out_max = turn-out;
188         }
189         
190         bin = (turn-out) / 10000;                       // 100ths of ms
191
192 #ifdef PRINT
193         outl = ((double) turn - out) / 1000000.0;               // convert to ms
194         inl = ((double) in - turn) / 1000000.0;
195         rtl = ((double) in - out) / 1000000.0;
196
197         fprintf( stderr, "outl = %5.3fms   inl = %5.3fms  rtl = %5.3fms bin=%d\n", outl, inl, rtl, bin );
198 #else
199
200         bin = (turn - out) / 10000;                     // 100ths of ms
201         if( bin < td->nbins ) {
202                 td->out_bins[bin]++;
203                 td->out_bcount++;
204         } else {
205                 td->out_oor++;
206         }
207
208         bin = (in - turn) / 10000;                      // 100ths of ms
209         if( bin < td->nbins ) {
210                 td->in_bins[bin]++;
211                 td->in_bcount++;
212         } else {
213                 td->in_oor++;
214         }
215
216 #endif
217 }
218
219 /*
220         Compute the elapsed time between ts1 and ts2.
221 */
222 static int elapsed( struct timespec* start_ts, struct timespec* end_ts ) {
223         long long start;
224         long long end;
225         int bin;
226
227         start = ( start_ts->tv_sec * 1000000000) + start_ts->tv_nsec;
228         end = ( end_ts->tv_sec * 1000000000) + end_ts->tv_nsec;
229
230         bin = (end - start) / 1000000;                  // ms
231
232         return bin;
233 }
234
235 /*
236         The main thing.
237 */
238 static void* send_msgs( void* mrc, int n2send, int delay, int retry ) {
239         lc_msg_t*       lcm;                                            // pointer at the payload as a struct
240         rmr_mbuf_t*             sbuf;                                   // send buffer
241         int             count = 0;
242         int             rt_count = 0;                                   // number of messages that had a retry on first send attempt
243         int             good_count = 0;
244         int             drops = 0;
245         int             fail_count = 0;                                 // # of failure sends after first successful send
246         int             successful = 0;                                 // set to true after we have a successful send
247         char    xbuf[1024];                                             // build transaction string here
248         int             xaction_id = 1;
249         char*   tok;
250         int             state = 0;
251         struct timespec start_ts;
252         struct timespec end_ts;
253         int             mtype = 0;
254
255         if( mrc == NULL ) {
256                 fprintf( stderr, "send_msg: bad mrc\n" );
257         }
258
259         sbuf = rmr_alloc_msg( mrc, 256 );       // alloc first send buffer; subsequent buffers allcoated on send
260
261         snprintf( xbuf, 200, "%31d", xaction_id );
262         while( count < n2send ) {                                                               // we send n messages after the first message is successful
263                 lcm = (lc_msg_t *) sbuf->payload;
264
265                 rmr_bytes2xact( sbuf, xbuf, 32 );
266
267                 sbuf->mtype = 0;
268                 sbuf->mtype = mtype++;                                                                                          // all go with the same type
269                 if( mtype > 9 ) {
270                         mtype = 0;
271                 }
272
273                 sbuf->len =  sizeof( *lcm );
274                 sbuf->state = RMR_OK;
275                 lcm->out_retries = 0;
276                 lcm->turn_retries = 0;
277                 clock_gettime( CLOCK_REALTIME, &lcm->out_ts );                                  // mark time out
278                 sbuf = rmr_send_msg( mrc, sbuf );
279
280                 if( sbuf && sbuf->state == RMR_ERR_RETRY ) {                                    // send not accepted
281                         if( retry || count == 0  ) {
282                                 rt_count++;                                                                                             // # messages that we retried beyond rmr's retry
283                         } else {
284                                 if( delay ) 
285                                         usleep( delay );
286                                 fail_count++;                   // send failed because we drop it
287                         }
288                 }
289
290                 count++;
291                 if( sbuf != NULL ) {
292                         if( ! successful ) {
293                                 switch( sbuf->state ) {
294                                         case RMR_OK:
295                                                 clock_gettime( CLOCK_REALTIME, &start_ts );
296                                                 successful = 1;
297                                                 good_count++;
298                                                 break;
299
300                                         default:
301                                                 fprintf( stderr, "<SM> send error: rmr-state=%d ernro=%d\n", sbuf->state, errno );
302                                                 sleep( 1 );
303                                                 break;
304                                 }
305                         } else {
306                                 good_count += sbuf->state == RMR_OK;
307                         }
308                 } else {
309                         sbuf = rmr_alloc_msg( mrc, 512 );                               // must have a sedn buffer at top
310                         drops++;
311                 }
312
313                 //if( count < n2send  &&  (count % 100) == 0  &&  delay > 0 ) {
314                 if( count < n2send && delay > 0 ) {
315                         if( count % 500 ) {
316                                 usleep( delay );
317                         }
318                 }
319         }
320                                                 
321         clock_gettime( CLOCK_REALTIME, &end_ts );
322
323         fprintf( stderr, "<SM> sending finished attempted=%d good=%d fails=%d rt=%d elapsed=%d ms, \n", count, good_count, fail_count, rt_count, elapsed( &start_ts, &end_ts ) );
324         return NULL;
325 }
326
327 int main( int argc, char** argv ) {
328         void* mrc;                                                      // msg router context
329         rmr_mbuf_t*     rbuf = NULL;                            // received on 'normal' flow
330         char*   listen_port = "43086";                  // largely unused here
331         long    timeout = 0;
332         int             delay = 100000;                                 // usec between send attempts
333         int             nmsgs = 10;                                             // number of messages to send
334         int             rmr_retries = 0;                                // number of retries we allow rmr to do
335
336         if( argc > 1 ) {
337                 nmsgs = atoi( argv[1] );
338         }
339         if( argc > 2 ) {
340                 delay = atoi( argv[2] );
341         }
342         if( argc > 4 ) {
343                 listen_port = argv[4];
344         }
345         if( argc > 3 ) {
346                 rmr_retries = atoi( argv[3] );
347         }
348
349         fprintf( stderr, "<LSEND> listen port: %s; sending %d messages; delay=%d\n", listen_port, nmsgs, delay );
350
351         if( (mrc = rmr_init( listen_port, 1400, RMRFL_MTCALL )) == NULL ) {             // initialise with multi-threaded call enabled
352                 fprintf( stderr, "<LSEND> unable to initialise RMr\n" );
353                 exit( 1 );
354         }
355
356         fprintf( stderr, "\nsetting rmr retries: %d\n", rmr_retries );
357         //if( rmr_retries != 1 ) {
358                 rmr_set_stimeout( mrc, rmr_retries );
359         //}
360
361         timeout = time( NULL ) + 20;            // give rmr 20s to find the route table (shouldn't need that much)
362         while( ! rmr_ready( mrc ) ) {           // must have a route table before we can send; wait til RMr says it has one
363                 fprintf( stderr, "<LSEND> waiting for rmr to show ready\n" );
364                 sleep( 1 );
365
366                 if( time( NULL ) > timeout ) {
367                         fprintf( stderr, "<LSEND> giving up\n" );
368                         exit( 1 );
369                 }
370         }
371         fprintf( stderr, "<LSEND> rmr is ready; starting sender retries=%d\n", rmr_retries );
372
373         send_msgs( mrc, nmsgs, delay, rmr_retries );
374
375         fprintf( stderr, "pausing for drain\n" );
376         sleep( 3 );
377         fprintf( stderr, "closing down\n" );
378         rmr_close( mrc );
379
380         return 0;
381 }
382