--- /dev/null
+// :vim ts=4 sw=4 noet:
+/*
+==================================================================================
+ Copyright (c) 2019 Nokia
+ Copyright (c) 2018-2019 AT&T Intellectual Property.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+==================================================================================
+*/
+
+/*
+ Mnemonic: lsender.c
+ Abstract: This is a simple sender, slimiar to sender.c, except that a timestamp
+ is placed into the messages such that latency measurements can be
+ made.
+ The message format is 'binary' defined by the lc_msg struct.
+
+ Parms: argv[1] == number of msgs to send (10)
+ argv[2] == delay (mu-seconds, 1000000 default)
+ argv[3] == listen port
+
+ Sender will send for at most 20 seconds, so if nmsgs and delay extend
+ beyond that period the total number of messages sent will be less
+ than n.
+
+ Date: 18 April 2019
+ Author: E. Scott Daniels
+*/
+
+#include <unistd.h>
+#include <errno.h>
+#include <string.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <sys/epoll.h>
+#include <time.h>
+#include <pthread.h>
+
+
+#include <rmr/rmr.h>
+
+#define TRACE_SIZE 40 // bytes in header to provide for trace junk
+#define SUCCESS (-1)
+
+/*
+ Thread data
+*/
+typedef struct tdata {
+ int id; // the id we'll pass to RMr mt-call function NOT the thread id
+ int n2send; // number of messages to send
+ int delay; // ms delay between messages
+ void* mrc; // RMr context
+ int state;
+ int* in_bins; // latency count bins
+ int* out_bins;
+ int nbins; // number of bins allocated
+ long long in_max;
+ long long out_max;
+ int out_oor; // out of range count
+ int in_oor;
+ int in_bcount; // total messages tracked in bins
+ int out_bcount; // total messages tracked in bins
+} tdata_t;
+
+
+/*
+ The message type placed into the payload.
+*/
+typedef struct lc_msg {
+ struct timespec out_ts; // time just before call executed
+ struct timespec turn_ts; // time at the receiver, on receipt
+ struct timespec in_ts; // time received back by the caller
+ int out_retries; // number of retries required to send
+ int turn_retries; // number of retries required to send
+} lc_msg_t;
+
+// --------------------------------------------------------------------------------
+
+
+static int sum( char* str ) {
+ int sum = 0;
+ int i = 0;
+
+ while( *str ) {
+ sum += *(str++) + i++;
+ }
+
+ return sum % 255;
+}
+
+static void print_stats( tdata_t* td, int out, int hist ) {
+ int sum; // sum of latencies
+ int csum = 0; // cutoff sum
+ int i95 = 0; // bin for the 95th count
+ int i99 = 0; // bin for the 99th count
+ int mean = -1;
+ int cutoff_95; // 95% of total messages
+ int cutoff_99; // 99% of total messages
+ int oor;
+ int max;
+ int j;
+
+ if( out ) {
+ cutoff_95 = .95 * (td->out_oor + td->out_bcount);
+ cutoff_99 = .95 * (td->out_oor + td->out_bcount);
+ oor = td->out_oor;
+ max = td->out_max;
+ } else {
+ cutoff_95 = .95 * (td->in_oor + td->in_bcount);
+ cutoff_99 = .95 * (td->in_oor + td->in_bcount);
+ oor = td->in_oor;
+ max = td->in_max;
+ }
+
+ sum = 0;
+ for( j = 0; j < td->nbins; j++ ) {
+ if( csum < cutoff_95 ) {
+ i95++;
+ }
+ if( csum < cutoff_99 ) {
+ i99++;
+ }
+
+ if( out ) {
+ csum += td->out_bins[j];
+ sum += td->out_bins[j] * j;
+ } else {
+ csum += td->in_bins[j];
+ sum += td->in_bins[j] * j;
+ }
+ }
+
+ if( out ) {
+ if( td->out_bcount ) {
+ mean = sum/(td->out_bcount);
+ }
+ } else {
+ if( td->in_bcount ) {
+ mean = sum/(td->in_bcount);
+ }
+ }
+
+ if( hist ) {
+ for( j = 0; j < td->nbins; j++ ) {
+ fprintf( stderr, "%3d %d\n", j, out ? td->out_bins[j] : td->in_bins[j] );
+ }
+ }
+
+ fprintf( stderr, "%s: oor=%d max=%.2fms mean=%.2fms 95th=%.2fms 99th=%.2f\n",
+ out ? "out" : " in", oor, (double)max/1000000.0, (double)mean/100.0, (double) i95/100.0, i99/100.0 );
+}
+
+/*
+ Given a message, compute the in/out and round trip latencies.
+*/
+static void compute_latency( tdata_t* td, lc_msg_t* lcm ) {
+ long long out;
+ long long turn;
+ long long in;
+ double rtl; // round trip latency
+ double outl; // caller to receiver latency (out)
+ double inl; // receiver to caller latency (in)
+ int bin;
+
+ if( lcm == NULL || td == NULL ) {
+ return;
+ }
+
+ out = (lcm->out_ts.tv_sec * 1000000000) + lcm->out_ts.tv_nsec;
+ in = (lcm->in_ts.tv_sec * 1000000000) + lcm->in_ts.tv_nsec;
+ turn = (lcm->turn_ts.tv_sec * 1000000000) + lcm->turn_ts.tv_nsec;
+
+ if( in - turn > td->in_max ) {
+ td->in_max = in - turn;
+ }
+ if( turn - out > td->out_max ) {
+ td->out_max = turn-out;
+ }
+
+ bin = (turn-out) / 10000; // 100ths of ms
+
+#ifdef PRINT
+ outl = ((double) turn - out) / 1000000.0; // convert to ms
+ inl = ((double) in - turn) / 1000000.0;
+ rtl = ((double) in - out) / 1000000.0;
+
+ fprintf( stderr, "outl = %5.3fms inl = %5.3fms rtl = %5.3fms bin=%d\n", outl, inl, rtl, bin );
+#else
+
+ bin = (turn - out) / 10000; // 100ths of ms
+ if( bin < td->nbins ) {
+ td->out_bins[bin]++;
+ td->out_bcount++;
+ } else {
+ td->out_oor++;
+ }
+
+ bin = (in - turn) / 10000; // 100ths of ms
+ if( bin < td->nbins ) {
+ td->in_bins[bin]++;
+ td->in_bcount++;
+ } else {
+ td->in_oor++;
+ }
+
+#endif
+}
+
+/*
+ Compute the elapsed time between ts1 and ts2.
+*/
+static int elapsed( struct timespec* start_ts, struct timespec* end_ts ) {
+ long long start;
+ long long end;
+ int bin;
+
+ start = ( start_ts->tv_sec * 1000000000) + start_ts->tv_nsec;
+ end = ( end_ts->tv_sec * 1000000000) + end_ts->tv_nsec;
+
+ bin = (end - start) / 1000000; // ms
+
+ return bin;
+}
+
+/*
+ The main thing.
+*/
+static void* send_msgs( void* mrc, int n2send, int delay, int retry ) {
+ lc_msg_t* lcm; // pointer at the payload as a struct
+ rmr_mbuf_t* sbuf; // send buffer
+ int count = 0;
+ int rt_count = 0; // number of messages that had a retry on first send attempt
+ int good_count = 0;
+ int drops = 0;
+ int fail_count = 0; // # of failure sends after first successful send
+ int successful = 0; // set to true after we have a successful send
+ char xbuf[1024]; // build transaction string here
+ int xaction_id = 1;
+ char* tok;
+ int state = 0;
+ struct timespec start_ts;
+ struct timespec end_ts;
+ int mtype = 0;
+
+ if( mrc == NULL ) {
+ fprintf( stderr, "send_msg: bad mrc\n" );
+ }
+
+ sbuf = rmr_alloc_msg( mrc, 256 ); // alloc first send buffer; subsequent buffers allcoated on send
+
+ snprintf( xbuf, 200, "%31d", xaction_id );
+ while( count < n2send ) { // we send n messages after the first message is successful
+ lcm = (lc_msg_t *) sbuf->payload;
+
+ rmr_bytes2xact( sbuf, xbuf, 32 );
+
+ sbuf->mtype = 0;
+ sbuf->mtype = mtype++; // all go with the same type
+ if( mtype > 9 ) {
+ mtype = 0;
+ }
+
+ sbuf->len = sizeof( *lcm );
+ sbuf->state = RMR_OK;
+ lcm->out_retries = 0;
+ lcm->turn_retries = 0;
+ clock_gettime( CLOCK_REALTIME, &lcm->out_ts ); // mark time out
+ sbuf = rmr_send_msg( mrc, sbuf );
+
+ if( sbuf && sbuf->state == RMR_ERR_RETRY ) { // send not accepted
+ if( retry || count == 0 ) {
+ rt_count++; // # messages that we retried beyond rmr's retry
+ } else {
+ if( delay )
+ usleep( delay );
+ fail_count++; // send failed because we drop it
+ }
+ }
+
+ count++;
+ if( sbuf != NULL ) {
+ if( ! successful ) {
+ switch( sbuf->state ) {
+ case RMR_OK:
+ clock_gettime( CLOCK_REALTIME, &start_ts );
+ successful = 1;
+ good_count++;
+ break;
+
+ default:
+ fprintf( stderr, "<SM> send error: rmr-state=%d ernro=%d\n", sbuf->state, errno );
+ sleep( 1 );
+ break;
+ }
+ } else {
+ good_count += sbuf->state == RMR_OK;
+ }
+ } else {
+ sbuf = rmr_alloc_msg( mrc, 512 ); // must have a sedn buffer at top
+ drops++;
+ }
+
+ //if( count < n2send && (count % 100) == 0 && delay > 0 ) {
+ if( count < n2send && delay > 0 ) {
+ if( count % 500 ) {
+ usleep( delay );
+ }
+ }
+ }
+
+ clock_gettime( CLOCK_REALTIME, &end_ts );
+
+ fprintf( stderr, "<SM> sending finished attempted=%d good=%d fails=%d rt=%d elapsed=%d ms, \n", count, good_count, fail_count, rt_count, elapsed( &start_ts, &end_ts ) );
+ return NULL;
+}
+
+int main( int argc, char** argv ) {
+ void* mrc; // msg router context
+ rmr_mbuf_t* rbuf = NULL; // received on 'normal' flow
+ char* listen_port = "43086"; // largely unused here
+ long timeout = 0;
+ int delay = 100000; // usec between send attempts
+ int nmsgs = 10; // number of messages to send
+ int rmr_retries = 0; // number of retries we allow rmr to do
+
+ if( argc > 1 ) {
+ nmsgs = atoi( argv[1] );
+ }
+ if( argc > 2 ) {
+ delay = atoi( argv[2] );
+ }
+ if( argc > 4 ) {
+ listen_port = argv[4];
+ }
+ if( argc > 3 ) {
+ rmr_retries = atoi( argv[3] );
+ }
+
+ fprintf( stderr, "<LSEND> listen port: %s; sending %d messages; delay=%d\n", listen_port, nmsgs, delay );
+
+ if( (mrc = rmr_init( listen_port, 1400, RMRFL_MTCALL )) == NULL ) { // initialise with multi-threaded call enabled
+ fprintf( stderr, "<LSEND> unable to initialise RMr\n" );
+ exit( 1 );
+ }
+
+ fprintf( stderr, "\nsetting rmr retries: %d\n", rmr_retries );
+ //if( rmr_retries != 1 ) {
+ rmr_set_stimeout( mrc, rmr_retries );
+ //}
+
+ timeout = time( NULL ) + 20; // give rmr 20s to find the route table (shouldn't need that much)
+ while( ! rmr_ready( mrc ) ) { // must have a route table before we can send; wait til RMr says it has one
+ fprintf( stderr, "<LSEND> waiting for rmr to show ready\n" );
+ sleep( 1 );
+
+ if( time( NULL ) > timeout ) {
+ fprintf( stderr, "<LSEND> giving up\n" );
+ exit( 1 );
+ }
+ }
+ fprintf( stderr, "<LSEND> rmr is ready; starting sender retries=%d\n", rmr_retries );
+
+ send_msgs( mrc, nmsgs, delay, rmr_retries );
+
+ fprintf( stderr, "pausing for drain\n" );
+ sleep( 3 );
+ fprintf( stderr, "closing down\n" );
+ rmr_close( mrc );
+
+ return 0;
+}
+