From 3925774e739509bb51df2c81addb3ab742c1801f Mon Sep 17 00:00:00 2001 From: "E. Scott Daniels" Date: Thu, 6 Jun 2019 16:44:33 +0000 Subject: [PATCH] fix(rtable): Prevent direct send hairpins If the current node is listed as an endpoint for a message type that this process would add to the route table a hair-pin loop back to the node can happen. This isn't desireable and this change will prevent the current node from being added as a recipient endpoint to any route table entry. Signed-off-by: E. Scott Daniels Change-Id: I5a1b2bdaad2eab499ae5a1c7430238c1da2f3256 Add latency call/receive test support Signed-off-by: E. Scott Daniels Change-Id: I70be670d063adfdf6505431a4a2ce72e846df0a4 Actual mods to implement hairpin loop avodiance Signed-off-by: E. Scott Daniels Change-Id: Ieead4fbf69ade58db1f37d1949cf0f03683c64de Signed-off-by: E. Scott Daniels --- CMakeLists.txt | 2 +- src/rmr/common/include/rmr_agnostic.h | 1 + src/rmr/common/src/rt_generic_static.c | 11 +- src/rmr/common/src/tools_static.c | 49 +++- src/rmr/nng/src/rtable_nng_static.c | 4 +- test/app_test/Makefile | 7 + test/app_test/lcaller.c | 458 +++++++++++++++++++++++++++++++++ test/app_test/lreceiver.c | 178 +++++++++++++ test/app_test/run_all.ksh | 2 +- test/app_test/run_app_test.ksh | 130 +++++++--- test/app_test/run_lcall_test.ksh | 235 +++++++++++++++++ test/sr_nano_static_test.c | 6 +- 12 files changed, 1039 insertions(+), 44 deletions(-) create mode 100644 test/app_test/lcaller.c create mode 100644 test/app_test/lreceiver.c create mode 100644 test/app_test/run_lcall_test.ksh diff --git a/CMakeLists.txt b/CMakeLists.txt index 40660c1..2d40e80 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -23,7 +23,7 @@ cmake_minimum_required( VERSION 3.5 ) set( major_version "1" ) # should be automatically populated from git tag later, but until CI process sets a tag we use this set( minor_version "0" ) -set( patch_level "29" ) +set( patch_level "30" ) set( install_root "${CMAKE_INSTALL_PREFIX}" ) set( install_lib "lib" ) diff --git a/src/rmr/common/include/rmr_agnostic.h b/src/rmr/common/include/rmr_agnostic.h index 465f60a..9000c29 100644 --- a/src/rmr/common/include/rmr_agnostic.h +++ b/src/rmr/common/include/rmr_agnostic.h @@ -245,6 +245,7 @@ typedef struct chute { //---- tools ---------------------------------- static int has_myip( char const* buf, if_addrs_t* list, char sep, int max ); static int uta_tokenise( char* buf, char** tokens, int max, char sep ); +static int uta_rmip_tokenise( char* buf, if_addrs_t* iplist, char** toks, int max, char sep ); static char* uta_h2ip( char const* hname ); static int uta_lookup_rtg( uta_ctx_t* ctx ); static int uta_has_str( char const* buf, char const* str, char sep, int max ); diff --git a/src/rmr/common/src/rt_generic_static.c b/src/rmr/common/src/rt_generic_static.c index 9b01100..d51a44c 100644 --- a/src/rmr/common/src/rt_generic_static.c +++ b/src/rmr/common/src/rt_generic_static.c @@ -270,17 +270,20 @@ static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* r rte = uta_add_rte( ctx->new_rtable, key, ngtoks ); // get/create entry for this key for( grp = 0; grp < ngtoks; grp++ ) { - if( (ntoks = uta_tokenise( gtokens[grp], tokens, 64, ',' )) > 0 ) { + if( (ntoks = uta_rmip_tokenise( gtokens[grp], ctx->ip_list, tokens, 64, ',' )) > 0 ) { // remove any referneces to our ip addrs for( i = 0; i < ntoks; i++ ) { - if( DEBUG > 1 || (vlevel > 1)) fprintf( stderr, "[DBUG] add endpoint %s\n", ts_field ); - uta_add_ep( ctx->new_rtable, rte, tokens[i], grp ); + if( strcmp( tokens[i], ctx->my_name ) != 0 ) { // don't add if it is us -- cannot send to ourself + if( DEBUG > 1 || (vlevel > 1)) fprintf( stderr, "[DBUG] add endpoint ts=%s %s\n", ts_field, tokens[i] ); + uta_add_ep( ctx->new_rtable, rte, tokens[i], grp ); + } } } } } } else { - if( DEBUG || (vlevel > 2) ) + if( DEBUG || (vlevel > 2) ) { fprintf( stderr, "entry not included, sender not matched: %s\n", tokens[1] ); + } } } diff --git a/src/rmr/common/src/tools_static.c b/src/rmr/common/src/tools_static.c index 67adc85..069911d 100644 --- a/src/rmr/common/src/tools_static.c +++ b/src/rmr/common/src/tools_static.c @@ -22,6 +22,7 @@ Mnemonic: tools_static.c Abstract: A small set of very simple tools to support Uta == RMR. uta_tokenise -- simple string tokeniser + uta_rmip_tokenise -- tokenise and remove ip addresses from the list uta_h2ip -- look up host name and return an ip address uta_lookup_rtg -- looks in env for rtg host:port uta_has_str -- searches buffer of tokens for a string @@ -50,6 +51,11 @@ #include #include +// --- some protos needed for better organisation -------- +int is_this_myip( if_addrs_t* l, char* addr ); + + +// ---------------------------------------------------------------------------------- /* Simple tokeniser. Split a null terminated string into tokens recording the @@ -77,6 +83,43 @@ static int uta_tokenise( char* buf, char** tokens, int max, char sep ) { return n; } +/* + Tokenise and remove matches. + Given a buffer of 'sep' separated tokens, and a list of things, + return up to max tokens with any tokens that matched things in + the list. Toks is the user supplied array of char* which we will + fill in (up to max) with pointers to tokens from buf. This + damages buf, so the caller must dup the string if it must be + preserved for later, original, use. The pointers returned in + toks will reference portions of bufs. + + Returns the number of tokens referenced by toks. +*/ +static int uta_rmip_tokenise( char* buf, if_addrs_t* iplist, char** toks, int max, char sep ) { + int ntoks = 0; // total toks in the original buffer + int pcount = 0; // count after prune + char** all_toks; + int i; + int j; + + + all_toks = malloc( sizeof( char * ) * max ); // refernce to all tokens; we'll prune + pcount = ntoks = uta_tokenise( buf, all_toks, max, sep ); // split them up + j = 0; + if( ntoks > 0 ) { + for( i = 0; i < ntoks; i++ ) { + if( is_this_myip( iplist, all_toks[i] ) ) { + pcount--; // ours, prune + } else { + toks[j++] = all_toks[i]; // not one of ours, keep it + } + } + } + + free( all_toks ); + return pcount; +} + /* Xlate hostname (expected to be name:port) to an IP address that nano will tolerate. We'll use the first address from the list to keep it simple. If the first character @@ -330,8 +373,12 @@ int is_this_myip( if_addrs_t* l, char* addr ) { return 0; } + if( addr == NULL ) { + return 0; + } + for( i = 0; i < l->naddrs; i++ ) { - if( strcmp( addr, l->addrs[i] ) == 0 ) { + if( l->addrs[i] != NULL && strcmp( addr, l->addrs[i] ) == 0 ) { return 1; } } diff --git a/src/rmr/nng/src/rtable_nng_static.c b/src/rmr/nng/src/rtable_nng_static.c index 9f3092a..fefa63d 100644 --- a/src/rmr/nng/src/rtable_nng_static.c +++ b/src/rmr/nng/src/rtable_nng_static.c @@ -225,7 +225,7 @@ static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_s if( ! ep->open ) { // not open -- connect now if( DEBUG ) fprintf( stderr, "[DBUG] get ep by name for %s session not started... starting\n", ep_name ); if( ep->addr == NULL ) { // name didn't resolve before, try again - ep->addr = uta_h2ip( ep->name ); + ep->addr = strdup( ep->name ); // use the name directly; if not IP then transport will do dns lookup } if( uta_link2( ep ) ) { // find entry in table and create link state = TRUE; @@ -335,7 +335,7 @@ static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more, if( state ) { // end point selected, open if not, get socket either way if( ! ep->open ) { // not connected if( ep->addr == NULL ) { // name didn't resolve before, try again - ep->addr = uta_h2ip( ep->name ); + ep->addr = strdup( ep->name ); // use the name directly; if not IP then transport will do dns lookup } if( uta_link2( ep ) ) { // find entry in table and create link diff --git a/test/app_test/Makefile b/test/app_test/Makefile index 07748ee..67acb9f 100644 --- a/test/app_test/Makefile +++ b/test/app_test/Makefile @@ -55,6 +55,9 @@ receiver: receiver.c mt_receiver: receiver.c gcc -I $${C_INCLUDE_PATH:-.} -DMTC $< -g -o $@ -lrmr_nng -lnng -lpthread -lm +lreceiver: lreceiver.c + gcc -I $${C_INCLUDE_PATH:-.} $< -g -o $@ -lrmr_nng -lnng -lpthread -lm + sender_nano: sender.c gcc -I $${C_INCLUDE_PATH:-.} $< -g -o $@ -lrmr -lnanomsg -lpthread -lm @@ -64,6 +67,10 @@ sender: sender.c caller: caller.c gcc -I $${C_INCLUDE_PATH:-.} $< -g -o $@ -lrmr_nng -lnng -lpthread -lm +lcaller: lcaller.c + gcc -I $${C_INCLUDE_PATH:-.} $< -g -o $@ -lrmr_nng -lnng -lpthread -lm + + # clean removes intermediates; nuke removes everything that can be built .PHONY: clean nuke diff --git a/test/app_test/lcaller.c b/test/app_test/lcaller.c new file mode 100644 index 0000000..0d246d8 --- /dev/null +++ b/test/app_test/lcaller.c @@ -0,0 +1,458 @@ +// :vim ts=4 sw=4 noet: +/* +================================================================================== + Copyright (c) 2019 Nokia + Copyright (c) 2018-2019 AT&T Intellectual Property. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +================================================================================== +*/ + +/* + Mnemonic: lcaller.c + Abstract: This is a simple sender which will send a series of messages using + rmr_call(). Similar to caller.c the major difference is that + a timestamp is placed into the message and the receiver is expected + to add a timestamp before executing an rts call. We can then + compute the total round trip latency as well as the forward send + latency. + + Overall, N threads are started each sending the desired number + of messages and expecting an 'ack' for each. Each ack is examined + to verify that the thread id placed into the message matches (meaning + that the ack was delivered by RMr to the correct thread's chute. + + The message format is 'binary' defined by the lc_msg struct. + + Parms: argv[1] == number of msgs to send (10) + argv[2] == delay (mu-seconds, 1000000 default) + argv[3] == number of threads (3) + argv[4] == listen port + + Sender will send for at most 20 seconds, so if nmsgs and delay extend + beyond that period the total number of messages sent will be less + than n. + + Date: 18 April 2019 + Author: E. Scott Daniels +*/ + +#include +#include +#include +#include +#include +#include +#include +#include + + +#include + +#define TRACE_SIZE 40 // bytes in header to provide for trace junk +#define SUCCESS (-1) + +/* + Thread data +*/ +typedef struct tdata { + int id; // the id we'll pass to RMr mt-call function NOT the thread id + int n2send; // number of messages to send + int delay; // ms delay between messages + void* mrc; // RMr context + int state; + int* in_bins; // latency count bins + int* out_bins; + int nbins; // number of bins allocated + long long in_max; + long long out_max; + int out_oor; // out of range count + int in_oor; + int in_bcount; // total messages tracked in bins + int out_bcount; // total messages tracked in bins +} tdata_t; + + +/* + The message type placed into the payload. +*/ +typedef struct lc_msg { + struct timespec out_ts; // time just before call executed + struct timespec turn_ts; // time at the receiver, on receipt + struct timespec in_ts; // time received back by the caller + int out_retries; // number of retries required to send + int turn_retries; // number of retries required to send +} lc_msg_t; + +// -------------------------------------------------------------------------------- + + +static int sum( char* str ) { + int sum = 0; + int i = 0; + + while( *str ) { + sum += *(str++) + i++; + } + + return sum % 255; +} + +static void print_stats( tdata_t* td, int out, int hist ) { + int sum; // sum of latencies + int csum = 0; // cutoff sum + int i95 = 0; // bin for the 95th count + int i99 = 0; // bin for the 99th count + int mean = -1; + int cutoff_95; // 95% of total messages + int cutoff_99; // 99% of total messages + int oor; + int max; + int j; + + if( out ) { + cutoff_95 = .95 * (td->out_oor + td->out_bcount); + cutoff_99 = .95 * (td->out_oor + td->out_bcount); + oor = td->out_oor; + max = td->out_max; + } else { + cutoff_95 = .95 * (td->in_oor + td->in_bcount); + cutoff_99 = .95 * (td->in_oor + td->in_bcount); + oor = td->in_oor; + max = td->in_max; + } + + sum = 0; + for( j = 0; j < td->nbins; j++ ) { + if( csum < cutoff_95 ) { + i95++; + } + if( csum < cutoff_99 ) { + i99++; + } + + if( out ) { + csum += td->out_bins[j]; + sum += td->out_bins[j] * j; + } else { + csum += td->in_bins[j]; + sum += td->in_bins[j] * j; + } + } + + if( out ) { + if( td->out_bcount ) { + mean = sum/(td->out_bcount); + } + } else { + if( td->in_bcount ) { + mean = sum/(td->in_bcount); + } + } + + if( hist ) { + for( j = 0; j < td->nbins; j++ ) { + fprintf( stderr, "%3d %d\n", j, out ? td->out_bins[j] : td->in_bins[j] ); + } + } + + fprintf( stderr, "%s: oor=%d max=%.2fms mean=%.2fms 95th=%.2fms 99th=%.2f\n", + out ? "out" : " in", oor, (double)max/1000000.0, (double)mean/100.0, (double) i95/100.0, i99/100.0 ); +} + +/* + Given a message, compute the in/out and round trip latencies. +*/ +static void compute_latency( tdata_t* td, lc_msg_t* lcm ) { + long long out; + long long turn; + long long in; + double rtl; // round trip latency + double outl; // caller to receiver latency (out) + double inl; // receiver to caller latency (in) + int bin; + + if( lcm == NULL || td == NULL ) { + return; + } + + out = (lcm->out_ts.tv_sec * 1000000000) + lcm->out_ts.tv_nsec; + in = (lcm->in_ts.tv_sec * 1000000000) + lcm->in_ts.tv_nsec; + turn = (lcm->turn_ts.tv_sec * 1000000000) + lcm->turn_ts.tv_nsec; + + if( in - turn > td->in_max ) { + td->in_max = in - turn; + } + if( turn - out > td->out_max ) { + td->out_max = turn-out; + } + + bin = (turn-out) / 10000; // 100ths of ms + +#ifdef PRINT + outl = ((double) turn - out) / 1000000.0; // convert to ms + inl = ((double) in - turn) / 1000000.0; + rtl = ((double) in - out) / 1000000.0; + + fprintf( stderr, "outl = %5.3fms inl = %5.3fms rtl = %5.3fms bin=%d\n", outl, inl, rtl, bin ); +#else + + bin = (turn - out) / 10000; // 100ths of ms + if( bin < td->nbins ) { + td->out_bins[bin]++; + td->out_bcount++; + } else { + td->out_oor++; + } + + bin = (in - turn) / 10000; // 100ths of ms + if( bin < td->nbins ) { + td->in_bins[bin]++; + td->in_bcount++; + } else { + td->in_oor++; + } + +#endif +} + +/* + Executed as a thread, this puppy will generate calls to ensure that we get the + response back to the right thread, that we can handle threads, etc. +*/ +static void* mk_calls( void* data ) { + lc_msg_t* lcm; // pointer at the payload as a struct + tdata_t* control; + rmr_mbuf_t* sbuf; // send buffer + int count = 0; + int ack_count = 0; + int rt_count = 0; // number of messages requiring a spin retry + int ok_msg = 0; // received messages that were sent by us + int bad_msg = 0; // received messages that were sent by a different thread + int drops = 0; + int fail_count = 0; // # of failure sends after first successful send + int successful = 0; // set to true after we have a successful send + char xbuf[1024]; // build transaction string here + int xaction_id = 1; + char* tok; + int state = 0; + + if( (control = (tdata_t *) data) == NULL ) { + fprintf( stderr, "thread data was nil; bailing out\n" ); + } + //fprintf( stderr, " thread started with parallel call id=%d sending=%d delay=%d\n", control->id, control->n2send, control->delay ); + + sbuf = rmr_alloc_msg( control->mrc, 512 ); // alloc first send buffer; subsequent buffers allcoated on send + + usleep( rand() % 777 ); // stagger starts a bit so that they all don't pile up on the first connections + + while( count < control->n2send ) { // we send n messages after the first message is successful + lcm = (lc_msg_t *) sbuf->payload; + + snprintf( xbuf, 200, "%31d", xaction_id ); + xaction_id += control->id; + rmr_bytes2xact( sbuf, xbuf, 32 ); + + sbuf->mtype = 5; // all go with the same type + sbuf->len = sizeof( *lcm ); + sbuf->state = RMR_OK; + lcm->out_retries = 0; + lcm->turn_retries = 0; + clock_gettime( CLOCK_REALTIME, &lcm->out_ts ); // mark time out + sbuf = rmr_mt_call( control->mrc, sbuf, control->id, 1000 ); // send it (send returns an empty payload on success, or the original payload on fail/retry) + + if( sbuf && sbuf->state == RMR_ERR_RETRY ) { // number of times we had to spin to send + rt_count++; + } + while( sbuf != NULL && sbuf->state == RMR_ERR_RETRY ) { // send blocked; keep trying + lcm->out_retries++; + sbuf = rmr_mt_call( control->mrc, sbuf, control->id, 100 ); // call and wait up to 100ms for a response + } + + count++; + if( sbuf != NULL ) { + switch( sbuf->state ) { + case RMR_OK: // we should have a buffer back from the sender here + lcm = (lc_msg_t *) sbuf->payload; + clock_gettime( CLOCK_REALTIME, &lcm->in_ts ); // mark time back + successful = 1; + compute_latency( control, lcm ); + + ack_count++; + //fprintf( stderr, "%d have received %d\n", control->id, count ); + break; + + default: + fprintf( stderr, "unexpected error: tid=%d rmr-state=%d ernro=%d\n", control->id, sbuf->state, errno ); + sbuf = rmr_alloc_msg( control->mrc, 512 ); // allocate a sendable buffer + if( successful ) { + fail_count++; // count failures after first successful message + } else { + // some error (not connected likely), don't count this + sleep( 1 ); + } + break; + } + } else { + //fprintf( stderr, " tid=%-2d call finished, no sbuf\n", control->id ); + sbuf = rmr_alloc_msg( control->mrc, 512 ); // loop expects an subf + drops++; + } + + if( control->delay > 0 ) { + usleep( control->delay ); + } + } + + control->state = SUCCESS; + fprintf( stderr, " %d finished sent %d, received %d messages\n", control->id, count, ack_count ); + return NULL; +} + +int main( int argc, char** argv ) { + void* mrc; // msg router context + rmr_mbuf_t* rbuf = NULL; // received on 'normal' flow + struct epoll_event events[1]; // list of events to give to epoll + struct epoll_event epe; // event definition for event to listen to + int ep_fd = -1; // epoll's file des (given to epoll_wait) + int rcv_fd; // file des that NNG tickles -- give this to epoll to listen on + int nready; // number of events ready for receive + char* listen_port = "43086"; + long timeout = 0; + int delay = 100000; // usec between send attempts + int nmsgs = 10; // number of messages to send + int nthreads = 3; + int cutoff; + int sum; + tdata_t* cvs; // vector of control blocks + int i; + int j; + pthread_t* pt_info; // thread stuff + int failures = 0; + int pings = 0; // number of messages received on normal channel + + if( argc > 1 ) { + nmsgs = atoi( argv[1] ); + } + if( argc > 2 ) { + delay = atoi( argv[2] ); + } + if( argc > 3 ) { + nthreads = atoi( argv[3] ); + } + if( argc > 4 ) { + listen_port = argv[4]; + } + + fprintf( stderr, " listen port: %s; sending %d messages; delay=%d\n", listen_port, nmsgs, delay ); + + if( (mrc = rmr_init( listen_port, 1400, RMRFL_MTCALL )) == NULL ) { // initialise with multi-threaded call enabled + fprintf( stderr, " unable to initialise RMr\n" ); + exit( 1 ); + } + + //rmr_init_trace( mrc, TRACE_SIZE ); + + if( (rcv_fd = rmr_get_rcvfd( mrc )) >= 0 ) { // epoll only available from NNG -- skip receive later if not NNG + if( rcv_fd < 0 ) { + fprintf( stderr, " unable to set up polling fd\n" ); + exit( 1 ); + } + if( (ep_fd = epoll_create1( 0 )) < 0 ) { + fprintf( stderr, " [FAIL] unable to create epoll fd: %d\n", errno ); + exit( 1 ); + } + epe.events = EPOLLIN; + epe.data.fd = rcv_fd; + + if( epoll_ctl( ep_fd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 ) { + fprintf( stderr, " [FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) ); + exit( 1 ); + } + } else { + rmr_set_rtimeout( mrc, 0 ); // for nano we must set the receive timeout to 0; non-blocking receive + } + + + cvs = malloc( sizeof( tdata_t ) * nthreads ); + pt_info = malloc( sizeof( pthread_t ) * nthreads ); + if( cvs == NULL ) { + fprintf( stderr, " unable to allocate control vector\n" ); + exit( 1 ); + } + + + timeout = time( NULL ) + 20; // give rmr 20s to find the route table (shouldn't need that much) + while( ! rmr_ready( mrc ) ) { // must have a route table before we can send; wait til RMr says it has one + fprintf( stderr, " waiting for rmr to show ready\n" ); + sleep( 1 ); + + if( time( NULL ) > timeout ) { + fprintf( stderr, " giving up\n" ); + exit( 1 ); + } + } + fprintf( stderr, " rmr is ready; starting threads\n" ); + + for( i = 0; i < nthreads; i++ ) { + cvs[i].mrc = mrc; + cvs[i].id = i + 2; // we pass this as the call-id to rmr, so must be >1 + cvs[i].delay = delay; + cvs[i].n2send = nmsgs; + cvs[i].state = 1; + + cvs[i].nbins = 100; + cvs[i].out_bins = (int *) malloc( sizeof( int ) * cvs[i].nbins ); + cvs[i].in_bins = (int *) malloc( sizeof( int ) * cvs[i].nbins ); + memset( cvs[i].out_bins, 0, sizeof( int ) * cvs[i].nbins ); + memset( cvs[i].in_bins, 0, sizeof( int ) * cvs[i].nbins ); + + pthread_create( &pt_info[i], NULL, mk_calls, &cvs[i] ); // kick a thread + } + + timeout = time( NULL ) + 20; + i = 0; + while( nthreads > 0 ) { + if( cvs[i].state < 1 ) { // states 0 or below indicate done. 0 == failure, -n == success + //print_stats( &cvs[i], 1, i == 0 ); + print_stats( &cvs[i], 1, 0 ); + print_stats( &cvs[i], 0, 0 ); + + nthreads--; + if( cvs[i].state == 0 ) { + failures++; + } + i++; + } else { + // sleep( 1 ); + rbuf = rmr_torcv_msg( mrc, rbuf, 1000 ); + if( rbuf != NULL && rbuf->state != RMR_ERR_RETRY ) { + pings++; + rmr_free_msg( rbuf ); + rbuf = NULL; + } + } + if( time( NULL ) > timeout ) { + failures += nthreads; + fprintf( stderr, " timeout waiting for threads to finish; %d were not finished\n", nthreads ); + break; + } + } + + fprintf( stderr, " [%s] failing threads=%d pings reeived=%d\n", failures == 0 ? "PASS" : "FAIL", failures, pings ); + sleep( 2 ); + rmr_close( mrc ); + + return failures > 0; +} + diff --git a/test/app_test/lreceiver.c b/test/app_test/lreceiver.c new file mode 100644 index 0000000..161abc7 --- /dev/null +++ b/test/app_test/lreceiver.c @@ -0,0 +1,178 @@ +// :vim ts=4 sw=4 noet: +/* +================================================================================== + Copyright (c) 2019 Nokia + Copyright (c) 2018-2019 AT&T Intellectual Property. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +================================================================================== +*/ + +/* + Mnemonic: rmr_rcvr.c + Abstract: This is a very simple receiver that listens for messages and + returns each to the sender after adding a timestamp to the + payload. The payload is expected to be lc_msg_t (see lcaller.c) + and this will update the 'turn' timestamp on receipt. + + Define these environment variables to have some control: + RMR_SEED_RT -- path to the static routing table + RMR_RTG_SVC -- port to listen for RTG connections + + Date: 18 April 2019 + Author: E. Scott Daniels +*/ + +#include +#include +#include +#include +#include +#include + +#include + +/* + The message type placed into the payload. +*/ +typedef struct lc_msg { + struct timespec out_ts; // time just before call executed + struct timespec turn_ts; // time at the receiver, on receipt + struct timespec in_ts; // time received back by the caller + int out_retries; // number of retries required to send + int turn_retries; // number of retries required to send +} lc_msg_t; + +// ---------------------------------------------------------------------------------- + +static int sum( char* str ) { + int sum = 0; + int i = 0; + + while( *str ) { + sum += *(str++) + i++; + } + + return sum % 255; +} + +/* + Split the message at the first sep and return a pointer to the first + character after. +*/ +static char* split( char* str, char sep ) { + char* s; + + s = strchr( str, sep ); + + if( s ) { + return s+1; + } + + fprintf( stderr, " no pipe in message: (%s)\n", str ); + return NULL; +} + +int main( int argc, char** argv ) { + void* mrc; // msg router context + lc_msg_t* lmc; // latency message type from caller + rmr_mbuf_t* msg = NULL; // message received + int i; + int errors = 0; + char* listen_port = "4560"; + long count = 0; // total received + long timeout = 0; + char* data; + int nmsgs = 10; // number of messages to stop after (argv[1] overrides) + int rt_count = 0; // retry count + time_t now; + int active; + + data = getenv( "RMR_RTG_SVC" ); + if( data == NULL ) { + setenv( "RMR_RTG_SVC", "19289", 1 ); // set one that won't collide with the sender if on same host + } + + if( argc > 1 ) { + nmsgs = atoi( argv[1] ); + } + if( argc > 2 ) { + listen_port = argv[2]; + } + + + fprintf( stderr, " listening on port: %s for a max of %d messages\n", listen_port, nmsgs ); + + mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_MTCALL ); // start your engines! + //mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, 0 ); // start your engines! + if( mrc == NULL ) { + fprintf( stderr, " ABORT: unable to initialise RMr\n" ); + exit( 1 ); + } + + timeout = time( NULL ) + 20; + while( ! rmr_ready( mrc ) ) { // wait for RMr to load a route table + fprintf( stderr, " waiting for RMr to show ready\n" ); + sleep( 1 ); + + if( time( NULL ) > timeout ) { + fprintf( stderr, " giving up\n" ); + exit( 1 ); + } + } + fprintf( stderr, " rmr now shows ready, listening begins\n" ); + + timeout = time( NULL ) + 2; // once we start, we assume if we go 2s w/o a message that we're done + //while( count < nmsgs ) { + while( 1 ) { + active = 0; + msg = rmr_torcv_msg( mrc, msg, 1000 ); // pop every second or so to timeout if needed + + if( msg ) { + active = 1; + if( msg->state == RMR_OK ) { + lmc = (lc_msg_t *) msg->payload; + clock_gettime( CLOCK_REALTIME, &lmc->turn_ts ); // mark time that we received it. + count++; + + msg = rmr_rts_msg( mrc, msg ); + rt_count = 1000; + while( rt_count > 0 && msg != NULL && msg->state == RMR_ERR_RETRY ) { // to work right in nano we need this :( + lmc->turn_retries++; + if( count < 1 ) { // 1st msg, so we need to connect, and we'll wait for that + sleep( 1 ); + } + rt_count--; + msg = rmr_rts_msg( mrc, msg ); // we don't try to resend if this returns retry + } + } + } + + now = time( NULL ); + if( now > timeout ) { + break; + } + + if( active ) { + timeout = now + 2; + } + } + + fprintf( stderr, " %ld is finished got %ld messages\n", (long) getpid(), count ); + + + sleep( 3 ); + rmr_close( mrc ); + return 0; +} + diff --git a/test/app_test/run_all.ksh b/test/app_test/run_all.ksh index ffcd3e1..7271cfd 100644 --- a/test/app_test/run_all.ksh +++ b/test/app_test/run_all.ksh @@ -15,7 +15,7 @@ do done set -e -ksh run_app_test.ksh $build +ksh run_app_test.ksh -v -i $build ksh run_multi_test.ksh ksh run_rr_test.ksh ksh run_rts_test.ksh -s 20 diff --git a/test/app_test/run_app_test.ksh b/test/app_test/run_app_test.ksh index ff0673f..964a080 100644 --- a/test/app_test/run_app_test.ksh +++ b/test/app_test/run_app_test.ksh @@ -27,9 +27,9 @@ # recevier then run the basic test. # # Example command line: -# ksh ./run # default 10 messages at 1 msg/sec -# ksh ./run -N # default but with nanomsg lib -# ksh ./run -d 100 -n 10000 # send 10k messages with 100ms delay between +# ksh ./run_app_test.ksh # default 20 messages at 2 msg/sec +# ksh ./run_app_test.ksh -N # default but with nanomsg lib +# ksh ./run_app_test.ksh -d 100 -n 10000 # send 10k messages with 100ms delay between # # Date: 22 April 2019 # Author: E. Scott Daniels @@ -59,22 +59,61 @@ function run_rcvr { echo $? >/tmp/PID$$.rrc } -# --------------------------------------------------------- +# snarf the first v4 IP (not the loopback) that belongs to this box/container/guest +function snarf_ip { + ip addr| sed -e '/inet /b c; d' -e ':c' -e '/127.0.0.1/d; s!/.*!!; s!^.* !!; q' +} -if [[ ! -f local.rt ]] # we need the real host name in the local.rt; build one from mask if not there -then - hn=$(hostname) - sed "s!%%hostname%%!$hn!" rt.mask >local.rt -fi +# Drop a contrived route table in. This table should add a reference to our +# local IP to an entry to ensure that the route table collector code in RMr +# is removing 'hairpin' loops. If RMr isn't removing the references to our +# hostname and IP address when it builds the endpoint lists, the sender will +# send messages to itself some of the time, causing the receiver to come up +# short when comparing messages received with expected count and thus failing. +# +function set_rt { + typeset port=4560 # port the receiver listens on by default + + cat <app_test.rt + newrt | start + mse | 0 | 0 | localhost:$port,$my_ip:43086 + mse | 1 | 10 | localhost:$port,${my_host//.*/}:43086 + mse | 2 | 20 | localhost:$port + rte | 3 | localhost:$port + mse | 3 | 100 | localhost:$port # special test to ensure that this does not affect previous entry + rte | 4 | localhost:$port + rte | 5 | localhost:$port + rte | 6 | localhost:$port + rte | 7 | localhost:$port + rte | 8 | localhost:$port + rte | 9 | localhost:$port + rte | 10 | localhost:$port + rte | 11 | localhost:$port + rte | 12 | localhost:$port + rte | 13 | localhost:$port + newrt | end + +head -3 app_test.rt + +endKat + +} -nmsg=10 # total number of messages to be exchanged (-n value changes) -delay=1000000 # microsec sleep between msg 1,000,000 == 1s +# --------------------------------------------------------- + +nmsg=20 # total number of messages to be exchanged (-n value changes) + # need two sent to each receiver to ensure hairpin entries were removed (will fail if they were not) +delay=500000 # microsec sleep between msg 1,000,000 == 1s nano_sender=0 # start nano version if set (-N) nano_receiver=0 wait=1 rebuild=0 nopull="" # -b sets so that build does not pull verbose=0 +use_installed=0 +my_ip=$(snarf_ip) # get an ip to insert into the route table +keep=0 + while [[ $1 == -* ]] do @@ -82,6 +121,8 @@ do -B) rebuild=1;; # build with pull first -b) rebuild=1; nopull="nopull";; # buld without pull -d) delay=$2; shift;; + -k) keep=1;; + -i) use_installed=1;; -N) nano_sender=1 nano_receiver=1 ;; @@ -89,8 +130,12 @@ do -v) verbose=1;; *) echo "unrecognised option: $1" - echo "usage: $0 [-B] [-d micor-sec-delay] [-N] [-n num-msgs]" + echo "usage: $0 [-B] [-d micor-sec-delay] [-i] [-k] [-N] [-n num-msgs]" echo " -B forces a rebuild which will use .build" + echo " -i causes the installd libraries (/usr/local) to be referenced; -B is ignored if supplied" + echo " -k keeps the route table" + echo "" + echo "total number of messages must > 20 to correctly test hairpin loop removal" exit 1 ;; esac @@ -98,38 +143,54 @@ do shift done +if [[ ! -f app_test.rt ]] # we need the real host name in the local.rt; build one from mask if not there +then + my_host=$(hostname) + set_rt + if (( verbose )) + then + cat app_test.rt + fi +fi + if (( verbose )) then echo "2" >.verbose export RMR_VCTL_FILE=".verbose" fi -if (( rebuild )) + +if (( use_installed )) # point at installed library rather than testing the build then - set -e - ksh ./rebuild.ksh $nopull | read build_path - set +e + export LD_LIBRARY_PATH=/usr/local/lib + export LIBRARY_PATH=$LD_LIBRARY_PATH else - build_path=${BUILD_PATH:-"../../.build"} # we prefer .build at the root level, but allow user option - - if [[ ! -d $build_path ]] + if (( rebuild )) then - echo "cannot find build in: $build_path" - echo "either create, and then build RMr, or set BUILD_PATH as an evironment var before running this" - exit 1 + set -e + ksh ./rebuild.ksh $nopull | read build_path + set +e + else + build_path=${BUILD_PATH:-"../../.build"} # we prefer .build at the root level, but allow user option + + if [[ ! -d $build_path ]] + then + echo "cannot find build in: $build_path" + echo "either create, and then build RMr, or set BUILD_PATH as an evironment var before running this" + exit 1 + fi fi -fi -if [[ -d $build_path/lib64 ]] -then - export LD_LIBRARY_PATH=$build_path:$build_path/lib64 -else - export LD_LIBRARY_PATH=$build_path:$build_path/lib + if [[ -d $build_path/lib64 ]] + then + export LD_LIBRARY_PATH=$build_path:$build_path/lib64 + else + export LD_LIBRARY_PATH=$build_path:$build_path/lib + fi + export LIBRARY_PATH=$LD_LIBRARY_PATH fi -export LIBRARY_PATH=$LD_LIBRARY_PATH -export RMR_SEED_RT=${RMR_SEED_RT:-./local.rt} # allow easy testing with different rt - +export RMR_SEED_RT=${RMR_SEED_RT:-./app_test.rt} # allow easy testing with different rt if [[ ! -f ./sender ]] then @@ -141,7 +202,7 @@ then fi run_rcvr & -sleep 2 # if sender starts faster than rcvr we can drop, so pause a bit +sleep 2 # if sender starts faster than rcvr we can drop msgs, so pause a bit run_sender & wait @@ -155,6 +216,11 @@ else echo "[PASS] sender rc=$src receiver rc=$rrc" fi +if (( ! keep )) +then + rm app_test.rt +fi + rm /tmp/PID$$.* rm -f .verbose diff --git a/test/app_test/run_lcall_test.ksh b/test/app_test/run_lcall_test.ksh new file mode 100644 index 0000000..3d5d691 --- /dev/null +++ b/test/app_test/run_lcall_test.ksh @@ -0,0 +1,235 @@ +#!/usr/bin/env ksh +# :vi ts=4 sw=4 noet : +#================================================================================== +# Copyright (c) 2019 Nokia +# Copyright (c) 2018-2019 AT&T Intellectual Property. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +#================================================================================== +# + +# --------------------------------------------------------------------------------- +# Mnemonic: run_lcall_test.ksh +# Abstract: This is a simple script to set up and run the basic send/receive +# processes for some library validation on top of nano/nng. This +# particular test starts the latency caller and latency receiver +# processes such that they exchange messages and track the latency +# from the caller's perepective (both outbound to receiver, and then +# back. Stats are presented at the end. This test is NOT intended +# to be used as a CI validation test. +# +# The sender and receiver processes all have a 20s timeout (+/-) +# which means that all messages must be sent, and acked within that +# time or the processes will give up and report failure. Keep in mind +# that n messages with a delay value (-d) set will affect whether or +# not the messages can be sent in the 20s timeout period. There is +# currently no provision to adjust the timeout other than by changing +# the C source. The default (100 msgs with 500 micro-sec delay) works +# just fine for base testing. +# +# Example command line: +# # run with 10 caller threads sending 10,000 meessages each, +# # 5 receivers, and a 10 mu-s delay between each caller send +# ksh ./run_lcall_test.ksh -d 10 -n 10000 -r 5 -c 10 +# +# Date: 28 May 2019 +# Author: E. Scott Daniels +# --------------------------------------------------------------------------------- + + +# The sender and receivers are run asynch. Their exit statuses are captured in a +# file in order for the 'main' to pick them up easily. +# +function run_sender { + if (( $nano_sender )) + then + echo "nanomsg not supportded" + exit 1 + else + ./lcaller ${nmsg:-10} ${delay:-500} ${cthreads:-3} + fi + echo $? >/tmp/PID$$.src # must communicate state back via file b/c asynch +} + +# $1 is the instance so we can keep logs separate +function run_rcvr { + typeset port + + port=$(( 4460 + ${1:-0} )) + export RMR_RTG_SVC=$(( 9990 + $1 )) + if (( $nano_receiver )) + then + echo "nanomsg not supported" + exit 1 + else + ./lreceiver $(( ((nmsg * cthreads)/nrcvrs) + 10 )) $port + fi + echo $? >/tmp/PID$$.$1.rrc +} + +# Drop a contrived route table in such that the sender sends each message to n +# receivers. +# +function set_rt { + typeset port=4460 + typeset groups="localhost:4460" + for (( i=1; i < ${1:-3}; i++ )) + do + groups="$groups,localhost:$((port+i))" + done + + cat <lcall.rt + newrt | start + mse |0 | 0 | $groups + mse |1 | 10 | $groups + mse |2 | 20 | $groups + rte |3 | $groups + rte |4 | $groups + rte |5 | $groups + rte |6 | $groups + rte |7 | $groups + rte |8 | $groups + rte |9 | $groups + rte |10 | $groups + rte |11 | $groups + newrt | end +endKat +} + +# --------------------------------------------------------- + +if [[ ! -f local.rt ]] # we need the real host name in the local.rt; build one from mask if not there +then + hn=$(hostname) + sed "s!%%hostname%%!$hn!" rt.mask >local.rt +fi + +cthreads=3 # number of caller threads +nmsg=100 # total number of messages to be exchanged (-n value changes) +delay=500 # microsec sleep between msg 1,000,000 == 1s +nano_sender=0 # start nano version if set (-N) +nano_receiver=0 +wait=1 +rebuild=0 +verbose=0 +nrcvrs=3 # this is sane, but -r allows it to be set up +use_installed=0 + +while [[ $1 == -* ]] +do + case $1 in + -c) cthreads=$2; shift;; + -B) rebuild=1;; + -d) delay=$2; shift;; + -i) use_installed=1;; + -N) echo "abort: nanomsg does not support epoll and thus cannot be used for mt-caller" + echo "" + exit 1; + ;; + -n) nmsg=$2; shift;; + -r) nrcvrs=$2; shift;; + -v) verbose=1;; + + *) echo "unrecognised option: $1" + echo "usage: $0 [-B] [-c caller-threads] [-d micor-sec-delay] [-i] [-n num-msgs] [-r num-receivers]" + echo " -B forces a rebuild which will use .build" + echo " -i will use installed libraries (/usr/local) and cause -B to be ignored if supplied)" + exit 1 + ;; + esac + + shift +done + +if (( verbose )) +then + echo "2" >.verbose + export RMR_VCTL_FILE=".verbose" +fi + +if (( use_installed )) # point at installed library +then + export LD_LIBRARY_PATH=/usr/local/lib + export LIBRARY_PATH=$LD_LIBRARY_PATH +else + if (( rebuild )) + then + build_path=../../.build # if we rebuild we can insist that it is in .build :) + set -e + ksh ./rebuild.ksh + set +e + else + build_path=${BUILD_PATH:-"../../.build"} # we prefer .build at the root level, but allow user option + + if [[ ! -d $build_path ]] + then + echo "cannot find build in: $build_path" + echo "either create, and then build RMr, or set BUILD_PATH as an evironment var before running this" + exit 1 + fi + fi + + if [[ -d $build_path/lib64 ]] + then + export LD_LIBRARY_PATH=$build_path:$build_path/lib64 + else + export LD_LIBRARY_PATH=$build_path:$build_path/lib + fi +fi + +export LIBRARY_PATH=$LD_LIBRARY_PATH +export RMR_SEED_RT=./lcall.rt + +set_rt $nrcvrs # set up the rt for n receivers + +if (( rebuild )) || [[ ! -f ./lcaller ]] +then + if ! make -B lcaller lreceiver >/dev/null 2>&1 + then + echo "[FAIL] cannot find lcaller binary, and cannot make it.... humm?" + exit 1 + fi +fi + +for (( i=0; i < nrcvrs; i++ )) # start the receivers with an instance number +do + run_rcvr $i & +done + +sleep 2 # let receivers init so we don't shoot at an empty target +run_sender & + +wait + + +for (( i=0; i < nrcvrs; i++ )) # collect return codes +do + head -1 /tmp/PID$$.$i.rrc | read x + (( rrc += x )) +done + +head -1 /tmp/PID$$.src | read src + +if (( !! (src + rrc) )) +then + echo "[FAIL] sender rc=$src receiver rc=$rrc" +else + echo "[PASS] sender rc=$src receiver rc=$rrc" + rm -f lcall.rt +fi + +rm /tmp/PID$$.* +rm -f .verbose + +exit $(( !! (src + rrc) )) + diff --git a/test/sr_nano_static_test.c b/test/sr_nano_static_test.c index efeba23..9f88562 100644 --- a/test/sr_nano_static_test.c +++ b/test/sr_nano_static_test.c @@ -60,8 +60,8 @@ static void gen_rt( uta_ctx_t* ctx ) { "newrt|start\n" // false start to drive detection "xxx|badentry to drive default case" "newrt|start\n" - "rte|0|localhost:4560,localhost:4562\n" // these are legitimate entries for our testing - "rte|1|localhost:4562;localhost:4561,localhost:4569\n" + "rte|0|localhost:4560,localhost:4562,dummy-test:1111\n" // these are legitimate entries for our testing + "rte|1|localhost:4562;localhost:4561,localhost:4569,10.7.9.86:4560\n" "rte|2|localhost:4562| 10\n" // new subid at end "mse|4|10|localhost:4561\n" // new msg/subid specifier rec "mse|4|localhost:4561\n" // new mse entry with less than needed fields @@ -183,7 +183,7 @@ static int sr_nano_test() { ctx->mring = NULL; //uta_mk_ring( 128 ); ctx->max_plen = RMR_MAX_RCV_BYTES + sizeof( uta_mhdr_t ); ctx->max_mlen = ctx->max_plen + sizeof( uta_mhdr_t ); - ctx->my_name = strdup( "dummy-test" ); + ctx->my_name = strdup( "dummy-test:1111" ); ctx->my_ip = strdup( "30.4.19.86:1111" ); uta_lookup_rtg( ctx ); -- 2.16.6