// :vim ts=4 sw=4 noet: /* ================================================================================== Copyright (c) 2019 Nokia Copyright (c) 2018-2019 AT&T Intellectual Property. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. ================================================================================== */ /* Mnemonic: lsender.c Abstract: This is a simple sender, slimiar to sender.c, except that a timestamp is placed into the messages such that latency measurements can be made. The message format is 'binary' defined by the lc_msg struct. Parms: argv[1] == number of msgs to send (10) argv[2] == delay (mu-seconds, 1000000 default) argv[3] == listen port Sender will send for at most 20 seconds, so if nmsgs and delay extend beyond that period the total number of messages sent will be less than n. Date: 18 April 2019 Author: E. Scott Daniels */ #include #include #include #include #include #include #include #include #include #define TRACE_SIZE 40 // bytes in header to provide for trace junk #define SUCCESS (-1) /* Thread data */ typedef struct tdata { int id; // the id we'll pass to RMr mt-call function NOT the thread id int n2send; // number of messages to send int delay; // ms delay between messages void* mrc; // RMr context int state; int* in_bins; // latency count bins int* out_bins; int nbins; // number of bins allocated long long in_max; long long out_max; int out_oor; // out of range count int in_oor; int in_bcount; // total messages tracked in bins int out_bcount; // total messages tracked in bins } tdata_t; /* The message type placed into the payload. */ typedef struct lc_msg { struct timespec out_ts; // time just before call executed struct timespec turn_ts; // time at the receiver, on receipt struct timespec in_ts; // time received back by the caller int out_retries; // number of retries required to send int turn_retries; // number of retries required to send } lc_msg_t; // -------------------------------------------------------------------------------- static int sum( char* str ) { int sum = 0; int i = 0; while( *str ) { sum += *(str++) + i++; } return sum % 255; } static void print_stats( tdata_t* td, int out, int hist ) { int sum; // sum of latencies int csum = 0; // cutoff sum int i95 = 0; // bin for the 95th count int i99 = 0; // bin for the 99th count int mean = -1; int cutoff_95; // 95% of total messages int cutoff_99; // 99% of total messages int oor; int max; int j; if( out ) { cutoff_95 = .95 * (td->out_oor + td->out_bcount); cutoff_99 = .95 * (td->out_oor + td->out_bcount); oor = td->out_oor; max = td->out_max; } else { cutoff_95 = .95 * (td->in_oor + td->in_bcount); cutoff_99 = .95 * (td->in_oor + td->in_bcount); oor = td->in_oor; max = td->in_max; } sum = 0; for( j = 0; j < td->nbins; j++ ) { if( csum < cutoff_95 ) { i95++; } if( csum < cutoff_99 ) { i99++; } if( out ) { csum += td->out_bins[j]; sum += td->out_bins[j] * j; } else { csum += td->in_bins[j]; sum += td->in_bins[j] * j; } } if( out ) { if( td->out_bcount ) { mean = sum/(td->out_bcount); } } else { if( td->in_bcount ) { mean = sum/(td->in_bcount); } } if( hist ) { for( j = 0; j < td->nbins; j++ ) { fprintf( stderr, "%3d %d\n", j, out ? td->out_bins[j] : td->in_bins[j] ); } } fprintf( stderr, "%s: oor=%d max=%.2fms mean=%.2fms 95th=%.2fms 99th=%.2f\n", out ? "out" : " in", oor, (double)max/1000000.0, (double)mean/100.0, (double) i95/100.0, i99/100.0 ); } /* Given a message, compute the in/out and round trip latencies. */ static void compute_latency( tdata_t* td, lc_msg_t* lcm ) { long long out; long long turn; long long in; double rtl; // round trip latency double outl; // caller to receiver latency (out) double inl; // receiver to caller latency (in) int bin; if( lcm == NULL || td == NULL ) { return; } out = (lcm->out_ts.tv_sec * 1000000000) + lcm->out_ts.tv_nsec; in = (lcm->in_ts.tv_sec * 1000000000) + lcm->in_ts.tv_nsec; turn = (lcm->turn_ts.tv_sec * 1000000000) + lcm->turn_ts.tv_nsec; if( in - turn > td->in_max ) { td->in_max = in - turn; } if( turn - out > td->out_max ) { td->out_max = turn-out; } bin = (turn-out) / 10000; // 100ths of ms #ifdef PRINT outl = ((double) turn - out) / 1000000.0; // convert to ms inl = ((double) in - turn) / 1000000.0; rtl = ((double) in - out) / 1000000.0; fprintf( stderr, "outl = %5.3fms inl = %5.3fms rtl = %5.3fms bin=%d\n", outl, inl, rtl, bin ); #else bin = (turn - out) / 10000; // 100ths of ms if( bin < td->nbins ) { td->out_bins[bin]++; td->out_bcount++; } else { td->out_oor++; } bin = (in - turn) / 10000; // 100ths of ms if( bin < td->nbins ) { td->in_bins[bin]++; td->in_bcount++; } else { td->in_oor++; } #endif } /* Compute the elapsed time between ts1 and ts2. */ static int elapsed( struct timespec* start_ts, struct timespec* end_ts ) { long long start; long long end; int bin; start = ( start_ts->tv_sec * 1000000000) + start_ts->tv_nsec; end = ( end_ts->tv_sec * 1000000000) + end_ts->tv_nsec; bin = (end - start) / 1000000; // ms return bin; } /* The main thing. */ static void* send_msgs( void* mrc, int n2send, int delay, int retry ) { lc_msg_t* lcm; // pointer at the payload as a struct rmr_mbuf_t* sbuf; // send buffer int count = 0; int rt_count = 0; // number of messages that had a retry on first send attempt int good_count = 0; int drops = 0; int fail_count = 0; // # of failure sends after first successful send int successful = 0; // set to true after we have a successful send char xbuf[1024]; // build transaction string here int xaction_id = 1; char* tok; int state = 0; struct timespec start_ts; struct timespec end_ts; int mtype = 0; if( mrc == NULL ) { fprintf( stderr, "send_msg: bad mrc\n" ); } sbuf = rmr_alloc_msg( mrc, 256 ); // alloc first send buffer; subsequent buffers allcoated on send snprintf( xbuf, 200, "%31d", xaction_id ); while( count < n2send ) { // we send n messages after the first message is successful lcm = (lc_msg_t *) sbuf->payload; rmr_bytes2xact( sbuf, xbuf, 32 ); sbuf->mtype = 0; sbuf->mtype = mtype++; // all go with the same type if( mtype > 9 ) { mtype = 0; } sbuf->len = sizeof( *lcm ); sbuf->state = RMR_OK; lcm->out_retries = 0; lcm->turn_retries = 0; clock_gettime( CLOCK_REALTIME, &lcm->out_ts ); // mark time out sbuf = rmr_send_msg( mrc, sbuf ); if( sbuf && sbuf->state == RMR_ERR_RETRY ) { // send not accepted if( retry || count == 0 ) { rt_count++; // # messages that we retried beyond rmr's retry } else { if( delay ) usleep( delay ); fail_count++; // send failed because we drop it } } count++; if( sbuf != NULL ) { if( ! successful ) { switch( sbuf->state ) { case RMR_OK: clock_gettime( CLOCK_REALTIME, &start_ts ); successful = 1; good_count++; break; default: fprintf( stderr, " send error: rmr-state=%d ernro=%d\n", sbuf->state, errno ); sleep( 1 ); break; } } else { good_count += sbuf->state == RMR_OK; } } else { sbuf = rmr_alloc_msg( mrc, 512 ); // must have a sedn buffer at top drops++; } //if( count < n2send && (count % 100) == 0 && delay > 0 ) { if( count < n2send && delay > 0 ) { if( count % 500 ) { usleep( delay ); } } } clock_gettime( CLOCK_REALTIME, &end_ts ); fprintf( stderr, " sending finished attempted=%d good=%d fails=%d rt=%d elapsed=%d ms, \n", count, good_count, fail_count, rt_count, elapsed( &start_ts, &end_ts ) ); return NULL; } int main( int argc, char** argv ) { void* mrc; // msg router context rmr_mbuf_t* rbuf = NULL; // received on 'normal' flow char* listen_port = "43086"; // largely unused here long timeout = 0; int delay = 100000; // usec between send attempts int nmsgs = 10; // number of messages to send int rmr_retries = 0; // number of retries we allow rmr to do if( argc > 1 ) { nmsgs = atoi( argv[1] ); } if( argc > 2 ) { delay = atoi( argv[2] ); } if( argc > 4 ) { listen_port = argv[4]; } if( argc > 3 ) { rmr_retries = atoi( argv[3] ); } fprintf( stderr, " listen port: %s; sending %d messages; delay=%d\n", listen_port, nmsgs, delay ); if( (mrc = rmr_init( listen_port, 1400, RMRFL_MTCALL )) == NULL ) { // initialise with multi-threaded call enabled fprintf( stderr, " unable to initialise RMr\n" ); exit( 1 ); } fprintf( stderr, "\nsetting rmr retries: %d\n", rmr_retries ); //if( rmr_retries != 1 ) { rmr_set_stimeout( mrc, rmr_retries ); //} timeout = time( NULL ) + 20; // give rmr 20s to find the route table (shouldn't need that much) while( ! rmr_ready( mrc ) ) { // must have a route table before we can send; wait til RMr says it has one fprintf( stderr, " waiting for rmr to show ready\n" ); sleep( 1 ); if( time( NULL ) > timeout ) { fprintf( stderr, " giving up\n" ); exit( 1 ); } } fprintf( stderr, " rmr is ready; starting sender retries=%d\n", rmr_retries ); send_msgs( mrc, nmsgs, delay, rmr_retries ); fprintf( stderr, "pausing for drain\n" ); sleep( 3 ); fprintf( stderr, "closing down\n" ); rmr_close( mrc ); return 0; }