1 // :vim ts=4 sw=4 noet:
3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
23 Abstract: This is a simple sender, similar to sender.c, except that a timestamp
24 is placed into the messages such that latency measurements can be
26 The message format is 'binary' defined by the lc_msg struct.
28 Parms: argv[1] == number of msgs to send (10)
29 argv[2] == delay (mu-seconds, 100000 default)
30 argv[3] == rmr retries (0 default); argv[4] == listen port (43086 default)
32 Sender will send for at most 20 seconds, so if nmsgs and delay extend
33 beyond that period the total number of messages sent will be less
37 Author: E. Scott Daniels
45 #include <sys/epoll.h>
52 #define TRACE_SIZE 40 // bytes in header to provide for trace junk
// Working data for one sender: send parameters, the RMR context, and the latency
// histogram bins/counters that print_stats() reads.
// NOTE(review): some members referenced elsewhere (e.g. out_bins, in_oor, in_max, out_max)
// and the closing of this struct are not visible in this extract.
58 typedef struct tdata {
59 int id; // the id we'll pass to RMr mt-call function NOT the thread id
60 int n2send; // number of messages to send
61 int delay; // ms delay between messages -- NOTE(review): main() treats its delay as usec; confirm the unit used here
62 void* mrc; // RMr context
64 int* in_bins; // inbound (receiver to caller) latency count bins; bin index is in 100ths of a ms
66 int nbins; // number of bins allocated; upper bound for every loop over the bins
69 int out_oor; // outbound out of range count (added to out_bcount to get the outbound total)
71 int in_bcount; // total inbound messages tracked in bins (divisor for the inbound mean)
72 int out_bcount; // total outbound messages tracked in bins (divisor for the outbound mean)
77 The message type placed into the payload.
// Binary payload layout: send_msgs() casts the message payload to this struct and sets the
// message length to its size. The three timestamps are what compute_latency() works from.
79 typedef struct lc_msg {
80 struct timespec out_ts; // time just before call executed
81 struct timespec turn_ts; // time at the receiver, on receipt
82 struct timespec in_ts; // time received back by the caller
83 int out_retries; // number of retries required to send (zeroed by the sender before each send)
84 int turn_retries; // number of retries required to send -- NOTE(review): presumably the receiver's retries on the return trip; confirm
87 // --------------------------------------------------------------------------------
// Folds the bytes of str into an integer: each byte is added together with a running counter.
// NOTE(review): the local declarations, loop bounds and return statement are not visible in
// this extract; the local accumulator shares the function's name.
90 static int sum( char* str ) {
95 sum += *(str++) + i++; // add the current byte plus the counter i, then advance both
// Write latency statistics for one direction to stderr: out of range count, max, mean and
// the 95th/99th percentile bins.
//   td   - supplies the bins and counters
//   out  - non-zero selects the outbound bins/counters, zero the inbound ones
//   hist - NOTE(review): presumably enables the per-bin dump; its test is not visible here
// Fix: the 99% cutoff was computed with .95, making the 99th percentile identical to the 95th.
101 static void print_stats( tdata_t* td, int out, int hist ) {
102 int sum = 0; // sum of latencies; zeroed here so the += accumulation never starts from an indeterminate value
103 int csum = 0; // cutoff sum
104 int i95 = 0; // bin for the 95th count
105 int i99 = 0; // bin for the 99th count
107 int cutoff_95; // 95% of total messages
108 int cutoff_99; // 99% of total messages
114 cutoff_95 = .95 * (td->out_oor + td->out_bcount); // outbound total = out of range + binned
115 cutoff_99 = .99 * (td->out_oor + td->out_bcount);
119 cutoff_95 = .95 * (td->in_oor + td->in_bcount); // inbound total = out of range + binned
120 cutoff_99 = .99 * (td->in_oor + td->in_bcount);
126 for( j = 0; j < td->nbins; j++ ) { // walk the bins accumulating count and weighted sum
127 if( csum < cutoff_95 ) { // running count still below the 95% cutoff
130 if( csum < cutoff_99 ) { // running count still below the 99% cutoff
135 csum += td->out_bins[j];
136 sum += td->out_bins[j] * j; // weight each count by its bin index (100ths of a ms)
138 csum += td->in_bins[j];
139 sum += td->in_bins[j] * j;
144 if( td->out_bcount ) { // guard the division when nothing was binned
145 mean = sum/(td->out_bcount);
148 if( td->in_bcount ) {
149 mean = sum/(td->in_bcount);
154 for( j = 0; j < td->nbins; j++ ) { // per-bin dump: index and count
155 fprintf( stderr, "%3d %d\n", j, out ? td->out_bins[j] : td->in_bins[j] );
159 fprintf( stderr, "%s: oor=%d max=%.2fms mean=%.2fms 95th=%.2fms 99th=%.2f\n",
160 out ? "out" : " in", oor, (double)max/1000000.0, (double)mean/100.0, (double) i95/100.0, i99/100.0 );
164 Given a message, compute the in/out and round trip latencies.
// From the three timestamps in lcm, derive the outbound (caller to receiver), inbound
// (receiver to caller) and round trip latencies, track the per-direction maxima in td, and
// pick the histogram bin (100ths of a ms) for each direction.
//   td  - stats block updated in place
//   lcm - message payload carrying out_ts, turn_ts and in_ts
// Fix: the bin index comes from a difference of wall-clock timestamps and can be negative
// when the clocks disagree; the bin guards now also require bin >= 0.
// NOTE(review): the statements inside the guarded branches are not visible in this extract.
166 static void compute_latency( tdata_t* td, lc_msg_t* lcm ) {
170 double rtl; // round trip latency
171 double outl; // caller to receiver latency (out)
172 double inl; // receiver to caller latency (in)
175 if( lcm == NULL || td == NULL ) { // nothing can be computed without both
179 out = (lcm->out_ts.tv_sec * 1000000000) + lcm->out_ts.tv_nsec; // flatten each timestamp to nanoseconds
180 in = (lcm->in_ts.tv_sec * 1000000000) + lcm->in_ts.tv_nsec;
181 turn = (lcm->turn_ts.tv_sec * 1000000000) + lcm->turn_ts.tv_nsec;
183 if( in - turn > td->in_max ) { // new inbound maximum (ns)
184 td->in_max = in - turn;
186 if( turn - out > td->out_max ) { // new outbound maximum (ns)
187 td->out_max = turn-out;
190 bin = (turn-out) / 10000; // 100ths of ms
193 outl = ((double) turn - out) / 1000000.0; // convert to ms
194 inl = ((double) in - turn) / 1000000.0;
195 rtl = ((double) in - out) / 1000000.0;
197 fprintf( stderr, "outl = %5.3fms inl = %5.3fms rtl = %5.3fms bin=%d\n", outl, inl, rtl, bin );
200 bin = (turn - out) / 10000; // 100ths of ms
201 if( bin >= 0 && bin < td->nbins ) { // only a bin inside [0, nbins) may be used as an index
208 bin = (in - turn) / 10000; // 100ths of ms
209 if( bin >= 0 && bin < td->nbins ) { // same bounds for the inbound direction
220 Compute the elapsed time, in milliseconds, between start_ts and end_ts.
// Difference between two timespecs: each is flattened to nanoseconds and the difference is
// scaled down to milliseconds.
// NOTE(review): the local declarations and the return statement are not visible in this
// extract; presumably bin is what is returned.
222 static int elapsed( struct timespec* start_ts, struct timespec* end_ts ) {
227 start = ( start_ts->tv_sec * 1000000000) + start_ts->tv_nsec; // start time as nanoseconds
228 end = ( end_ts->tv_sec * 1000000000) + end_ts->tv_nsec; // end time as nanoseconds
230 bin = (end - start) / 1000000; // ms
// Send loop: fills the RMR send buffer with an lc_msg payload stamped with the current time
// and passes it to rmr_send_msg() until count reaches n2send, then reports totals on stderr.
//   mrc    - RMR context handed to rmr_alloc_msg() and rmr_send_msg()
//   n2send - loop bound for count
//   delay  - when > 0 the between-send branch is taken (the pause call itself is not visible here)
//   retry  - when non-zero, or while count is 0, an RMR_ERR_RETRY result is counted in rt_count
// NOTE(review): many lines of this function (declarations of count/mtype/xaction_id/good_count,
// the retry handling, the switch cases, the return) are not visible in this extract.
238 static void* send_msgs( void* mrc, int n2send, int delay, int retry ) {
239 lc_msg_t* lcm; // pointer at the payload as a struct
240 rmr_mbuf_t* sbuf; // send buffer
242 int rt_count = 0; // number of messages that had a retry on first send attempt
245 int fail_count = 0; // # of failure sends after first successful send
246 int successful = 0; // set to true after we have a successful send
247 char xbuf[1024]; // build transaction string here
251 struct timespec start_ts; // NOTE(review): set inside the state switch below; confirm it is always set before elapsed() reads it
252 struct timespec end_ts; // taken once sending is finished
256 fprintf( stderr, "send_msg: bad mrc\n" );
259 sbuf = rmr_alloc_msg( mrc, 256 ); // alloc first send buffer; subsequent buffers allocated on send
261 snprintf( xbuf, 200, "%31d", xaction_id ); // 31 character right justified id; the bound of 200 is well inside xbuf
262 while( count < n2send ) { // we send n messages after the first message is successful
263 lcm = (lc_msg_t *) sbuf->payload; // view the payload as the binary message struct
265 rmr_bytes2xact( sbuf, xbuf, 32 ); // copy 32 bytes of xbuf in as the transaction id
268 sbuf->mtype = mtype++; // all go with the same type -- NOTE(review): the post-increment contradicts this remark; confirm against the lines not shown
273 sbuf->len = sizeof( *lcm ); // payload is exactly one lc_msg
274 sbuf->state = RMR_OK;
275 lcm->out_retries = 0;
276 lcm->turn_retries = 0;
277 clock_gettime( CLOCK_REALTIME, &lcm->out_ts ); // mark time out
278 sbuf = rmr_send_msg( mrc, sbuf ); // the returned buffer replaces the one passed in
280 if( sbuf && sbuf->state == RMR_ERR_RETRY ) { // send not accepted
281 if( retry || count == 0 ) {
282 rt_count++; // # messages that we retried beyond rmr's retry
286 fail_count++; // send failed because we drop it
293 switch( sbuf->state ) { // NOTE(review): sbuf is tested for NULL above but dereferenced here; confirm a guard exists in the lines not shown
295 clock_gettime( CLOCK_REALTIME, &start_ts ); // start of the elapsed time reported at the end
301 fprintf( stderr, "<SM> send error: rmr-state=%d ernro=%d\n", sbuf->state, errno );
306 good_count += sbuf->state == RMR_OK; // the comparison yields 1 on success, 0 otherwise
309 sbuf = rmr_alloc_msg( mrc, 512 ); // must have a send buffer at top
313 //if( count < n2send && (count % 100) == 0 && delay > 0 ) {
314 if( count < n2send && delay > 0 ) { // more to send and a delay was requested
321 clock_gettime( CLOCK_REALTIME, &end_ts ); // end of the elapsed time
323 fprintf( stderr, "<SM> sending finished attempted=%d good=%d fails=%d rt=%d elapsed=%d ms, \n", count, good_count, fail_count, rt_count, elapsed( &start_ts, &end_ts ) );
// Entry point: takes optional overrides from argv, initialises RMR, waits (up to the 20s
// timeout) for rmr_ready() to report true, runs send_msgs() and then prints the drain and
// close down messages.
// NOTE(review): the argc tests guarding each argv read, the timeout declaration, the sleeps
// and the exit paths are not visible in this extract.
327 int main( int argc, char** argv ) {
328 void* mrc; // msg router context
329 rmr_mbuf_t* rbuf = NULL; // received on 'normal' flow
330 char* listen_port = "43086"; // largely unused here
332 int delay = 100000; // usec between send attempts
333 int nmsgs = 10; // number of messages to send
334 int rmr_retries = 0; // number of retries we allow rmr to do
337 nmsgs = atoi( argv[1] ); // argv[1]: message count
340 delay = atoi( argv[2] ); // argv[2]: delay between sends
343 listen_port = argv[4]; // argv[4]: listen port (note: read before argv[3])
346 rmr_retries = atoi( argv[3] ); // argv[3]: rmr retries
349 fprintf( stderr, "<LSEND> listen port: %s; sending %d messages; delay=%d\n", listen_port, nmsgs, delay );
351 if( (mrc = rmr_init( listen_port, 1400, RMRFL_MTCALL )) == NULL ) { // initialise with multi-threaded call enabled
352 fprintf( stderr, "<LSEND> unable to initialise RMr\n" );
356 fprintf( stderr, "\nsetting rmr retries: %d\n", rmr_retries );
357 //if( rmr_retries != 1 ) {
358 rmr_set_stimeout( mrc, rmr_retries ); // pass the requested retry value to RMR's send timeout setting
361 timeout = time( NULL ) + 20; // give rmr 20s to find the route table (shouldn't need that much)
362 while( ! rmr_ready( mrc ) ) { // must have a route table before we can send; wait til RMr says it has one
363 fprintf( stderr, "<LSEND> waiting for rmr to show ready\n" );
366 if( time( NULL ) > timeout ) { // deadline passed without RMR becoming ready
367 fprintf( stderr, "<LSEND> giving up\n" );
371 fprintf( stderr, "<LSEND> rmr is ready; starting sender retries=%d\n", rmr_retries );
373 send_msgs( mrc, nmsgs, delay, rmr_retries ); // rmr_retries doubles as send_msgs' retry flag
375 fprintf( stderr, "pausing for drain\n" );
377 fprintf( stderr, "closing down\n" );