From 5200efe1e6dd13b1e1241ce623c4978151be34e8 Mon Sep 17 00:00:00 2001 From: "E. Scott Daniels" Date: Mon, 20 Jan 2020 10:31:44 -0500 Subject: [PATCH] Add dynamic route table update to SI95 support The initial SI95 support did not include dynamic route table update capabilities. The library will now support the receipt of route table generator messages via an RMR session over the RTG service port. Signed-off-by: E. Scott Daniels Change-Id: I02e9f72e049152abd9ef516cccb7705eca4aa7bd --- CHANGES | 3 + CMakeLists.txt | 2 +- src/rmr/si/src/mt_call_si_static.c | 33 ++-- src/rmr/si/src/rmr_si.c | 25 ++- src/rmr/si/src/rtc_si_static.c | 230 +++++++++++++++++++++++++++- src/rmr/si/src/si95/siwait.c | 1 + src/rmr/si/src/sr_si_static.c | 8 +- test/app_test/mt_listener.c | 305 +++++++++++++++++++++++++++++++++++++ 8 files changed, 563 insertions(+), 44 deletions(-) create mode 100644 test/app_test/mt_listener.c diff --git a/CHANGES b/CHANGES index 1a46b2a..3ae419a 100644 --- a/CHANGES +++ b/CHANGES @@ -2,6 +2,9 @@ API and build change and fix summaries. Doc correctsions and/or changes are not mentioned here; see the commit messages. +2020 January 20; verison 3.0.1 + Enable support for dynamic route table updates via RMR session. + 2020 January 16; version 3.0.0 Introduce support for SI95 transport library to replace NNG. (RMR library versions will use leading odd numbers to avoid tag collisions diff --git a/CMakeLists.txt b/CMakeLists.txt index 2cff539..d44827a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -38,7 +38,7 @@ cmake_minimum_required( VERSION 3.5 ) set( major_version "3" ) # should be automatically populated from git tag later, but until CI process sets a tag we use this set( minor_version "0" ) -set( patch_level "0" ) +set( patch_level "1" ) set( install_root "${CMAKE_INSTALL_PREFIX}" ) set( install_inc "include/rmr" ) diff --git a/src/rmr/si/src/mt_call_si_static.c b/src/rmr/si/src/mt_call_si_static.c index 6e2e8aa..fa603ff 100644 --- a/src/rmr/si/src/mt_call_si_static.c +++ b/src/rmr/si/src/mt_call_si_static.c @@ -54,8 +54,6 @@ static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) { call ref to point to all of the various bits and set real len etc, then we queue it. Raw_msg is expected to include the transport goo placed in front of the RMR header and payload. - - done -- FIX ME?? can we eliminate the buffer copy here? */ static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd ) { rmr_mbuf_t* mbuf; @@ -70,18 +68,10 @@ static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd } } -/* - if( (mbuf = (rmr_mbuf_t *) malloc( sizeof( *mbuf ))) != NULL ) { // alloc mbuf and point at various bits of payload - memset( mbuf, 0, sizeof( *mbuf ) ); - mbuf->tp_buf = raw_msg; - mbuf->ring = ctx->zcb_mring; -*/ if( (mbuf = alloc_mbuf( ctx, RMR_ERR_UNSET )) != NULL ) { mbuf->tp_buf = raw_msg; mbuf->rts_fd = sender_fd; - // eliminated :) memcpy( mbuf->tp_buf, river->accum + offset, river->msg_size ); - ref_tpbuf( mbuf, msg_size ); // point mbuf at bits in the datagram hdr = mbuf->header; // convenience if( hdr->flags & HFL_CALL_MSG ) { // call generated message; ignore call-id etc and queue @@ -124,18 +114,17 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) { int need; // bytes needed for something int i; - // for speed these checks should be enabled only in debug mode and assume we always get a good context - if( (ctx = (uta_ctx_t *) vctx) == NULL ) { - return SI_RET_OK; - } - - if( fd >= ctx->nrivers || fd < 0 ) { - if( DEBUG ) fprintf( stderr, "[DBUG] callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers ); - return SI_RET_OK; + if( PARINOID_CHECKS ) { // PARINOID mode is slower; off by default + if( (ctx = (uta_ctx_t *) vctx) == NULL ) { + return SI_RET_OK; + } + + if( fd >= ctx->nrivers || fd < 0 ) { + if( DEBUG ) fprintf( stderr, "[DBUG] callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers ); + return SI_RET_OK; + } } - // -------- end debug checks ----------------- - if( buflen <= 0 ) { return SI_RET_OK; } @@ -159,7 +148,7 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) { /* fprintf( stderr, "\n>>>>> data callback for %d bytes from %d\n", buflen, fd ); for( i = 0; i < 40; i++ ) { -fprintf( stderr, "%02x ", (unsigned char) *(buf+i) ); + fprintf( stderr, "%02x ", (unsigned char) *(buf+i) ); } fprintf( stderr, "\n" ); */ @@ -226,7 +215,7 @@ fprintf( stderr, "\n" ); This is expected to execute in a separate thread. It is responsible for _all_ receives and queues them on the appropriate ring, or chute. It does this by registering the callback function above with the SI world - and then caling SIwait() to drive the callback when data has arrived. + and then calling SIwait() to drive the callback when data has arrived. The "state" of the message is checked which determines where the message diff --git a/src/rmr/si/src/rmr_si.c b/src/rmr/si/src/rmr_si.c index b1bfd1f..8e499c1 100644 --- a/src/rmr/si/src/rmr_si.c +++ b/src/rmr/si/src/rmr_si.c @@ -65,8 +65,7 @@ #include "ring_static.c" // message ring support #include "rt_generic_static.c" // route table things not transport specific #include "rtable_si_static.c" // route table things -- transport specific -#include "rtc_static.c" // route table collector -#include "rtc_si_static.c" // our private test function +#include "rtc_si_static.c" // specific RMR only route table collector (SI only for now) #include "tools_static.c" #include "sr_si_static.c" // send/receive static functions #include "wormholes.c" // wormhole api externals and related static functions (must be LAST!) @@ -535,11 +534,12 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { char wbuf[1024]; // work buffer char* tok; // pointer at token in a buffer char* tok2; + int static_rtc = 0; // if rtg env var is < 1, then we set and don't listen on a port int state; int i; if( ! announced ) { - fprintf( stderr, "[INFO] ric message routing library on SI95 mv=%d flg=%02x (%s %s.%s.%s built: %s)\n", + fprintf( stderr, "[INFO] ric message routing library on SI95/b mv=%d flg=%02x (%s %s.%s.%s built: %s)\n", RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ ); announced = 1; } @@ -600,6 +600,12 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { port = proto_port; // assume something like "1234" was passed } + if( (tok = getenv( "ENV_RTG_PORT" )) != NULL ) { // must check port here -- if < 1 then we just start static file 'listener' + if( atoi( tok ) < 1 ) { + static_rtc = 1; + } + } + if( (tok = getenv( ENV_SRC_ID )) != NULL ) { // env var overrides what we dig from system tok = strdup( tok ); // something we can destroy if( *tok == '[' ) { // we allow an ipv6 address here @@ -665,13 +671,18 @@ static void* init( char* uproto_port, int max_msg_size, int flags ) { return NULL; } - if( !(flags & FL_NOTHREAD) ) { // skip if internal function that doesnt need an rtc - if( pthread_create( &ctx->rtc_th, NULL, rtc_file, (void *) ctx ) ) { // kick the rt collector thread - fprintf( stderr, "[WRN] rmr_init: unable to start route table collector thread: %s", strerror( errno ) ); + if( !(flags & FL_NOTHREAD) ) { // skip if internal function that doesnt need a RTC + if( static_rtc ) { + if( pthread_create( &ctx->rtc_th, NULL, rtc_file, (void *) ctx ) ) { // kick the rt collector thread as just file reader + fprintf( stderr, "[WRN] rmr_init: unable to start static route table collector thread: %s", strerror( errno ) ); + } + } else { + if( pthread_create( &ctx->rtc_th, NULL, rtc, (void *) ctx ) ) { // kick the real rt collector thread + fprintf( stderr, "[WRN] rmr_init: unable to start dynamic route table collector thread: %s", strerror( errno ) ); + } } } - //fprintf( stderr, ">>>>> starting threaded receiver with ctx=%p si_ctx=%p\n", ctx, ctx->si_ctx ); ctx->flags |= CFL_MTC_ENABLED; // for SI threaded receiver is the only way if( pthread_create( &ctx->mtc_th, NULL, mt_receive, (void *) ctx ) ) { // so kick it fprintf( stderr, "[WRN] rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) ); diff --git a/src/rmr/si/src/rtc_si_static.c b/src/rmr/si/src/rtc_si_static.c index 76fea79..193672f 100644 --- a/src/rmr/si/src/rtc_si_static.c +++ b/src/rmr/si/src/rtc_si_static.c @@ -20,14 +20,20 @@ /* Mnemonic: rtc_si_static.c - Abstract: This is a test module to allow the route table to be read - from a static spot and NOT to attempt to listen for updates - from some outside source. + Abstract: The route table collector is started as a separate pthread and + is responsible for listening for route table updates from a + route manager or route table generator process. + + This comes from the common src and may be moved back there once + it is not necessary to support raw sessions (all route table + gen messages are received over rmr channel). Author: E. Scott Daniels - Date: 18 October 2019 + Date: 29 November 2018 (extracted to common 13 March 2019) + Imported to si base 17 Jan 2020. */ + #ifndef _rtc_si_staic_c #define _rtc_si_staic_c @@ -36,7 +42,6 @@ #include #include #include -#include #include #include #include @@ -69,12 +74,223 @@ static void* rtc_file( void* vctx ) { lseek( vfd, 0, 0 ); read( vfd, wbuf, 10 ); vlevel = atoi( wbuf ); - } - + } + read_static_rt( ctx, vlevel ); // seed the route table if one provided sleep( 60 ); } } + +static int refresh_vlevel( int vfd ) { + int vlevel = 0; + char rbuf[128]; + + if( vfd >= 0 ) { // if file is open, read current value + rbuf[0] = 0; + lseek( vfd, 0, 0 ); + read( vfd, rbuf, 10 ); + vlevel = atoi( rbuf ); + } + + return vlevel; +} + +/* + Route Table Collector + A side thread which opens a socket and subscribes to a routing table generator. + It may do other things along the way (latency measurements?). + + The pointer is a pointer to the context. + + Listens for records from the route table generation publisher, expecting + one of the following, newline terminated, ASCII records: + rte|msg-type||]name:port,name:port,...;name:port,... // route table entry with one or more groups of endpoints + new|start // start of new table + new|end // end of new table; complete + + Name must be a host name which can be looked up via gethostbyname() (DNS). + + Multiple endpoints (name:port) may be given separated by a comma; an endpoint is selected using round robin + for each message of the type that is sent. + + Multiple endpoint groups can be given as a comma separated list of endpoints, separated by semicolons: + group1n1:port,group1n2:port,group1n3:port;group2n1:port,group2n2:port + + If multiple groups are given, when send() is called for the cooresponding message type, + the message will be sent to one endpoint in each group. + + msg-type is the numeric message type (e.g. 102). If it is given as n,name then it is assumed + that the entry applies only to the instance running with the hostname 'name.' + + Buffers received from the route table generator can contain multiple newline terminated + records, but each buffer must be less than 4K in length, and the last record in a + buffer may NOT be split across buffers. + + Other chores: + In addition to the primary task of getting, vetting, and installing a new route table, or + updates to the existing table, this thread will periodically cause the send counts for each + endpoint known to be written to standard error. The frequency is once every 180 seconds, and + more frequently if verbose mode (see ENV_VERBOSE_FILE) is > 0. +*/ +static void* rtc( void* vctx ) { + uta_ctx_t* ctx; // context user has -- where we pin the route table + uta_ctx_t* pvt_cx; // private context for session with rtg + rmr_mbuf_t* msg = NULL; // message from rtg + char* payload; // payload in the message + size_t mlen; + size_t clen; // length to copy and mark + char* port; // a port number we listen/connect to + char* fport; // pointer to the real buffer to free + size_t buf_size; // nng needs var pointer not just size? + char* nextr; // pointer at next record in the message + char* curr; // current record + int i; + long blabber = 0; // time of last blabber so we don't flood if rtg goes bad + int cstate = -1; // connection state to rtg + int state; // processing state of some nng function + char* tokens[128]; + char wbuf[128]; + char* pbuf = NULL; + int pbuf_size = 0; // number allocated in pbuf + int ntoks; + int raw_interface = 0; // rtg is using raw NNG/Nano not RMr to send updates + int vfd = -1; // verbose file des if we have one + int vlevel = 0; // how chatty we should be 0== no nattering allowed + char* eptr; + int epfd = -1; // fd for epoll so we can multi-task + struct epoll_event events[1]; // list of events to give to epoll; we only have one we care about + struct epoll_event epe; // event definition for event to listen to + int count_delay = 30; // number of seconds between writing count info; initially every 30s + int bump_freq = 0; // time at which we will bump count frequency to every 5 minutes + + + if( (ctx = (uta_ctx_t *) vctx) == NULL ) { + fprintf( stderr, "[CRI] rmr_rtc: internal mishap: context passed in was nil\n" ); + return NULL; + } + + if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) { + vfd = open( eptr, O_RDONLY ); + vlevel = refresh_vlevel( vfd ); + } + + read_static_rt( ctx, vlevel ); // seed the route table if one provided + + if( (port = getenv( ENV_RTG_PORT )) == NULL || ! *port ) { // port we need to open to listen for RTG connections + port = strdup( DEF_RTG_PORT ); + } else { + port = strdup( port ); + } + + if( (curr = getenv( ENV_RTG_RAW )) != NULL ) { + raw_interface = atoi( curr ) > 0; // if > 0 we assume that rtg messages are NOT coming from an RMr based process + } + + fport = port; // must hold to free + + ntoks = uta_tokenise( port, tokens, 120, ':' ); // assume tcp:port, but it could be port or old style host:port + switch( ntoks ) { + case 1: + port = tokens[0]; // just the port + break; + + case 2: + port = tokens[1]; // tcp:port or :port + break; + + default: + port = DEF_RTG_PORT; // this shouldn't happen, but parnioia is good + break; + } + + if( (pvt_cx = init( port, MAX_RTG_MSG_SZ, FL_NOTHREAD )) == NULL ) { // open a private context (no RT listener!) + fprintf( stderr, "[CRI] rmr_rtc: unable to initialise listen port for RTG (pvt_cx)\n" ); + + while( TRUE ) { // no listen port, just dump counts now and then + sleep( count_delay ); + rt_epcounts( ctx->rtable, ctx->my_name ); + } + + free( fport ); // parinoid free and return + return NULL; + } + + if( DEBUG ) fprintf( stderr, "[DBUG] rtc thread is running and listening; listening for rtg conns on %s\n", port ); + free( fport ); + + // future: if we need to register with the rtg, then build a message and send it through a wormhole here + + bump_freq = time( NULL ) + 300; // after 5 minutes we decrease the count frequency + blabber = 0; + while( 1 ) { // until the cows return, pigs fly, or somesuch event likely not to happen + while( msg == NULL || msg->len <= 0 ) { // until we actually have something from the other side + msg = rmr_torcv_msg( pvt_cx, msg, 1000 ); + + if( time( NULL ) > blabber ) { + vlevel = refresh_vlevel( vfd ); + if( vlevel >= 0 ) { // allow it to be forced off with -n in verbose file + blabber = time( NULL ) + count_delay; // set next time to blabber, then do so + if( blabber > bump_freq ) { + count_delay = 300; + } + rt_epcounts( ctx->rtable, ctx->my_name ); + } + } + } + + vlevel = refresh_vlevel( vfd ); // ensure it's fresh when we get a message + + if( msg != NULL && msg->len > 0 ) { + payload = msg->payload; + mlen = msg->len; // usable bytes in the payload + if( vlevel > 1 ) { + fprintf( stderr, "[DBUG] rmr_rtc: received rt message; %d bytes (%s)\n", (int) mlen, msg->payload ); + } else { + if( DEBUG > 1 || (vlevel > 0) ) fprintf( stderr, "[DBUG] rmr_rtc: received rt message; %d bytes\n", (int) mlen ); + } + + if( pbuf_size <= mlen ) { + if( pbuf ) { + free( pbuf ); + } + if( mlen < 512 ) { + pbuf_size = 512; + } else { + pbuf_size = mlen * 2; + } + pbuf = (char *) malloc( sizeof( char ) * pbuf_size ); + } + memcpy( pbuf, payload, mlen ); + pbuf[mlen] = 0; // don't depend on sender making this a legit string + + curr = pbuf; + while( curr ) { // loop over each record in the buffer + nextr = strchr( curr, '\n' ); // allow multiple newline records, find end of current and mark + + if( nextr ) { + *(nextr++) = 0; + } + + if( vlevel > 1 ) { + fprintf( stderr, "[DBUG] rmr_rtc: processing (%s)\n", curr ); + } + parse_rt_rec( ctx, curr, vlevel ); // parse record and add to in progress table + + curr = nextr; + } + + if( ctx->shutdown ) { // mostly for testing, but allows user app to close us down if rmr_*() function sets this + break; + } + + msg->len = 0; // force back into the listen loop + } + } + + return NULL; // unreachable, but some compilers don't see that and complain. +} + + #endif diff --git a/src/rmr/si/src/si95/siwait.c b/src/rmr/si/src/si95/siwait.c index b5df75f..d45efa6 100644 --- a/src/rmr/si/src/si95/siwait.c +++ b/src/rmr/si/src/si95/siwait.c @@ -113,6 +113,7 @@ extern int SIwait( struct ginfo_blk *gptr ) { status = SInewsession( gptr, tpptr ); // accept connection } else { // data received on a regular port (we support just tcp now status = RECV( fd, gptr->rbuf, MAX_RBUF, 0 ); // read data + //fprintf( stderr, ">>>>> wait popped status =%d\n", status ); if( status > 0 && ! (tpptr->flags & TPF_DRAIN) ) { if( (cbptr = gptr->cbtab[SI_CB_CDATA].cbrtn) != NULL ) { status = (*cbptr)( gptr->cbtab[SI_CB_CDATA].cbdata, fd, gptr->rbuf, status ); diff --git a/src/rmr/si/src/sr_si_static.c b/src/rmr/si/src/sr_si_static.c index 5c751d7..703e80c 100644 --- a/src/rmr/si/src/sr_si_static.c +++ b/src/rmr/si/src/sr_si_static.c @@ -116,8 +116,6 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s uta_mhdr_t* hdr; // convenience pointer int tr_len; // trace data len (default or override) int* alen; // convenience pointer to set allocated len - //int tpb_len; // transport buffer total len -static int logged = 0; tr_len = trlo > 0 ? trlo : ctx->trace_data_len; @@ -243,11 +241,7 @@ static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen ) { int ver; int hlen; // header len to use for a truncation check - msg->header = ((char *) msg->tp_buf) + TP_HDR_LEN; // FIX ME: hard 50 needs to be some kind of tp header struct - - // do NOT reduce alen any more. alen must be TP_HEADER + RMR_HEADER + user space - // get payload size will do the right thing and subtract TP_HEADER and RMR_HEADER lengths - //alen -= 50; // actual length of "rmr space" + msg->header = ((char *) msg->tp_buf) + TP_HDR_LEN; v1hdr = (uta_v1mhdr_t *) msg->header; // v1 will always allow us to suss out the version diff --git a/test/app_test/mt_listener.c b/test/app_test/mt_listener.c new file mode 100644 index 0000000..7800722 --- /dev/null +++ b/test/app_test/mt_listener.c @@ -0,0 +1,305 @@ +// :vim ts=4 sw=4 noet: +/* +================================================================================== + Copyright (c) 2019 Nokia + Copyright (c) 2018-2019 AT&T Intellectual Property. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +================================================================================== +*/ + +/* + Mnemonic: mt_listener.c + Abstract: This simple application runs multiple "listener" threads. Each thread + receives from a single RMR context to validate the ability spin + several listening threads in an application. + + Message format is: + ck1 ck2| @ tid + + Ck1 is the simple check sum of the msg-text (NOT includeing ) + Ck2 is the simple check sum of the trace data which is a nil terminated + series of bytes. + tid is the thread id assigned by the main thread. + + Parms: argv[1] == number of msgs to send (10) + argv[2] == delay (mu-seconds, 1000000 default) + argv[3] == number of threads (3) + argv[4] == listen port + + Sender will send for at most 20 seconds, so if nmsgs and delay extend + beyond that period the total number of messages sent will be less + than n. + + Date: 18 April 2019 + Author: E. Scott Daniels +*/ + +#include +#include +#include +#include +#include +#include +#include +#include + + +#include +#include "time_tools.c" // our time based test tools + +#define TRACE_SIZE 40 // bytes in header to provide for trace junk +#define WBUF_SIZE 2048 + +/* + Thread data +*/ +typedef struct tdata { + int id; // the id we'll pass to RMr mt-call function NOT the thread id + int n2get; // number of messages to expect + int delay; // max delay waiting for n2get messages + void* mrc; // RMr context + int state; +} tdata_t; + + + +// -------------------------------------------------------------------------------- + + +static int sum( char* str ) { + int sum = 0; + int i = 0; + + while( *str ) { + sum += *(str++) + i++; + } + + return sum % 255; +} + +/* + Split the message at the first sep and return a pointer to the first + character after. +*/ +static char* split( char* str, char sep ) { + char* s; + + s = strchr( str, sep ); + + if( s ) { + return s+1; + } + + //fprintf( stderr, " no pipe in message: (%s)\n", str ); + return NULL; +} + +/* + Executed as a thread, this puppy will listen for messages and report + what it receives. +*/ +static void* mk_calls( void* data ) { + tdata_t* control; + rmr_mbuf_t* msg = NULL; // message + int* count_bins = NULL; + char* wbuf = NULL; + char buf2[128]; + int i; + int state = 0; + char* msg_data; // bits after checksum info in payload + long good = 0; // counters + long bad = 0; + long bad_tr = 0; + long count = 0; // total msgs received + struct timespec start_ts; + struct timespec end_ts; + int elap; // elapsed time to receive messages + time_t timeout; + + count_bins = (int *) malloc( sizeof( int ) * 11 ); + + wbuf = (char *) malloc( sizeof( char ) * WBUF_SIZE ); + + if( (control = (tdata_t *) data) == NULL ) { + fprintf( stderr, "thread data was nil; bailing out\n" ); + } + fprintf( stderr, " id=%d thread receiver started expecting=%d messages timeout=%d seconds\n", + control->id, control->n2get, control->delay ); + + timeout = time( NULL ) + control->delay; // max time to wait for a good message + while( count < control->n2get ) { // wait for n messages -- no timeout + msg = rmr_torcv_msg( control->mrc, msg, 1000 ); // pop after ~1 second + + if( msg ) { + //fprintf( stderr, " id=%d got type=%d state=%s msg=(%s)\n", control->id, msg->mtype, msg->state == RMR_OK ? "OK" : "timeout", msg->payload ); + if( msg->state == RMR_OK ) { + if( good == 0 ) { // mark time of first good message + set_time( &start_ts ); + } + set_time( &end_ts ); // mark the time of last good message + + if( (msg_data = split( msg->payload, '|' )) != NULL ) { + if( sum( msg_data ) == atoi( (char *) msg->payload ) ) { + good++; + } else { + fprintf( stderr, " chk sum bad: computed=%d expected;%d (%s)\n", sum( msg_data ), + atoi( msg->payload ), msg_data ); + bad++; + } + + if( (msg_data = split( msg->payload, ' ' )) != NULL ) { // data will point to the chksum for the trace data + state = rmr_get_trace( msg, wbuf, 1024 ); // should only copy upto the trace size; we'll check that + if( state > 128 || state < 0 ) { + fprintf( stderr, "trace data size listed unexpectedly long: %d\n", state ); + } else { + if( state && sum( wbuf ) != atoi( msg_data ) ) { + fprintf( stderr, " trace chk sum bad: computed=%d expected;%d len=%d (%s)\n", sum( wbuf ), + atoi( msg_data ), state, wbuf ); + bad_tr++; + } + } + } + } else { + good++; // nothing to check, assume good + } + count++; + + if( msg->mtype >= 0 && msg->mtype <= 10 ) { + count_bins[msg->mtype]++; + } + } + } else { + fprintf( stderr, " id=%d timeout with nil msg\n", control->id ); + } + + if( time( NULL ) > timeout ) { + fprintf( stderr, " id=%d timeout before receiving %d messages\n", control->id, control->n2get ); + break; + } + } + elap = elapsed( &start_ts, &end_ts, ELAP_MS ); + if( elap > 0 ) { + fprintf( stderr, " id=%d received %ld messages in %d ms rate = %ld msg/sec\n", control->id, count, elap, (count/elap)*1000 ); + } else { + fprintf( stderr, " id=%d runtime too short to compute received rate\n", control->id ); + } + + snprintf( wbuf, WBUF_SIZE, " id=%d histogram: ", control->id ); // build histogram so we can write with one fprintf call + for( i = 0; i < 11; i++ ) { + snprintf( buf2, sizeof( buf2 ), "%5d ", count_bins[i] ); + strcat( wbuf, buf2 ); + } + fprintf( stderr, "%s\n", wbuf ); + + fprintf( stderr, " id=%d %ld messages %ld good %ld bad\n", control->id, count, good, bad ); + + control->state = bad > 0 ? -1 : 0; // set to indicate done and <0 to indicate some failure + control->state += count < control->n2get ? -2 : 0; + return NULL; +} + +int main( int argc, char** argv ) { + void* mrc; // msg router context + rmr_mbuf_t* rbuf = NULL; // received on 'normal' flow + struct epoll_event events[1]; // list of events to give to epoll + struct epoll_event epe; // event definition for event to listen to + int ep_fd = -1; // epoll's file des (given to epoll_wait) + char* listen_port = "43086"; + long timeout = 0; // time the main thread will pop if listeners have not returned + int delay = 30; // max time to wait for n messages + int nmsgs = 10; // number of messages to expect + int nthreads = 3; // default number of listener threads + tdata_t* cvs; // vector of control blocks + int i; + pthread_t* pt_info; // thread stuff + int failures = 0; + + if( argc > 1 ) { + nmsgs = atoi( argv[1] ); + } + if( argc > 2 ) { + delay = atoi( argv[2] ); + } + if( argc > 3 ) { + nthreads = atoi( argv[3] ); + } + if( argc > 4 ) { + listen_port = argv[4]; + } + + fprintf( stderr, " listen port: %s; sending %d messages; delay=%d\n", listen_port, nmsgs, delay ); + + if( (mrc = rmr_init( listen_port, 1400, 0 )) == NULL ) { + fprintf( stderr, " unable to initialise RMr\n" ); + exit( 1 ); + } + + rmr_init_trace( mrc, TRACE_SIZE ); + + cvs = malloc( sizeof( tdata_t ) * nthreads ); + pt_info = malloc( sizeof( pthread_t ) * nthreads ); + if( cvs == NULL ) { + fprintf( stderr, " unable to allocate control vector\n" ); + exit( 1 ); + } + + timeout = time( NULL ) + 20; // give rmr 20s to find the route table (shouldn't need that much) + while( ! rmr_ready( mrc ) ) { // must have a route table before we can send; wait til RMr says it has one + fprintf( stderr, " waiting for rmr to show ready\n" ); + sleep( 1 ); + + if( time( NULL ) > timeout ) { + fprintf( stderr, " giving up\n" ); + exit( 1 ); + } + } + fprintf( stderr, " rmr is ready; starting threads\n" ); + + for( i = 0; i < nthreads; i++ ) { + cvs[i].mrc = mrc; + cvs[i].id = i + 2; // we pass this as the call-id to rmr, so must be >1 + cvs[i].delay = delay; + cvs[i].n2get = nmsgs; + cvs[i].state = 1; + + fprintf( stderr, "kicking %d i=%d\n", i+2, i ); + pthread_create( &pt_info[i], NULL, mk_calls, &cvs[i] ); // kick a thread + } + + timeout = time( NULL ) + 300; // wait up to 5 minutes + i = 0; + while( nthreads > 0 ) { + if( cvs[i].state < 1 ) { // states 0 or below indicate done. 0 == good; <0 is failure + nthreads--; + if( cvs[i].state < 0 ) { + failures++; + } + i++; + } + + if( time( NULL ) > timeout ) { + failures += nthreads; + fprintf( stderr, " timeout waiting for threads to finish; %d were not finished\n", nthreads ); + break; + } + + sleep( 1 ); + } + + fprintf( stderr, " [%s] failing threads=%d\n", failures == 0 ? "PASS" : "FAIL", failures ); + rmr_close( mrc ); + + return failures > 0; +} + -- 2.16.6