API and build change and fix summaries. Doc correctsions
and/or changes are not mentioned here; see the commit messages.
+2020 January 20; verison 3.0.1
+ Enable support for dynamic route table updates via RMR session.
+
2020 January 16; version 3.0.0
Introduce support for SI95 transport library to replace NNG.
(RMR library versions will use leading odd numbers to avoid tag collisions
set( major_version "3" ) # should be automatically populated from git tag later, but until CI process sets a tag we use this
set( minor_version "0" )
-set( patch_level "0" )
+set( patch_level "1" )
set( install_root "${CMAKE_INSTALL_PREFIX}" )
set( install_inc "include/rmr" )
call ref to point to all of the various bits and set real len etc,
then we queue it. Raw_msg is expected to include the transport goo
placed in front of the RMR header and payload.
-
- done -- FIX ME?? can we eliminate the buffer copy here?
*/
static void buf2mbuf( uta_ctx_t* ctx, char *raw_msg, int msg_size, int sender_fd ) {
rmr_mbuf_t* mbuf;
}
}
-/*
- if( (mbuf = (rmr_mbuf_t *) malloc( sizeof( *mbuf ))) != NULL ) { // alloc mbuf and point at various bits of payload
- memset( mbuf, 0, sizeof( *mbuf ) );
- mbuf->tp_buf = raw_msg;
- mbuf->ring = ctx->zcb_mring;
-*/
if( (mbuf = alloc_mbuf( ctx, RMR_ERR_UNSET )) != NULL ) {
mbuf->tp_buf = raw_msg;
mbuf->rts_fd = sender_fd;
- // eliminated :) memcpy( mbuf->tp_buf, river->accum + offset, river->msg_size );
-
ref_tpbuf( mbuf, msg_size ); // point mbuf at bits in the datagram
hdr = mbuf->header; // convenience
if( hdr->flags & HFL_CALL_MSG ) { // call generated message; ignore call-id etc and queue
int need; // bytes needed for something
int i;
- // for speed these checks should be enabled only in debug mode and assume we always get a good context
- if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
- return SI_RET_OK;
- }
-
- if( fd >= ctx->nrivers || fd < 0 ) {
- if( DEBUG ) fprintf( stderr, "[DBUG] callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
- return SI_RET_OK;
+ if( PARINOID_CHECKS ) { // PARINOID mode is slower; off by default
+ if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+ return SI_RET_OK;
+ }
+
+ if( fd >= ctx->nrivers || fd < 0 ) {
+ if( DEBUG ) fprintf( stderr, "[DBUG] callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
+ return SI_RET_OK;
+ }
}
- // -------- end debug checks -----------------
-
if( buflen <= 0 ) {
return SI_RET_OK;
}
/*
fprintf( stderr, "\n>>>>> data callback for %d bytes from %d\n", buflen, fd );
for( i = 0; i < 40; i++ ) {
-fprintf( stderr, "%02x ", (unsigned char) *(buf+i) );
+ fprintf( stderr, "%02x ", (unsigned char) *(buf+i) );
}
fprintf( stderr, "\n" );
*/
This is expected to execute in a separate thread. It is responsible for
_all_ receives and queues them on the appropriate ring, or chute.
It does this by registering the callback function above with the SI world
- and then caling SIwait() to drive the callback when data has arrived.
+ and then calling SIwait() to drive the callback when data has arrived.
The "state" of the message is checked which determines where the message
#include "ring_static.c" // message ring support
#include "rt_generic_static.c" // route table things not transport specific
#include "rtable_si_static.c" // route table things -- transport specific
-#include "rtc_static.c" // route table collector
-#include "rtc_si_static.c" // our private test function
+#include "rtc_si_static.c" // specific RMR only route table collector (SI only for now)
#include "tools_static.c"
#include "sr_si_static.c" // send/receive static functions
#include "wormholes.c" // wormhole api externals and related static functions (must be LAST!)
char wbuf[1024]; // work buffer
char* tok; // pointer at token in a buffer
char* tok2;
+ int static_rtc = 0; // if rtg env var is < 1, then we set and don't listen on a port
int state;
int i;
if( ! announced ) {
- fprintf( stderr, "[INFO] ric message routing library on SI95 mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
+ fprintf( stderr, "[INFO] ric message routing library on SI95/b mv=%d flg=%02x (%s %s.%s.%s built: %s)\n",
RMR_MSG_VER, flags, QUOTE_DEF(GIT_ID), QUOTE_DEF(MAJOR_VER), QUOTE_DEF(MINOR_VER), QUOTE_DEF(PATCH_VER), __DATE__ );
announced = 1;
}
port = proto_port; // assume something like "1234" was passed
}
+ if( (tok = getenv( "ENV_RTG_PORT" )) != NULL ) { // must check port here -- if < 1 then we just start static file 'listener'
+ if( atoi( tok ) < 1 ) {
+ static_rtc = 1;
+ }
+ }
+
if( (tok = getenv( ENV_SRC_ID )) != NULL ) { // env var overrides what we dig from system
tok = strdup( tok ); // something we can destroy
if( *tok == '[' ) { // we allow an ipv6 address here
return NULL;
}
- if( !(flags & FL_NOTHREAD) ) { // skip if internal function that doesnt need an rtc
- if( pthread_create( &ctx->rtc_th, NULL, rtc_file, (void *) ctx ) ) { // kick the rt collector thread
- fprintf( stderr, "[WRN] rmr_init: unable to start route table collector thread: %s", strerror( errno ) );
+ if( !(flags & FL_NOTHREAD) ) { // skip if internal function that doesnt need a RTC
+ if( static_rtc ) {
+ if( pthread_create( &ctx->rtc_th, NULL, rtc_file, (void *) ctx ) ) { // kick the rt collector thread as just file reader
+ fprintf( stderr, "[WRN] rmr_init: unable to start static route table collector thread: %s", strerror( errno ) );
+ }
+ } else {
+ if( pthread_create( &ctx->rtc_th, NULL, rtc, (void *) ctx ) ) { // kick the real rt collector thread
+ fprintf( stderr, "[WRN] rmr_init: unable to start dynamic route table collector thread: %s", strerror( errno ) );
+ }
}
}
- //fprintf( stderr, ">>>>> starting threaded receiver with ctx=%p si_ctx=%p\n", ctx, ctx->si_ctx );
ctx->flags |= CFL_MTC_ENABLED; // for SI threaded receiver is the only way
if( pthread_create( &ctx->mtc_th, NULL, mt_receive, (void *) ctx ) ) { // so kick it
fprintf( stderr, "[WRN] rmr_init: unable to start multi-threaded receiver: %s", strerror( errno ) );
/*
Mnemonic: rtc_si_static.c
- Abstract: This is a test module to allow the route table to be read
- from a static spot and NOT to attempt to listen for updates
- from some outside source.
+ Abstract: The route table collector is started as a separate pthread and
+ is responsible for listening for route table updates from a
+ route manager or route table generator process.
+
+ This comes from the common src and may be moved back there once
+ it is not necessary to support raw sessions (all route table
+ gen messages are received over rmr channel).
Author: E. Scott Daniels
- Date: 18 October 2019
+ Date: 29 November 2018 (extracted to common 13 March 2019)
+ Imported to si base 17 Jan 2020.
*/
+
#ifndef _rtc_si_staic_c
#define _rtc_si_staic_c
#include <netdb.h>
#include <errno.h>
#include <string.h>
-#include <errno.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/stat.h>
lseek( vfd, 0, 0 );
read( vfd, wbuf, 10 );
vlevel = atoi( wbuf );
- }
-
+ }
+
read_static_rt( ctx, vlevel ); // seed the route table if one provided
sleep( 60 );
}
}
+
+static int refresh_vlevel( int vfd ) {
+ int vlevel = 0;
+ char rbuf[128];
+
+ if( vfd >= 0 ) { // if file is open, read current value
+ rbuf[0] = 0;
+ lseek( vfd, 0, 0 );
+ read( vfd, rbuf, 10 );
+ vlevel = atoi( rbuf );
+ }
+
+ return vlevel;
+}
+
+/*
+ Route Table Collector
+ A side thread which opens a socket and subscribes to a routing table generator.
+ It may do other things along the way (latency measurements?).
+
+ The pointer is a pointer to the context.
+
+ Listens for records from the route table generation publisher, expecting
+ one of the following, newline terminated, ASCII records:
+ rte|msg-type||]name:port,name:port,...;name:port,... // route table entry with one or more groups of endpoints
+ new|start // start of new table
+ new|end // end of new table; complete
+
+ Name must be a host name which can be looked up via gethostbyname() (DNS).
+
+ Multiple endpoints (name:port) may be given separated by a comma; an endpoint is selected using round robin
+ for each message of the type that is sent.
+
+ Multiple endpoint groups can be given as a comma separated list of endpoints, separated by semicolons:
+ group1n1:port,group1n2:port,group1n3:port;group2n1:port,group2n2:port
+
+ If multiple groups are given, when send() is called for the cooresponding message type,
+ the message will be sent to one endpoint in each group.
+
+ msg-type is the numeric message type (e.g. 102). If it is given as n,name then it is assumed
+ that the entry applies only to the instance running with the hostname 'name.'
+
+ Buffers received from the route table generator can contain multiple newline terminated
+ records, but each buffer must be less than 4K in length, and the last record in a
+ buffer may NOT be split across buffers.
+
+ Other chores:
+ In addition to the primary task of getting, vetting, and installing a new route table, or
+ updates to the existing table, this thread will periodically cause the send counts for each
+ endpoint known to be written to standard error. The frequency is once every 180 seconds, and
+ more frequently if verbose mode (see ENV_VERBOSE_FILE) is > 0.
+*/
+static void* rtc( void* vctx ) {
+ uta_ctx_t* ctx; // context user has -- where we pin the route table
+ uta_ctx_t* pvt_cx; // private context for session with rtg
+ rmr_mbuf_t* msg = NULL; // message from rtg
+ char* payload; // payload in the message
+ size_t mlen;
+ size_t clen; // length to copy and mark
+ char* port; // a port number we listen/connect to
+ char* fport; // pointer to the real buffer to free
+ size_t buf_size; // nng needs var pointer not just size?
+ char* nextr; // pointer at next record in the message
+ char* curr; // current record
+ int i;
+ long blabber = 0; // time of last blabber so we don't flood if rtg goes bad
+ int cstate = -1; // connection state to rtg
+ int state; // processing state of some nng function
+ char* tokens[128];
+ char wbuf[128];
+ char* pbuf = NULL;
+ int pbuf_size = 0; // number allocated in pbuf
+ int ntoks;
+ int raw_interface = 0; // rtg is using raw NNG/Nano not RMr to send updates
+ int vfd = -1; // verbose file des if we have one
+ int vlevel = 0; // how chatty we should be 0== no nattering allowed
+ char* eptr;
+ int epfd = -1; // fd for epoll so we can multi-task
+ struct epoll_event events[1]; // list of events to give to epoll; we only have one we care about
+ struct epoll_event epe; // event definition for event to listen to
+ int count_delay = 30; // number of seconds between writing count info; initially every 30s
+ int bump_freq = 0; // time at which we will bump count frequency to every 5 minutes
+
+
+ if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+ fprintf( stderr, "[CRI] rmr_rtc: internal mishap: context passed in was nil\n" );
+ return NULL;
+ }
+
+ if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
+ vfd = open( eptr, O_RDONLY );
+ vlevel = refresh_vlevel( vfd );
+ }
+
+ read_static_rt( ctx, vlevel ); // seed the route table if one provided
+
+ if( (port = getenv( ENV_RTG_PORT )) == NULL || ! *port ) { // port we need to open to listen for RTG connections
+ port = strdup( DEF_RTG_PORT );
+ } else {
+ port = strdup( port );
+ }
+
+ if( (curr = getenv( ENV_RTG_RAW )) != NULL ) {
+ raw_interface = atoi( curr ) > 0; // if > 0 we assume that rtg messages are NOT coming from an RMr based process
+ }
+
+ fport = port; // must hold to free
+
+ ntoks = uta_tokenise( port, tokens, 120, ':' ); // assume tcp:port, but it could be port or old style host:port
+ switch( ntoks ) {
+ case 1:
+ port = tokens[0]; // just the port
+ break;
+
+ case 2:
+ port = tokens[1]; // tcp:port or :port
+ break;
+
+ default:
+ port = DEF_RTG_PORT; // this shouldn't happen, but parnioia is good
+ break;
+ }
+
+ if( (pvt_cx = init( port, MAX_RTG_MSG_SZ, FL_NOTHREAD )) == NULL ) { // open a private context (no RT listener!)
+ fprintf( stderr, "[CRI] rmr_rtc: unable to initialise listen port for RTG (pvt_cx)\n" );
+
+ while( TRUE ) { // no listen port, just dump counts now and then
+ sleep( count_delay );
+ rt_epcounts( ctx->rtable, ctx->my_name );
+ }
+
+ free( fport ); // parinoid free and return
+ return NULL;
+ }
+
+ if( DEBUG ) fprintf( stderr, "[DBUG] rtc thread is running and listening; listening for rtg conns on %s\n", port );
+ free( fport );
+
+ // future: if we need to register with the rtg, then build a message and send it through a wormhole here
+
+ bump_freq = time( NULL ) + 300; // after 5 minutes we decrease the count frequency
+ blabber = 0;
+ while( 1 ) { // until the cows return, pigs fly, or somesuch event likely not to happen
+ while( msg == NULL || msg->len <= 0 ) { // until we actually have something from the other side
+ msg = rmr_torcv_msg( pvt_cx, msg, 1000 );
+
+ if( time( NULL ) > blabber ) {
+ vlevel = refresh_vlevel( vfd );
+ if( vlevel >= 0 ) { // allow it to be forced off with -n in verbose file
+ blabber = time( NULL ) + count_delay; // set next time to blabber, then do so
+ if( blabber > bump_freq ) {
+ count_delay = 300;
+ }
+ rt_epcounts( ctx->rtable, ctx->my_name );
+ }
+ }
+ }
+
+ vlevel = refresh_vlevel( vfd ); // ensure it's fresh when we get a message
+
+ if( msg != NULL && msg->len > 0 ) {
+ payload = msg->payload;
+ mlen = msg->len; // usable bytes in the payload
+ if( vlevel > 1 ) {
+ fprintf( stderr, "[DBUG] rmr_rtc: received rt message; %d bytes (%s)\n", (int) mlen, msg->payload );
+ } else {
+ if( DEBUG > 1 || (vlevel > 0) ) fprintf( stderr, "[DBUG] rmr_rtc: received rt message; %d bytes\n", (int) mlen );
+ }
+
+ if( pbuf_size <= mlen ) {
+ if( pbuf ) {
+ free( pbuf );
+ }
+ if( mlen < 512 ) {
+ pbuf_size = 512;
+ } else {
+ pbuf_size = mlen * 2;
+ }
+ pbuf = (char *) malloc( sizeof( char ) * pbuf_size );
+ }
+ memcpy( pbuf, payload, mlen );
+ pbuf[mlen] = 0; // don't depend on sender making this a legit string
+
+ curr = pbuf;
+ while( curr ) { // loop over each record in the buffer
+ nextr = strchr( curr, '\n' ); // allow multiple newline records, find end of current and mark
+
+ if( nextr ) {
+ *(nextr++) = 0;
+ }
+
+ if( vlevel > 1 ) {
+ fprintf( stderr, "[DBUG] rmr_rtc: processing (%s)\n", curr );
+ }
+ parse_rt_rec( ctx, curr, vlevel ); // parse record and add to in progress table
+
+ curr = nextr;
+ }
+
+ if( ctx->shutdown ) { // mostly for testing, but allows user app to close us down if rmr_*() function sets this
+ break;
+ }
+
+ msg->len = 0; // force back into the listen loop
+ }
+ }
+
+ return NULL; // unreachable, but some compilers don't see that and complain.
+}
+
+
#endif
status = SInewsession( gptr, tpptr ); // accept connection
} else { // data received on a regular port (we support just tcp now
status = RECV( fd, gptr->rbuf, MAX_RBUF, 0 ); // read data
+ //fprintf( stderr, ">>>>> wait popped status =%d\n", status );
if( status > 0 && ! (tpptr->flags & TPF_DRAIN) ) {
if( (cbptr = gptr->cbtab[SI_CB_CDATA].cbrtn) != NULL ) {
status = (*cbptr)( gptr->cbtab[SI_CB_CDATA].cbdata, fd, gptr->rbuf, status );
uta_mhdr_t* hdr; // convenience pointer
int tr_len; // trace data len (default or override)
int* alen; // convenience pointer to set allocated len
- //int tpb_len; // transport buffer total len
-static int logged = 0;
tr_len = trlo > 0 ? trlo : ctx->trace_data_len;
int ver;
int hlen; // header len to use for a truncation check
- msg->header = ((char *) msg->tp_buf) + TP_HDR_LEN; // FIX ME: hard 50 needs to be some kind of tp header struct
-
- // do NOT reduce alen any more. alen must be TP_HEADER + RMR_HEADER + user space
- // get payload size will do the right thing and subtract TP_HEADER and RMR_HEADER lengths
- //alen -= 50; // actual length of "rmr space"
+ msg->header = ((char *) msg->tp_buf) + TP_HDR_LEN;
v1hdr = (uta_v1mhdr_t *) msg->header; // v1 will always allow us to suss out the version
--- /dev/null
+// :vim ts=4 sw=4 noet:
+/*
+==================================================================================
+ Copyright (c) 2019 Nokia
+ Copyright (c) 2018-2019 AT&T Intellectual Property.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+==================================================================================
+*/
+
+/*
+ Mnemonic: mt_listener.c
+ Abstract: This simple application runs multiple "listener" threads. Each thread
+ receives from a single RMR context to validate the ability spin
+ several listening threads in an application.
+
+ Message format is:
+ ck1 ck2|<msg-txt> @ tid<nil>
+
+ Ck1 is the simple check sum of the msg-text (NOT includeing <nil>)
+ Ck2 is the simple check sum of the trace data which is a nil terminated
+ series of bytes.
+ tid is the thread id assigned by the main thread.
+
+ Parms: argv[1] == number of msgs to send (10)
+ argv[2] == delay (mu-seconds, 1000000 default)
+ argv[3] == number of threads (3)
+ argv[4] == listen port
+
+ Sender will send for at most 20 seconds, so if nmsgs and delay extend
+ beyond that period the total number of messages sent will be less
+ than n.
+
+ Date: 18 April 2019
+ Author: E. Scott Daniels
+*/
+
+#include <unistd.h>
+#include <errno.h>
+#include <string.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <sys/epoll.h>
+#include <time.h>
+#include <pthread.h>
+
+
+#include <rmr/rmr.h>
+#include "time_tools.c" // our time based test tools
+
+#define TRACE_SIZE 40 // bytes in header to provide for trace junk
+#define WBUF_SIZE 2048
+
+/*
+ Thread data
+*/
+typedef struct tdata {
+ int id; // the id we'll pass to RMr mt-call function NOT the thread id
+ int n2get; // number of messages to expect
+ int delay; // max delay waiting for n2get messages
+ void* mrc; // RMr context
+ int state;
+} tdata_t;
+
+
+
+// --------------------------------------------------------------------------------
+
+
+static int sum( char* str ) {
+ int sum = 0;
+ int i = 0;
+
+ while( *str ) {
+ sum += *(str++) + i++;
+ }
+
+ return sum % 255;
+}
+
+/*
+ Split the message at the first sep and return a pointer to the first
+ character after.
+*/
+static char* split( char* str, char sep ) {
+ char* s;
+
+ s = strchr( str, sep );
+
+ if( s ) {
+ return s+1;
+ }
+
+ //fprintf( stderr, "<RCVR> no pipe in message: (%s)\n", str );
+ return NULL;
+}
+
+/*
+ Executed as a thread, this puppy will listen for messages and report
+ what it receives.
+*/
+static void* mk_calls( void* data ) {
+ tdata_t* control;
+ rmr_mbuf_t* msg = NULL; // message
+ int* count_bins = NULL;
+ char* wbuf = NULL;
+ char buf2[128];
+ int i;
+ int state = 0;
+ char* msg_data; // bits after checksum info in payload
+ long good = 0; // counters
+ long bad = 0;
+ long bad_tr = 0;
+ long count = 0; // total msgs received
+ struct timespec start_ts;
+ struct timespec end_ts;
+ int elap; // elapsed time to receive messages
+ time_t timeout;
+
+ count_bins = (int *) malloc( sizeof( int ) * 11 );
+
+ wbuf = (char *) malloc( sizeof( char ) * WBUF_SIZE );
+
+ if( (control = (tdata_t *) data) == NULL ) {
+ fprintf( stderr, "thread data was nil; bailing out\n" );
+ }
+ fprintf( stderr, "<THRD> id=%d thread receiver started expecting=%d messages timeout=%d seconds\n",
+ control->id, control->n2get, control->delay );
+
+ timeout = time( NULL ) + control->delay; // max time to wait for a good message
+ while( count < control->n2get ) { // wait for n messages -- no timeout
+ msg = rmr_torcv_msg( control->mrc, msg, 1000 ); // pop after ~1 second
+
+ if( msg ) {
+ //fprintf( stderr, "<THRD> id=%d got type=%d state=%s msg=(%s)\n", control->id, msg->mtype, msg->state == RMR_OK ? "OK" : "timeout", msg->payload );
+ if( msg->state == RMR_OK ) {
+ if( good == 0 ) { // mark time of first good message
+ set_time( &start_ts );
+ }
+ set_time( &end_ts ); // mark the time of last good message
+
+ if( (msg_data = split( msg->payload, '|' )) != NULL ) {
+ if( sum( msg_data ) == atoi( (char *) msg->payload ) ) {
+ good++;
+ } else {
+ fprintf( stderr, "<RCVR> chk sum bad: computed=%d expected;%d (%s)\n", sum( msg_data ),
+ atoi( msg->payload ), msg_data );
+ bad++;
+ }
+
+ if( (msg_data = split( msg->payload, ' ' )) != NULL ) { // data will point to the chksum for the trace data
+ state = rmr_get_trace( msg, wbuf, 1024 ); // should only copy upto the trace size; we'll check that
+ if( state > 128 || state < 0 ) {
+ fprintf( stderr, "trace data size listed unexpectedly long: %d\n", state );
+ } else {
+ if( state && sum( wbuf ) != atoi( msg_data ) ) {
+ fprintf( stderr, "<RCVR> trace chk sum bad: computed=%d expected;%d len=%d (%s)\n", sum( wbuf ),
+ atoi( msg_data ), state, wbuf );
+ bad_tr++;
+ }
+ }
+ }
+ } else {
+ good++; // nothing to check, assume good
+ }
+ count++;
+
+ if( msg->mtype >= 0 && msg->mtype <= 10 ) {
+ count_bins[msg->mtype]++;
+ }
+ }
+ } else {
+ fprintf( stderr, "<THRD> id=%d timeout with nil msg\n", control->id );
+ }
+
+ if( time( NULL ) > timeout ) {
+ fprintf( stderr, "<THRD> id=%d timeout before receiving %d messages\n", control->id, control->n2get );
+ break;
+ }
+ }
+ elap = elapsed( &start_ts, &end_ts, ELAP_MS );
+ if( elap > 0 ) {
+ fprintf( stderr, "<THRD> id=%d received %ld messages in %d ms rate = %ld msg/sec\n", control->id, count, elap, (count/elap)*1000 );
+ } else {
+ fprintf( stderr, "<THRD> id=%d runtime too short to compute received rate\n", control->id );
+ }
+
+ snprintf( wbuf, WBUF_SIZE, "<THRD> id=%d histogram: ", control->id ); // build histogram so we can write with one fprintf call
+ for( i = 0; i < 11; i++ ) {
+ snprintf( buf2, sizeof( buf2 ), "%5d ", count_bins[i] );
+ strcat( wbuf, buf2 );
+ }
+ fprintf( stderr, "%s\n", wbuf );
+
+ fprintf( stderr, "<THRD> id=%d %ld messages %ld good %ld bad\n", control->id, count, good, bad );
+
+ control->state = bad > 0 ? -1 : 0; // set to indicate done and <0 to indicate some failure
+ control->state += count < control->n2get ? -2 : 0;
+ return NULL;
+}
+
+int main( int argc, char** argv ) {
+ void* mrc; // msg router context
+ rmr_mbuf_t* rbuf = NULL; // received on 'normal' flow
+ struct epoll_event events[1]; // list of events to give to epoll
+ struct epoll_event epe; // event definition for event to listen to
+ int ep_fd = -1; // epoll's file des (given to epoll_wait)
+ char* listen_port = "43086";
+ long timeout = 0; // time the main thread will pop if listeners have not returned
+ int delay = 30; // max time to wait for n messages
+ int nmsgs = 10; // number of messages to expect
+ int nthreads = 3; // default number of listener threads
+ tdata_t* cvs; // vector of control blocks
+ int i;
+ pthread_t* pt_info; // thread stuff
+ int failures = 0;
+
+ if( argc > 1 ) {
+ nmsgs = atoi( argv[1] );
+ }
+ if( argc > 2 ) {
+ delay = atoi( argv[2] );
+ }
+ if( argc > 3 ) {
+ nthreads = atoi( argv[3] );
+ }
+ if( argc > 4 ) {
+ listen_port = argv[4];
+ }
+
+ fprintf( stderr, "<MTL> listen port: %s; sending %d messages; delay=%d\n", listen_port, nmsgs, delay );
+
+ if( (mrc = rmr_init( listen_port, 1400, 0 )) == NULL ) {
+ fprintf( stderr, "<MTL> unable to initialise RMr\n" );
+ exit( 1 );
+ }
+
+ rmr_init_trace( mrc, TRACE_SIZE );
+
+ cvs = malloc( sizeof( tdata_t ) * nthreads );
+ pt_info = malloc( sizeof( pthread_t ) * nthreads );
+ if( cvs == NULL ) {
+ fprintf( stderr, "<MTL> unable to allocate control vector\n" );
+ exit( 1 );
+ }
+
+ timeout = time( NULL ) + 20; // give rmr 20s to find the route table (shouldn't need that much)
+ while( ! rmr_ready( mrc ) ) { // must have a route table before we can send; wait til RMr says it has one
+ fprintf( stderr, "<MTL> waiting for rmr to show ready\n" );
+ sleep( 1 );
+
+ if( time( NULL ) > timeout ) {
+ fprintf( stderr, "<MTL> giving up\n" );
+ exit( 1 );
+ }
+ }
+ fprintf( stderr, "<MTL> rmr is ready; starting threads\n" );
+
+ for( i = 0; i < nthreads; i++ ) {
+ cvs[i].mrc = mrc;
+ cvs[i].id = i + 2; // we pass this as the call-id to rmr, so must be >1
+ cvs[i].delay = delay;
+ cvs[i].n2get = nmsgs;
+ cvs[i].state = 1;
+
+ fprintf( stderr, "kicking %d i=%d\n", i+2, i );
+ pthread_create( &pt_info[i], NULL, mk_calls, &cvs[i] ); // kick a thread
+ }
+
+ timeout = time( NULL ) + 300; // wait up to 5 minutes
+ i = 0;
+ while( nthreads > 0 ) {
+ if( cvs[i].state < 1 ) { // states 0 or below indicate done. 0 == good; <0 is failure
+ nthreads--;
+ if( cvs[i].state < 0 ) {
+ failures++;
+ }
+ i++;
+ }
+
+ if( time( NULL ) > timeout ) {
+ failures += nthreads;
+ fprintf( stderr, "<MTL> timeout waiting for threads to finish; %d were not finished\n", nthreads );
+ break;
+ }
+
+ sleep( 1 );
+ }
+
+ fprintf( stderr, "<MTL> [%s] failing threads=%d\n", failures == 0 ? "PASS" : "FAIL", failures );
+ rmr_close( mrc );
+
+ return failures > 0;
+}
+