1 // :vim ts=4 sw=4 noet:
3 ==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18 ==================================================================================
23 Abstract: This is a simple sender which will send a series of messages using
24 rmr_call(). Similar to caller.c the major difference is that
25 a timestamp is placed into the message and the receiver is expected
26 to add a timestamp before executing an rts call. We can then
27 compute the total round trip latency as well as the forward send
30 Overall, N threads are started each sending the desired number
31 of messages and expecting an 'ack' for each. Each ack is examined
32 to verify that the thread id placed into the message matches (meaning
33 that the ack was delivered by RMr to the correct thread's chute).
35 The message format is 'binary' defined by the lc_msg struct.
37 Parms: argv[1] == number of msgs to send (10)
38 argv[2] == delay (mu-seconds, 100000 default)
39 argv[3] == number of threads (3)
40 argv[4] == listen port
42 Sender will send for at most 20 seconds, so if nmsgs and delay extend
43 beyond that period the total number of messages sent will be less
47 Author: E. Scott Daniels
55 #include <sys/epoll.h>
62 #define TRACE_SIZE 40 // bytes in header to provide for trace junk
// Per-thread control block: one is handed to each calling thread and carries
// both the send parameters and the latency statistics that thread collects.
68 typedef struct tdata {
69 int id; // the id we'll pass to RMr mt-call function NOT the thread id
70 int n2send; // number of messages to send
71 int delay; // delay between messages in microseconds (value is handed straight to usleep)
72 void* mrc; // RMr context
74 int* in_bins; // latency count bins for the inbound (receiver to caller) direction
76 int nbins; // number of bins allocated (applies to both the in and out bin arrays)
79 int out_oor; // outbound out of range count (latencies that did not fit in a bin)
81 int in_bcount; // total inbound messages tracked in bins
82 int out_bcount; // total outbound messages tracked in bins
87 The message type placed into the payload.
// Binary payload layout exchanged between the caller and the receiver; the
// three timestamps are what the latency computation works from.
89 typedef struct lc_msg {
90 struct timespec out_ts; // time just before call executed
91 struct timespec turn_ts; // time at the receiver, on receipt
92 struct timespec in_ts; // time received back by the caller
93 int out_retries; // number of retries required to send (caller side; zeroed before each call)
94 int turn_retries; // number of retries required to send; NOTE(review): presumably filled in by the receiver on the return leg -- confirm
97 // --------------------------------------------------------------------------------
// Reduce the string str to an integer. The visible accumulation step adds the
// value of each character plus a running counter (i) to the total, advancing
// both the string pointer and the counter as it goes.
100 static int sum( char* str ) {
105 sum += *(str++) + i++;
// Summarise one direction of the latency data held in td and write it to stderr.
//   td   - thread control block holding the bins and counters
//   out  - non-zero reports the outbound set (out_bins/out_bcount/out_oor),
//          zero reports the inbound set (in_bins/in_bcount/in_oor)
//   hist - histogram flag; NOTE(review): presumably gates the per-bin dump
//          loop, the test itself is not visible in this view -- confirm
// Bins are walked in order while a running count (csum) is compared with the
// 95% and 99% cutoffs to locate the percentile bins; sum accumulates
// count * bin-index so that a mean can be derived from it.
111 static void print_stats( tdata_t* td, int out, int hist ) {
112 int sum = 0; // sum of latencies; zeroed because it is only ever accumulated with += below
113 int csum = 0; // cutoff sum
114 int i95 = 0; // bin for the 95th count
115 int i99 = 0; // bin for the 99th count
117 int cutoff_95; // 95% of total messages
118 int cutoff_99; // 99% of total messages
124 cutoff_95 = .95 * (td->out_oor + td->out_bcount);
125 cutoff_99 = .99 * (td->out_oor + td->out_bcount); // must be 99%, otherwise the 99th percentile just mirrors the 95th
129 cutoff_95 = .95 * (td->in_oor + td->in_bcount);
130 cutoff_99 = .99 * (td->in_oor + td->in_bcount); // must be 99%, otherwise the 99th percentile just mirrors the 95th
136 for( j = 0; j < td->nbins; j++ ) {
137 if( csum < cutoff_95 ) {
140 if( csum < cutoff_99 ) {
145 csum += td->out_bins[j];
146 sum += td->out_bins[j] * j;
148 csum += td->in_bins[j];
149 sum += td->in_bins[j] * j;
154 if( td->out_bcount ) {
155 mean = sum/(td->out_bcount);
158 if( td->in_bcount ) {
159 mean = sum/(td->in_bcount);
164 for( j = 0; j < td->nbins; j++ ) {
165 fprintf( stderr, "%3d %d\n", j, out ? td->out_bins[j] : td->in_bins[j] );
169 fprintf( stderr, "%s: oor=%d max=%.2fms mean=%.2fms 95th=%.2fms 99th=%.2f\n",
170 out ? "out" : " in", oor, (double)max/1000000.0, (double)mean/100.0, (double) i95/100.0, i99/100.0 );
174 Given a message, compute the in/out and round trip latencies.
// Fold the three timestamps carried in lcm into the statistics held in td.
//   td  - thread control block that owns the max values and the bins
//   lcm - message payload carrying out_ts, turn_ts and in_ts
// A nil td or lcm is tested for before anything is dereferenced. Each
// timestamp is flattened to nanoseconds (tv_sec * 1e9 + tv_nsec); the outbound
// leg is turn - out and the inbound leg is in - turn. The per-direction maximum
// is tracked in nanoseconds, while bins are 10000ns (one hundredth of a
// millisecond) wide.
176 static void compute_latency( tdata_t* td, lc_msg_t* lcm ) {
180 double rtl; // round trip latency
181 double outl; // caller to receiver latency (out)
182 double inl; // receiver to caller latency (in)
185 if( lcm == NULL || td == NULL ) {
189 out = (lcm->out_ts.tv_sec * 1000000000) + lcm->out_ts.tv_nsec;
190 in = (lcm->in_ts.tv_sec * 1000000000) + lcm->in_ts.tv_nsec;
191 turn = (lcm->turn_ts.tv_sec * 1000000000) + lcm->turn_ts.tv_nsec;
193 if( in - turn > td->in_max ) {
194 td->in_max = in - turn;
196 if( turn - out > td->out_max ) {
197 td->out_max = turn-out;
200 bin = (turn-out) / 10000; // 100ths of ms
203 outl = ((double) turn - out) / 1000000.0; // convert to ms
204 inl = ((double) in - turn) / 1000000.0;
205 rtl = ((double) in - out) / 1000000.0;
207 fprintf( stderr, "outl = %5.3fms inl = %5.3fms rtl = %5.3fms bin=%d\n", outl, inl, rtl, bin );
210 bin = (turn - out) / 10000; // 100ths of ms
// turn_ts is stamped by the receiver and out_ts by the caller, so the delta can
// come out negative when the two clocks disagree; a negative value must never
// be accepted as a bin number, thus the lower bound is checked as well.
211 if( bin >= 0 && bin < td->nbins ) {
218 bin = (in - turn) / 10000; // 100ths of ms
// same lower bound guard for the return leg
219 if( bin >= 0 && bin < td->nbins ) {
230 Executed as a thread, this puppy will generate calls to ensure that we get the
231 response back to the right thread, that we can handle threads, etc.
// Thread entry point. data must point at this thread's tdata_t control block;
// a nil pointer is reported on stderr. The thread issues rmr_mt_call() requests
// until n2send messages have been counted, using the control block's id as the
// call-id. out_ts is stamped just before each call, in_ts as soon as a reply
// comes back with an RMR_OK state, and the reply is then handed to
// compute_latency(). When delay is greater than zero the thread sleeps that
// many microseconds between messages.
233 static void* mk_calls( void* data ) {
234 lc_msg_t* lcm; // pointer at the payload as a struct
236 rmr_mbuf_t* sbuf; // send buffer
239 int rt_count = 0; // number of messages requiring a spin retry
240 int ok_msg = 0; // received messages that were sent by us
241 int bad_msg = 0; // received messages that were sent by a different thread
243 int fail_count = 0; // # of failure sends after first successful send
244 int successful = 0; // set to true after we have a successful send
245 char xbuf[1024]; // build transaction string here
250 if( (control = (tdata_t *) data) == NULL ) {
251 fprintf( stderr, "thread data was nil; bailing out\n" );
253 //fprintf( stderr, "<THRD> thread started with parallel call id=%d sending=%d delay=%d\n", control->id, control->n2send, control->delay );
255 sbuf = rmr_alloc_msg( control->mrc, 512 ); // alloc first send buffer; subsequent buffers allocated on send
257 usleep( rand() % 777 ); // stagger starts a bit so that they all don't pile up on the first connections
259 while( count < control->n2send ) { // we send n messages after the first message is successful
260 lcm = (lc_msg_t *) sbuf->payload;
262 snprintf( xbuf, 200, "%31d", xaction_id );
263 xaction_id += control->id;
264 rmr_bytes2xact( sbuf, xbuf, 32 );
266 sbuf->mtype = 5; // all go with the same type
267 sbuf->len = sizeof( *lcm );
268 sbuf->state = RMR_OK;
269 lcm->out_retries = 0;
270 lcm->turn_retries = 0;
271 clock_gettime( CLOCK_REALTIME, &lcm->out_ts ); // mark time out
272 sbuf = rmr_mt_call( control->mrc, sbuf, control->id, 1000 ); // send it (send returns an empty payload on success, or the original payload on fail/retry)
274 if( sbuf && sbuf->state == RMR_ERR_RETRY ) { // number of times we had to spin to send
277 while( sbuf != NULL && sbuf->state == RMR_ERR_RETRY ) { // send blocked; keep trying
279 sbuf = rmr_mt_call( control->mrc, sbuf, control->id, 100 ); // call and wait up to 100ms for a response
284 switch( sbuf->state ) {
285 case RMR_OK: // we should have a buffer back from the sender here
286 lcm = (lc_msg_t *) sbuf->payload;
287 clock_gettime( CLOCK_REALTIME, &lcm->in_ts ); // mark time back
289 compute_latency( control, lcm );
292 //fprintf( stderr, "%d have received %d\n", control->id, count );
296 fprintf( stderr, "unexpected error: tid=%d rmr-state=%d ernro=%d\n", control->id, sbuf->state, errno );
297 sbuf = rmr_alloc_msg( control->mrc, 512 ); // allocate a sendable buffer
299 fail_count++; // count failures after first successful message
301 // some error (not connected likely), don't count this
307 //fprintf( stderr, "<THRD> tid=%-2d call finished, no sbuf\n", control->id );
308 sbuf = rmr_alloc_msg( control->mrc, 512 ); // loop expects an sbuf
312 if( control->delay > 0 ) {
313 usleep( control->delay );
317 control->state = SUCCESS;
318 fprintf( stderr, "<THRD> %d finished sent %d, received %d messages\n", control->id, count, ack_count );
// Program entry point. Positional arguments, taken in order, are the number of
// messages, the delay in microseconds, the number of threads and the listen
// port. RMR is initialised with multi-threaded call support, an epoll fd is set
// up on RMR's receive fd, and once rmr_ready() reports true one mk_calls thread
// is started per control block. The main thread then watches the control
// blocks' state while draining the normal receive path with rmr_torcv_msg(),
// and gives up on unfinished threads once the 20 second timeout has passed. A
// PASS or FAIL summary is written to stderr at the end.
322 int main( int argc, char** argv ) {
323 void* mrc; // msg router context
324 rmr_mbuf_t* rbuf = NULL; // received on 'normal' flow
325 struct epoll_event events[1]; // list of events to give to epoll
326 struct epoll_event epe; // event definition for event to listen to
327 int ep_fd = -1; // epoll's file des (given to epoll_wait)
328 int rcv_fd; // file des that NNG tickles -- give this to epoll to listen on
329 int nready; // number of events ready for receive
330 char* listen_port = "43086";
332 int delay = 100000; // usec between send attempts
333 int nmsgs = 10; // number of messages to send
337 tdata_t* cvs; // vector of control blocks
340 pthread_t* pt_info; // thread stuff
342 int pings = 0; // number of messages received on normal channel
345 nmsgs = atoi( argv[1] );
348 delay = atoi( argv[2] );
351 nthreads = atoi( argv[3] );
354 listen_port = argv[4];
357 fprintf( stderr, "<CALL> listen port: %s; sending %d messages; delay=%d\n", listen_port, nmsgs, delay );
359 if( (mrc = rmr_init( listen_port, 1400, RMRFL_MTCALL )) == NULL ) { // initialise with multi-threaded call enabled
360 fprintf( stderr, "<CALL> unable to initialise RMr\n" );
364 //rmr_init_trace( mrc, TRACE_SIZE );
366 if( (rcv_fd = rmr_get_rcvfd( mrc )) >= 0 ) { // epoll only available from NNG -- skip receive later if not NNG
368 fprintf( stderr, "<CALL> unable to set up polling fd\n" );
371 if( (ep_fd = epoll_create1( 0 )) < 0 ) {
372 fprintf( stderr, "<CALL> [FAIL] unable to create epoll fd: %d\n", errno );
375 epe.events = EPOLLIN;
376 epe.data.fd = rcv_fd;
378 if( epoll_ctl( ep_fd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 ) {
379 fprintf( stderr, "<CALL> [FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
383 rmr_set_rtimeout( mrc, 0 ); // for nano we must set the receive timeout to 0; non-blocking receive
387 cvs = malloc( sizeof( tdata_t ) * nthreads );
388 pt_info = malloc( sizeof( pthread_t ) * nthreads );
390 fprintf( stderr, "<CALL> unable to allocate control vector\n" );
395 timeout = time( NULL ) + 20; // give rmr 20s to find the route table (shouldn't need that much)
396 while( ! rmr_ready( mrc ) ) { // must have a route table before we can send; wait til RMr says it has one
397 fprintf( stderr, "<CALL> waiting for rmr to show ready\n" );
400 if( time( NULL ) > timeout ) {
401 fprintf( stderr, "<CALL> giving up\n" );
405 fprintf( stderr, "<CALL> rmr is ready; starting threads\n" );
407 for( i = 0; i < nthreads; i++ ) {
409 cvs[i].id = i + 2; // we pass this as the call-id to rmr, so must be >1
410 cvs[i].delay = delay;
411 cvs[i].n2send = nmsgs;
// both bin arrays are sized by nbins and zeroed before the thread is started
415 cvs[i].out_bins = (int *) malloc( sizeof( int ) * cvs[i].nbins );
416 cvs[i].in_bins = (int *) malloc( sizeof( int ) * cvs[i].nbins );
417 memset( cvs[i].out_bins, 0, sizeof( int ) * cvs[i].nbins );
418 memset( cvs[i].in_bins, 0, sizeof( int ) * cvs[i].nbins );
420 pthread_create( &pt_info[i], NULL, mk_calls, &cvs[i] ); // kick a thread
423 timeout = time( NULL ) + 20;
425 while( nthreads > 0 ) {
426 if( cvs[i].state < 1 ) { // states 0 or below indicate done. 0 == failure, -n == success
427 //print_stats( &cvs[i], 1, i == 0 );
// report the outbound statistics first, then the inbound ones; the histogram dump is off for both
428 print_stats( &cvs[i], 1, 0 );
429 print_stats( &cvs[i], 0, 0 );
432 if( cvs[i].state == 0 ) {
438 rbuf = rmr_torcv_msg( mrc, rbuf, 1000 );
439 if( rbuf != NULL && rbuf->state != RMR_ERR_RETRY ) {
441 rmr_free_msg( rbuf );
445 if( time( NULL ) > timeout ) {
446 failures += nthreads;
447 fprintf( stderr, "<CALL> timeout waiting for threads to finish; %d were not finished\n", nthreads );
452 fprintf( stderr, "<CALL> [%s] failing threads=%d pings reeived=%d\n", failures == 0 ? "PASS" : "FAIL", failures, pings );