Add dynamic route table update to SI95 support 78/2278/2 3.0.1
authorE. Scott Daniels <daniels@research.att.com>
Mon, 20 Jan 2020 15:31:44 +0000 (10:31 -0500)
committerE. Scott Daniels <daniels@research.att.com>
Mon, 20 Jan 2020 20:58:06 +0000 (15:58 -0500)
The initial SI95 support did not include dynamic route table
update capabilities. The library will now support the receipt
of route table generator messages via an RMR session over the
RTG service port.

Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Change-Id: I02e9f72e049152abd9ef516cccb7705eca4aa7bd

CHANGES
CMakeLists.txt
src/rmr/si/src/mt_call_si_static.c
src/rmr/si/src/rmr_si.c
src/rmr/si/src/rtc_si_static.c
src/rmr/si/src/si95/siwait.c
src/rmr/si/src/sr_si_static.c
test/app_test/mt_listener.c [new file with mode: 0644]

diff --git a/CHANGES b/CHANGES
index 1a46b2a..3ae419a 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -2,6 +2,9 @@
 API and build change  and fix summaries. Doc correctsions
 and/or changes are not mentioned here; see the commit messages.
 
+2020 January 20; verison 3.0.1
+       Enable support for dynamic route table updates via RMR session.
+
 2020 January 16; version 3.0.0
        Introduce support for SI95 transport library to replace NNG.
        (RMR library versions will use leading odd numbers to avoid tag collisions
index 2cff539..d44827a 100644 (file)
@@ -38,7 +38,7 @@ cmake_minimum_required( VERSION 3.5 )
 
 set( major_version "3" )               # should be automatically populated from git tag later, but until CI process sets a tag we use this
 set( minor_version "0" )
-set( patch_level "0" )
+set( patch_level "1" )
 
 set( install_root "${CMAKE_INSTALL_PREFIX}" )
 set( install_inc "include/rmr" )
index 6e2e8aa..fa603ff 100644 (file)
@@ -54,8 +54,6 @@ static inline void queue_normal( uta_ctx_t* ctx, rmr_mbuf_t* mbuf ) {
        call ref to point to all of the various bits and set real len etc,
        then we queue it.  Raw_msg is expected to include the transport goo
        placed in front of the RMR header and payload.
-
-       done -- FIX ME?? can we eliminate the buffer copy here?
 */
 static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd ) {
        rmr_mbuf_t*             mbuf;
@@ -70,18 +68,10 @@ static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd
                }
        }
 
-/*
-       if( (mbuf = (rmr_mbuf_t *) malloc( sizeof( *mbuf ))) != NULL ) {                // alloc mbuf and point at various bits of payload
-               memset( mbuf, 0, sizeof( *mbuf ) );
-               mbuf->tp_buf = raw_msg;
-               mbuf->ring = ctx->zcb_mring;
-*/
        if( (mbuf = alloc_mbuf( ctx, RMR_ERR_UNSET )) != NULL ) {
                mbuf->tp_buf = raw_msg;
                mbuf->rts_fd = sender_fd;
 
-               // eliminated :)   memcpy( mbuf->tp_buf, river->accum + offset, river->msg_size );
-
                ref_tpbuf( mbuf, msg_size );                            // point mbuf at bits in the datagram
                hdr = mbuf->header;                                                     // convenience
                if( hdr->flags & HFL_CALL_MSG ) {                       // call generated message; ignore call-id etc and queue
@@ -124,18 +114,17 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
        int                             need;                   // bytes needed for something
        int                             i;
 
-       // for speed these checks should be enabled only in debug mode and assume we always get a good context
-       if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
-               return SI_RET_OK;
-       }
-
-       if( fd >= ctx->nrivers || fd < 0 ) {
-               if( DEBUG ) fprintf( stderr, "[DBUG] callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
-               return SI_RET_OK;
+       if( PARINOID_CHECKS ) {                                                                 // PARINOID mode is slower; off by default
+               if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+                       return SI_RET_OK;
+               }
+       
+               if( fd >= ctx->nrivers || fd < 0 ) {
+                       if( DEBUG ) fprintf( stderr, "[DBUG] callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
+                       return SI_RET_OK;
+               }
        }
 
-       // -------- end debug checks -----------------
-
        if( buflen <= 0 ) {
                return SI_RET_OK;
        }
@@ -159,7 +148,7 @@ static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
 /*
 fprintf( stderr, "\n>>>>> data callback for %d bytes from %d\n", buflen, fd );
 for( i = 0; i < 40; i++ ) {
-fprintf( stderr, "%02x ", (unsigned char) *(buf+i) );
+       fprintf( stderr, "%02x ", (unsigned char) *(buf+i) );
 }
 fprintf( stderr, "\n" );
 */
@@ -226,7 +215,7 @@ fprintf( stderr, "\n" );
        This is expected to execute in a separate thread. It is responsible for
        _all_ receives and queues them on the appropriate ring, or chute.
        It does this by registering the callback function above with the SI world
-       and then caling SIwait() to drive the callback when data has arrived.
+       and then calling SIwait() to drive the callback when data has arrived.
 
 
        The "state" of the message is checked which determines where the message
index b1bfd1f..8e499c1 100644 (file)
@@ -65,8 +65,7 @@
 #include "ring_static.c"                       // message ring support
 #include "rt_generic_static.c"         // route table things not transport specific
 #include "rtable_si_static.c"          // route table things -- transport specific
-#include "rtc_static.c"                                // route table collector
-#include "rtc_si_static.c"                     // our private test function
+#include "rtc_si_static.c"                     // specific RMR only route table collector (SI only for now)
 #include "tools_static.c"
 #include "sr_si_static.c"                      // send/receive static functions
 #include "wormholes.c"                         // wormhole api externals and related static functions (must be LAST!)
@@ -535,11 +534,12 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
        char    wbuf[1024];                                     // work buffer
        char*   tok;                                            // pointer at token in a buffer
        char*   tok2;
+       int             static_rtc = 0;                         // if rtg env var is < 1, then we set and don't listen on a port
        int             state;
        int             i;
 
        if( ! announced ) {
-               fprintf( stderr, "[INFO] ric message routing library on SI95 mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
+               fprintf( stderr, "[INFO] ric message routing library on SI95/b mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
                        RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
                announced = 1;
        }
@@ -600,6 +600,12 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
                port = proto_port;                      // assume something like "1234" was passed
        }
 
+       if( (tok = getenv( "ENV_RTG_PORT" )) != NULL ) {                                // must check port here -- if < 1 then we just start static file 'listener'
+               if( atoi( tok ) < 1 ) {
+                       static_rtc = 1;
+               }
+       }
+
        if( (tok = getenv( ENV_SRC_ID )) != NULL ) {                                                    // env var overrides what we dig from system
                tok = strdup( tok );                                    // something we can destroy
                if( *tok == '[' ) {                                             // we allow an ipv6 address here
@@ -665,13 +671,18 @@ static void* init(  char* uproto_port, int max_msg_size, int flags ) {
                return NULL;
        }
 
-       if( !(flags & FL_NOTHREAD) ) {                                                                                          // skip if internal function that doesnt need an rtc
-               if( pthread_create( &ctx->rtc_th,  NULL, rtc_file, (void *) ctx ) ) {   // kick the rt collector thread
-                       fprintf( stderr, "[WRN] rmr_init: unable to start route table collector thread: %s", strerror( errno ) );
+       if( !(flags & FL_NOTHREAD) ) {                                                                                          // skip if internal function that doesnt need a RTC
+               if( static_rtc ) {
+                       if( pthread_create( &ctx->rtc_th,  NULL, rtc_file, (void *) ctx ) ) {   // kick the rt collector thread as just file reader
+                               fprintf( stderr, "[WRN] rmr_init: unable to start static route table collector thread: %s", strerror( errno ) );
+                       }
+               } else {
+                       if( pthread_create( &ctx->rtc_th,  NULL, rtc, (void *) ctx ) ) {        // kick the real rt collector thread
+                               fprintf( stderr, "[WRN] rmr_init: unable to start dynamic route table collector thread: %s", strerror( errno ) );
+                       }
                }
        }
 
-       //fprintf( stderr, ">>>>> starting threaded receiver with ctx=%p si_ctx=%p\n", ctx, ctx->si_ctx );
        ctx->flags |= CFL_MTC_ENABLED;                                                                                          // for SI threaded receiver is the only way
        if( pthread_create( &ctx->mtc_th,  NULL, mt_receive, (void *) ctx ) ) {         // so kick it
                fprintf( stderr, "[WRN] rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
index 76fea79..193672f 100644 (file)
 
 /*
        Mnemonic:       rtc_si_static.c
-       Abstract:       This is a test module to allow the route table to be read
-                               from a static spot and NOT to attempt to listen for updates
-                               from some outside source.
+       Abstract:       The route table collector is started as a separate pthread and
+                               is responsible for listening for route table updates from a
+                               route manager or route table generator process.
+
+                               This comes from the common src and may be moved back there once
+                               it is not necessary to support raw sessions (all route table
+                               gen messages are received over rmr channel).
 
        Author:         E. Scott Daniels
-       Date:           18 October 2019
+       Date:           29 November 2018 (extracted to common 13 March 2019)
+                               Imported to si base 17 Jan 2020.
 */
 
+
 #ifndef _rtc_si_staic_c
 #define _rtc_si_staic_c
 
@@ -36,7 +42,6 @@
 #include <netdb.h>
 #include <errno.h>
 #include <string.h>
-#include <errno.h>
 #include <fcntl.h>
 #include <sys/types.h>
 #include <sys/stat.h>
@@ -69,12 +74,223 @@ static void* rtc_file( void* vctx ) {
                        lseek( vfd, 0, 0 );
                        read( vfd, wbuf, 10 );
                        vlevel = atoi( wbuf );
-               }                
-       
+               }
+
                read_static_rt( ctx, vlevel );                                          // seed the route table if one provided
 
                sleep( 60 );
        }
 
 }
+
+static int refresh_vlevel( int vfd ) {
+       int vlevel = 0;
+       char    rbuf[128];
+
+       if( vfd >= 0 ) {                                        // if file is open, read current value
+               rbuf[0] = 0;
+               lseek( vfd, 0, 0 );
+               read( vfd, rbuf, 10 );
+               vlevel = atoi( rbuf );
+       }
+
+       return vlevel;
+}
+
+/*
+       Route Table Collector
+       A side thread which opens a socket and subscribes to a routing table generator.
+       It may do other things along the way (latency measurements?).
+
+       The pointer is a pointer to the context.
+
+       Listens for records from the route table generation publisher, expecting
+       one of the following, newline terminated, ASCII records:
+               rte|msg-type||]name:port,name:port,...;name:port,...                    // route table entry with one or more groups of endpoints
+               new|start                                                               // start of new table
+               new|end                                                                 // end of new table; complete
+
+               Name must be a host name which can be looked up via gethostbyname() (DNS).
+
+               Multiple endpoints (name:port) may be given separated by a comma; an endpoint is selected using round robin
+                       for each message of the type that is sent.
+
+               Multiple endpoint groups can be given as a comma separated list of endpoints, separated by semicolons:
+                               group1n1:port,group1n2:port,group1n3:port;group2n1:port,group2n2:port
+
+               If multiple groups are given, when send() is called for the cooresponding message type,
+               the message will be sent to one endpoint in each group.
+
+               msg-type is the numeric message type (e.g. 102). If it is given as n,name then it is assumed
+               that the entry applies only to the instance running with the hostname 'name.'
+
+       Buffers received from the route table generator can contain multiple newline terminated
+       records, but each buffer must be less than 4K in length, and the last record in a
+       buffer may NOT be split across buffers.
+
+       Other chores:
+       In addition to the primary task of getting, vetting, and installing a new route table, or
+       updates to the existing table, this thread will periodically cause the send counts for each
+       endpoint known to be written to standard error. The frequency is once every 180 seconds, and
+       more frequently if verbose mode (see ENV_VERBOSE_FILE) is > 0.
+*/
+static void* rtc( void* vctx ) {
+       uta_ctx_t*      ctx;                                    // context user has -- where we pin the route table
+       uta_ctx_t*      pvt_cx;                                 // private context for session with rtg
+       rmr_mbuf_t*     msg = NULL;                             // message from rtg
+       char*   payload;                                        // payload in the message
+       size_t  mlen;
+       size_t  clen;                                           // length to copy and mark
+       char*   port;                                           // a port number we listen/connect to
+       char*   fport;                                          // pointer to the real buffer to free
+       size_t  buf_size;                                       // nng needs var pointer not just size?
+       char*   nextr;                                          // pointer at next record in the message
+       char*   curr;                                           // current record
+       int     i;
+       long    blabber = 0;                            // time of last blabber so we don't flood if rtg goes bad
+       int             cstate = -1;                            // connection state to rtg
+       int             state;                                          // processing state of some nng function
+       char*   tokens[128];
+       char    wbuf[128];
+       char*   pbuf = NULL;
+       int             pbuf_size = 0;                          // number allocated in pbuf
+       int             ntoks;
+       int             raw_interface = 0;                      // rtg is using raw NNG/Nano not RMr to send updates
+       int             vfd = -1;                                       // verbose file des if we have one
+       int             vlevel = 0;                                     // how chatty we should be 0== no nattering allowed
+       char*   eptr;
+       int             epfd = -1;                                      // fd for epoll so we can multi-task
+       struct  epoll_event events[1];          // list of events to give to epoll; we only have one we care about
+       struct  epoll_event epe;                        // event definition for event to listen to
+       int             count_delay = 30;                       // number of seconds between writing count info; initially every 30s
+       int             bump_freq = 0;                          // time at which we will bump count frequency to every 5 minutes
+
+
+       if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+               fprintf( stderr, "[CRI] rmr_rtc: internal mishap: context passed in was nil\n" );
+               return NULL;
+       }
+
+       if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
+               vfd = open( eptr, O_RDONLY );
+               vlevel = refresh_vlevel( vfd );
+       }
+
+       read_static_rt( ctx, vlevel );                                          // seed the route table if one provided
+
+       if( (port = getenv( ENV_RTG_PORT )) == NULL || ! *port ) {              // port we need to open to listen for RTG connections
+               port = strdup( DEF_RTG_PORT );
+       } else {
+               port = strdup( port );
+       }
+
+       if( (curr = getenv( ENV_RTG_RAW )) != NULL ) {
+               raw_interface = atoi( curr ) > 0;                               // if > 0 we assume that rtg messages are NOT coming from an RMr based process
+       }
+
+       fport = port;           // must hold to free
+
+       ntoks = uta_tokenise( port, tokens, 120, ':' );                 // assume tcp:port, but it could be port or old style host:port
+       switch( ntoks ) {
+               case 1:
+                               port = tokens[0];                       // just the port
+                               break;
+
+               case 2:
+                               port = tokens[1];                       // tcp:port or :port
+                               break;
+
+               default:
+                               port = DEF_RTG_PORT;            // this shouldn't happen, but parnioia is good
+                               break;
+       }
+
+       if( (pvt_cx = init( port, MAX_RTG_MSG_SZ, FL_NOTHREAD )) == NULL ) {                            // open a private context (no RT listener!)
+               fprintf( stderr, "[CRI] rmr_rtc: unable to initialise listen port for RTG (pvt_cx)\n" );
+
+               while( TRUE ) {                                                                                         // no listen port, just dump counts now and then
+                       sleep( count_delay );
+                       rt_epcounts( ctx->rtable, ctx->my_name );
+               }
+
+               free( fport );                                  // parinoid free and return
+               return NULL;
+       }
+
+       if( DEBUG ) fprintf( stderr, "[DBUG] rtc thread is running and listening; listening for rtg conns on %s\n", port );
+       free( fport );
+
+       // future:  if we need to register with the rtg, then build a message and send it through a wormhole here
+
+       bump_freq = time( NULL ) + 300;                         // after 5 minutes we decrease the count frequency
+       blabber = 0;
+       while( 1 ) {                    // until the cows return, pigs fly, or somesuch event likely not to happen
+               while( msg == NULL || msg->len <= 0 ) {                                                 // until we actually have something from the other side
+                       msg = rmr_torcv_msg( pvt_cx, msg, 1000 );
+
+                       if( time( NULL ) > blabber  ) {
+                               vlevel = refresh_vlevel( vfd );
+                               if( vlevel >= 0 ) {                                                                             // allow it to be forced off with -n in verbose file
+                                       blabber = time( NULL ) + count_delay;                           // set next time to blabber, then do so
+                                       if( blabber > bump_freq ) {
+                                               count_delay = 300;
+                                       }
+                                       rt_epcounts( ctx->rtable, ctx->my_name );
+                               }
+                       }
+               }
+
+               vlevel = refresh_vlevel( vfd );                 // ensure it's fresh when we get a message
+
+               if( msg != NULL && msg->len > 0 ) {
+                       payload = msg->payload;
+                       mlen = msg->len;                                        // usable bytes in the payload
+                       if( vlevel > 1 ) {
+                               fprintf( stderr, "[DBUG] rmr_rtc: received rt message; %d bytes (%s)\n", (int) mlen, msg->payload );
+                       } else {
+                               if( DEBUG > 1 || (vlevel > 0) ) fprintf( stderr, "[DBUG] rmr_rtc: received rt message; %d bytes\n", (int) mlen );
+                       }
+
+                       if( pbuf_size <= mlen ) {
+                               if( pbuf ) {
+                                       free( pbuf );
+                               }
+                               if( mlen < 512 ) {
+                                       pbuf_size = 512;
+                               } else {
+                                       pbuf_size = mlen * 2;
+                               }
+                               pbuf = (char *) malloc( sizeof( char ) * pbuf_size );
+                       }
+                       memcpy( pbuf, payload, mlen );
+                       pbuf[mlen] = 0;                                                                         // don't depend on sender making this a legit string
+
+                       curr = pbuf;
+                       while( curr ) {                                                         // loop over each record in the buffer
+                               nextr = strchr( curr, '\n' );                   // allow multiple newline records, find end of current and mark
+
+                               if( nextr ) {
+                                       *(nextr++) = 0;
+                               }
+
+                               if( vlevel > 1 ) {
+                                       fprintf( stderr, "[DBUG] rmr_rtc: processing (%s)\n", curr );
+                               }
+                               parse_rt_rec( ctx, curr, vlevel );              // parse record and add to in progress table
+
+                               curr = nextr;
+                       }
+
+                       if( ctx->shutdown ) {           // mostly for testing, but allows user app to close us down if rmr_*() function sets this
+                               break;
+                       }
+
+                       msg->len = 0;                           // force back into the listen loop
+               }
+       }
+
+       return NULL;    // unreachable, but some compilers don't see that and complain.
+}
+
+
 #endif
index b5df75f..d45efa6 100644 (file)
@@ -113,6 +113,7 @@ extern int SIwait( struct ginfo_blk *gptr ) {
                                                                status = SInewsession( gptr, tpptr );                   // accept connection
                                                        } else  {                                                                                       //  data received on a regular port (we support just tcp now
                                                                status = RECV( fd, gptr->rbuf, MAX_RBUF, 0 );   //  read data 
+                                                               //fprintf( stderr, ">>>>> wait popped status =%d\n", status );
                                                                if( status > 0  &&  ! (tpptr->flags & TPF_DRAIN) ) {
                                                                        if( (cbptr = gptr->cbtab[SI_CB_CDATA].cbrtn) != NULL ) {
                                                                                status = (*cbptr)( gptr->cbtab[SI_CB_CDATA].cbdata, fd, gptr->rbuf, status );
index 5c751d7..703e80c 100644 (file)
@@ -116,8 +116,6 @@ static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int s
        uta_mhdr_t*     hdr;                    // convenience pointer
        int                     tr_len;                 // trace data len (default or override)
        int*            alen;                   // convenience pointer to set allocated len
-       //int                   tpb_len;                // transport buffer total len
-static int logged = 0;
 
        tr_len = trlo > 0 ? trlo : ctx->trace_data_len;
 
@@ -243,11 +241,7 @@ static void ref_tpbuf( rmr_mbuf_t* msg, size_t alen )  {
        int ver;
        int     hlen;                                           // header len to use for a truncation check
 
-       msg->header = ((char *) msg->tp_buf) + TP_HDR_LEN;                              // FIX ME:  hard 50 needs to be some kind of tp header struct
-
-       // do NOT reduce alen any more.  alen must be TP_HEADER + RMR_HEADER + user space
-       // get payload size will do the right thing and subtract TP_HEADER and RMR_HEADER lengths
-       //alen -= 50;                                           // actual length of "rmr space" 
+       msg->header = ((char *) msg->tp_buf) + TP_HDR_LEN;
 
        v1hdr = (uta_v1mhdr_t *) msg->header;                                   // v1 will always allow us to suss out the version
 
diff --git a/test/app_test/mt_listener.c b/test/app_test/mt_listener.c
new file mode 100644 (file)
index 0000000..7800722
--- /dev/null
@@ -0,0 +1,305 @@
+// :vim ts=4 sw=4 noet:
+/*
+==================================================================================
+       Copyright (c) 2019 Nokia
+       Copyright (c) 2018-2019 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+          http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+/*
+       Mnemonic:       mt_listener.c
+       Abstract:       This simple application runs multiple "listener" threads. Each thread
+                               receives from a single RMR context to validate the ability spin
+                               several listening threads in an application.
+
+                               Message format is:
+                                       ck1 ck2|<msg-txt> @ tid<nil>
+
+                               Ck1 is the simple check sum of the msg-text (NOT includeing <nil>)
+                               Ck2 is the simple check sum of the trace data which is a nil terminated
+                               series of bytes.
+                               tid is the thread id assigned by the main thread.
+
+                               Parms:  argv[1] == number of msgs to send (10)
+                                               argv[2] == delay                (mu-seconds, 1000000 default)
+                                               argv[3] == number of threads (3)
+                                               argv[4] == listen port
+
+                               Sender will send for at most 20 seconds, so if nmsgs and delay extend
+                               beyond that period the total number of messages sent will be less
+                               than n.
+
+       Date:           18 April 2019
+       Author:         E. Scott Daniels
+*/
+
+#include <unistd.h>
+#include <errno.h>
+#include <string.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <sys/epoll.h>
+#include <time.h>
+#include <pthread.h>
+
+
+#include <rmr/rmr.h>
+#include "time_tools.c"                // our time based test tools
+
+#define TRACE_SIZE 40          // bytes in header to provide for trace junk
+#define WBUF_SIZE 2048
+
+/*
+       Thread data
+*/
+typedef struct tdata {
+       int     id;                                     // the id we'll pass to RMr mt-call function NOT the thread id
+       int n2get;                              // number of messages to expect
+       int delay;                              // max delay waiting for n2get messages
+       void* mrc;                              // RMr context
+       int     state;
+} tdata_t;
+
+
+
+// --------------------------------------------------------------------------------
+
+
+static int sum( char* str ) {
+       int sum = 0;
+       int i = 0;
+
+       while( *str ) {
+               sum += *(str++) + i++;
+       }
+
+       return sum % 255;
+}
+
+/*
+       Split the message at the first sep and return a pointer to the first
+       character after.
+*/
+static char* split( char* str, char sep ) {
+       char*   s;
+
+       s = strchr( str, sep );
+
+       if( s ) {
+               return s+1;
+       }
+
+       //fprintf( stderr, "<RCVR> no pipe in message: (%s)\n", str );
+       return NULL;
+}
+
+/*
+       Executed as a thread, this puppy will listen for messages and report
+       what it receives.
+*/
+static void* mk_calls( void* data ) {
+       tdata_t*        control;
+       rmr_mbuf_t*             msg = NULL;                                     // message
+       int*    count_bins = NULL;
+       char*   wbuf = NULL;
+       char    buf2[128];
+       int             i;
+       int             state = 0;
+       char*   msg_data;                                               // bits after checksum info in payload
+       long    good = 0;                                               // counters
+       long    bad = 0;
+       long    bad_tr = 0;
+       long    count = 0;                                              // total msgs received
+       struct timespec start_ts;
+       struct timespec end_ts;
+       int             elap;                                                   // elapsed time to receive messages
+       time_t  timeout;
+
+       count_bins = (int *) malloc( sizeof( int ) * 11 );
+
+       wbuf = (char *) malloc( sizeof( char ) * WBUF_SIZE );
+
+       if( (control  = (tdata_t *) data) == NULL ) {
+               fprintf( stderr, "thread data was nil; bailing out\n" );
+       }
+       fprintf( stderr, "<THRD> id=%d thread receiver started expecting=%d messages timeout=%d seconds\n", 
+               control->id, control->n2get, control->delay );
+
+       timeout = time( NULL ) + control->delay;                        // max time to wait for a good message
+       while( count < control->n2get ) {               // wait for n messages -- no timeout
+               msg = rmr_torcv_msg( control->mrc, msg, 1000 );                         //  pop after ~1 second
+
+               if( msg ) {
+                       //fprintf( stderr, "<THRD> id=%d got type=%d state=%s msg=(%s)\n", control->id, msg->mtype, msg->state == RMR_OK ? "OK" : "timeout", msg->payload );
+                       if( msg->state == RMR_OK ) {
+                               if( good == 0 ) {                               // mark time of first good message
+                                       set_time( &start_ts );
+                               }
+                               set_time( &end_ts );            // mark the time of last good message
+
+                               if( (msg_data = split( msg->payload, '|'  )) != NULL ) {
+                                       if( sum( msg_data ) == atoi( (char *) msg->payload ) ) {
+                                               good++;
+                                       } else {
+                                               fprintf( stderr, "<RCVR> chk sum bad: computed=%d expected;%d (%s)\n", sum( msg_data ), 
+                                                       atoi( msg->payload ), msg_data );
+                                               bad++;
+                                       }
+
+                                       if( (msg_data = split( msg->payload, ' ' )) != NULL ) {                 // data will point to the chksum for the trace data
+                                               state = rmr_get_trace( msg, wbuf, 1024 );                               // should only copy upto the trace size; we'll check that
+                                               if( state > 128 || state < 0 ) {
+                                                       fprintf( stderr, "trace data size listed unexpectedly long: %d\n", state );
+                                               } else {
+                                                       if( state  &&  sum( wbuf ) != atoi( msg_data ) ) {
+                                                               fprintf( stderr, "<RCVR> trace chk sum bad: computed=%d expected;%d len=%d (%s)\n", sum( wbuf ), 
+                                                                               atoi( msg_data ), state, wbuf );
+                                                               bad_tr++;
+                                                       }
+                                               }
+                                       }
+                               } else {
+                                       good++;         // nothing to check, assume good
+                               }
+                               count++;
+
+                               if( msg->mtype >= 0 && msg->mtype <= 10 ) {
+                                       count_bins[msg->mtype]++;
+                               }
+                       }
+               } else {
+                       fprintf( stderr, "<THRD> id=%d timeout with nil msg\n", control->id );
+               }
+
+               if( time( NULL ) > timeout ) {
+                       fprintf( stderr, "<THRD> id=%d timeout before receiving %d messages\n", control->id, control->n2get );
+                       break;
+               }
+       }
+       elap = elapsed( &start_ts, &end_ts, ELAP_MS );
+       if( elap > 0 ) {
+               fprintf( stderr, "<THRD> id=%d received %ld messages in %d ms rate = %ld msg/sec\n", control->id, count, elap, (count/elap)*1000 );
+       } else {
+               fprintf( stderr, "<THRD> id=%d runtime too short to compute received rate\n", control->id );
+       }
+
+       snprintf( wbuf, WBUF_SIZE, "<THRD> id=%d histogram: ", control->id );           // build histogram so we can write with one fprintf call
+       for( i = 0; i < 11; i++ ) {
+               snprintf( buf2, sizeof( buf2 ), "%5d ", count_bins[i] );
+               strcat( wbuf, buf2 );
+       }
+       fprintf( stderr, "%s\n", wbuf );
+
+       fprintf( stderr, "<THRD> id=%d %ld messages %ld good %ld bad\n", control->id, count, good, bad );
+
+       control->state = bad > 0 ? -1 : 0;                                              // set to indicate done and <0 to indicate some failure
+       control->state += count < control->n2get ? -2 : 0;
+       return NULL;
+}
+
+int main( int argc, char** argv ) {
+       void* mrc;                                                      // msg router context
+       rmr_mbuf_t*     rbuf = NULL;                            // received on 'normal' flow
+       struct  epoll_event events[1];                  // list of events to give to epoll
+       struct  epoll_event epe;                                // event definition for event to listen to
+       int     ep_fd = -1;                                             // epoll's file des (given to epoll_wait)
+       char*   listen_port = "43086";
+       long    timeout = 0;                                    // time the main thread will pop if listeners have not returned
+       int             delay = 30;                                             // max time to wait for n messages
+       int             nmsgs = 10;                                             // number of messages to expect
+       int             nthreads = 3;                                   // default number of listener threads
+       tdata_t*        cvs;                                            // vector of control blocks
+       int                     i;
+       pthread_t*      pt_info;                                        // thread stuff
+       int     failures = 0;
+
+       if( argc > 1 ) {
+               nmsgs = atoi( argv[1] );
+       }
+       if( argc > 2 ) {
+               delay = atoi( argv[2] );
+       }
+       if( argc > 3 ) {
+               nthreads = atoi( argv[3] );
+       }
+       if( argc > 4 ) {
+               listen_port = argv[4];
+       }
+
+       fprintf( stderr, "<MTL> listen port: %s; sending %d messages; delay=%d\n", listen_port, nmsgs, delay );
+
+       if( (mrc = rmr_init( listen_port, 1400, 0 )) == NULL ) {
+               fprintf( stderr, "<MTL> unable to initialise RMr\n" );
+               exit( 1 );
+       }
+
+       rmr_init_trace( mrc, TRACE_SIZE );
+
+       cvs = malloc( sizeof( tdata_t ) * nthreads );
+       pt_info = malloc( sizeof( pthread_t ) * nthreads );
+       if( cvs == NULL ) {
+               fprintf( stderr, "<MTL> unable to allocate control vector\n" );
+               exit( 1 );      
+       }
+
+       timeout = time( NULL ) + 20;            // give rmr 20s to find the route table (shouldn't need that much)
+       while( ! rmr_ready( mrc ) ) {           // must have a route table before we can send; wait til RMr says it has one
+               fprintf( stderr, "<MTL> waiting for rmr to show ready\n" );
+               sleep( 1 );
+
+               if( time( NULL ) > timeout ) {
+                       fprintf( stderr, "<MTL> giving up\n" );
+                       exit( 1 );
+               }
+       }
+       fprintf( stderr, "<MTL> rmr is ready; starting threads\n" );
+
+       for( i = 0; i < nthreads; i++ ) {
+               cvs[i].mrc = mrc;
+               cvs[i].id = i + 2;                              // we pass this as the call-id to rmr, so must be >1
+               cvs[i].delay = delay;
+               cvs[i].n2get = nmsgs;
+               cvs[i].state = 1;
+
+               fprintf( stderr, "kicking %d i=%d\n", i+2, i );
+               pthread_create( &pt_info[i], NULL, mk_calls, &cvs[i] );         // kick a thread
+       }
+
+       timeout = time( NULL ) + 300;   // wait up to 5 minutes
+       i = 0;
+       while( nthreads > 0 ) {
+               if( cvs[i].state < 1 ) {                        // states 0 or below indicate done. 0 == good; <0 is failure
+                       nthreads--;
+                       if( cvs[i].state < 0 ) {
+                               failures++;
+                       }
+                       i++;
+               }
+               
+               if( time( NULL ) > timeout ) {
+                       failures += nthreads;
+                       fprintf( stderr, "<MTL> timeout waiting for threads to finish; %d were not finished\n", nthreads );
+                       break;
+               }
+
+               sleep( 1 );
+       }
+
+       fprintf( stderr, "<MTL> [%s] failing threads=%d\n", failures == 0 ? "PASS" : "FAIL",  failures );
+       rmr_close( mrc );
+
+       return failures > 0;
+}
+