test(app): Add test application base 73/73/1
authorE. Scott Daniels <daniels@research.att.com>
Mon, 22 Apr 2019 17:04:10 +0000 (17:04 +0000)
committerE. Scott Daniels <daniels@research.att.com>
Mon, 22 Apr 2019 17:04:10 +0000 (17:04 +0000)
Added two simple test applications (a sender and receiver)
with a driver script to test RMr message passing funcitons.
These apps verify correctness of messages sent at either
high or low message rates.

Change-Id: I0b1b2c752e7c01b1b510cb44582532bc1f276901
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
test/app_test/.gitignore [new file with mode: 0644]
test/app_test/Makefile [new file with mode: 0644]
test/app_test/README [new file with mode: 0644]
test/app_test/receiver.c [new file with mode: 0644]
test/app_test/rt.mask [new file with mode: 0644]
test/app_test/run_app_test.ksh [new file with mode: 0644]
test/app_test/sender.c [new file with mode: 0644]

diff --git a/test/app_test/.gitignore b/test/app_test/.gitignore
new file mode 100644 (file)
index 0000000..f744f2d
--- /dev/null
@@ -0,0 +1,3 @@
+local.rt
+*-
+*.o
diff --git a/test/app_test/Makefile b/test/app_test/Makefile
new file mode 100644 (file)
index 0000000..cd6933e
--- /dev/null
@@ -0,0 +1,57 @@
+#==================================================================================
+#    Copyright (c) 2019 Nokia
+#    Copyright (c) 2018-2019 AT&T Intellectual Property.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#==================================================================================
+#
+
+# NOTE:  this makefile assumes that RMr has been built using the directory .build
+#              at the top most repo directory (e.g. ../../.build). It can be changed 
+#              if you need to by adding "build_path=<path>" to the make command line.
+
+.EXPORT_ALL_VARIABLES:
+.ONESHELL:
+.SHELLFLAGS = -e
+SHELL = /bin/ksh
+
+build_path ?= ../../.build
+header_path := $(shell find ../../.build -name 'rmr.h' |head -1 | sed 's!/rmr/.*!!' )
+
+C_INCLUDE_PATH := $(header_path)
+LD_LIBRARY_PATH=$(build_path):$(build_path)/lib
+LIBRARY_PATH = $(LD_LIBRARY_PATH)
+
+# These programmes are designed to test some basic application level functions
+# from the perspective of two communicating processes. 
+
+
+.PHONY: all
+all: sender receiver sender_nano receiver_nano
+
+receiver_nano: receiver.c
+       gcc -I $${C_INCLUDE_PATH:-.} $< -g -o $@ -lrmr -lnanomsg -lpthread -lm
+
+receiver: receiver.c
+       gcc -I $${C_INCLUDE_PATH:-.} $< -g -o $@ -lrmr_nng -lnng -lpthread -lm
+
+sender_nano: sender.c
+       gcc -I $${C_INCLUDE_PATH:-.} $< -g -o $@ -lrmr -lnanomsg -lpthread -lm
+
+sender: sender.c
+       gcc -I $${C_INCLUDE_PATH:-.} $< -g -o $@ -lrmr_nng -lnng -lpthread -lm
+
+
+.PHONY: clean
+clean:
+       rm -f sender sender_nano receiver receiver_nano *.o
diff --git a/test/app_test/README b/test/app_test/README
new file mode 100644 (file)
index 0000000..8db94a4
--- /dev/null
@@ -0,0 +1,12 @@
+
+This directory contains various sender/receiver applications that can be
+driven together to test message exchange through the RMr library. 
+The run_app_test.ksh script should support building the library, building
+a sender and receiver, then running a short test to verify.
+
+The basic sender and receiver
+Sender and receiver perform lightweight check sums on the message body 
+and on trace data to verify that the messages are being reeived correctly.
+The receiver acks message type 5, so the reciver should receive acks at
+about the volume of 1/10th the number of messages sent.
+
diff --git a/test/app_test/receiver.c b/test/app_test/receiver.c
new file mode 100644 (file)
index 0000000..ade69b3
--- /dev/null
@@ -0,0 +1,198 @@
+// :vim ts=4 sw=4 noet:
+/*
+==================================================================================
+    Copyright (c) 2019 Nokia
+    Copyright (c) 2018-2019 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+/*
+       Mnemonic:       rmr_rcvr.c
+       Abstract:       This is a very simple receiver that does nothing but listen
+                               for messages and write stats every so often to the tty.
+
+                               The receiver expects messages which have some trace information
+                               and a message format of:
+                                       ck1 ck2|<msg text><nil>
+
+                               ck1 is a simple checksum of the message text (NOT including the
+                               nil at the end of the string. 
+
+                               ck2 is a simple checksum of the trace data which for the purposes
+                               of testing is assumed to have a terminating nil to keep this simple.
+
+                               Good messages are messages where both computed checksums match
+                               the ck1 and ck2 values. 
+
+                               The receiver will send an 'ack' message back to the sender for
+                               all type 5 messages received.
+
+                               The sender and receiver can be run on the same host/container
+                               or on different hosts. The route table is the key to setting 
+                               things up properly.  See the sender code for rt information.
+
+                               Define these environment variables to have some control:
+                                       RMR_SEED_RT -- path to the static routing table
+                                       RMR_RTG_SVC -- port to listen for RTG connections
+
+       Date:           18 April 2019
+       Author:         E. Scott Daniels
+*/
+
+#include <unistd.h>
+#include <errno.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <time.h>
+#include <string.h>
+
+#include <rmr/rmr.h>
+
+static int sum( char* str ) {
+       int sum = 0;
+       int     i = 0;
+
+       while( *str ) {
+               sum += *(str++) + i++;
+       }
+
+       return sum % 255;
+}
+
+/*
+       Split the message at the first sep and return a pointer to the first
+       character after. 
+*/
+static char* split( char* str, char sep ) {
+       char*   s;
+
+       s = strchr( str, sep );
+
+       if( s ) {
+               return s+1;
+       }
+
+       fprintf( stderr, "<RCVR> no pipe in message: (%s)\n", str );
+       return NULL;
+}
+
+int main( int argc, char** argv ) {
+    void* mrc;                                         // msg router context
+    rmr_mbuf_t* msg = NULL;                            // message received
+       int i;
+       int             state;
+       int             errors = 0;
+       char*   listen_port = "4560";
+       long count = 0;                                         // total received
+       long good = 0;                                          // good palyload buffers
+       long bad = 0;                                           // payload buffers which were not correct
+       long bad_tr = 0;                                        // trace buffers that were not correct
+       long timeout = 0;
+       char*   data;
+       char    wbuf[1024];                                     // we'll pull trace data into here
+       int             nmsgs = 10;                                     // number of messages to stop after (argv[1] overrides)
+       int             rt_count = 0;                           // retry count
+       long ack_count = 0;                                     // number of acks sent
+
+       data = getenv( "RMR_RTG_SVC" );
+       if( data == NULL ) {
+               setenv( "RMR_RTG_SVC", "19289", 1 );            // set one that won't collide with the sender if on same host
+       }
+
+       if( argc > 1 ) {
+               nmsgs = atoi( argv[1] );
+       }
+       if( argc > 2 ) {
+               listen_port = argv[2];
+       }
+       
+       fprintf( stderr, "<RCVR> listening on port: %s for a max of %d messages\n", listen_port, nmsgs );
+
+    mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_NONE );      // start your engines!
+       if( mrc == NULL ) {
+               fprintf( stderr, "<RCVR> ABORT:  unable to initialise RMr\n" );
+               exit( 1 );
+       }
+
+       timeout = time( NULL ) + 20;
+       while( ! rmr_ready( mrc ) ) {                                                           // wait for RMr to load a route table
+               fprintf( stderr, "<RCVR> waiting for RMr to show ready\n" );
+               sleep( 1 );
+
+               if( time( NULL ) > timeout ) {
+                       fprintf( stderr, "<RCVR> giving up\n" );
+                       exit( 1 );
+               }
+       }
+       fprintf( stderr, "<RCVR> rmr now shows ready, listening begins\n" );
+
+       timeout = time( NULL ) + 20;
+    while( count < nmsgs ) {
+               msg = rmr_torcv_msg( mrc, msg, 1000 );                          // wait for about 1s so that if sender never starts we eventually escape
+               
+               if( msg ) {
+                       if( msg->state == RMR_OK ) {
+                               if( (data = split( msg->payload, '|'  )) != NULL ) {
+                                       if( sum( data ) == atoi( (char *) msg->payload ) ) {
+                                               good++;
+                                       } else {
+                                               fprintf( stderr, "<RCVR> chk sum bad: computed=%d expected;%d (%s)\n", sum( data ), atoi( msg->payload ), data );
+                                               bad++;
+                                       }
+                               }
+
+                               if( (data = split( msg->payload, ' ' )) != NULL ) {                     // data will point to the chksum for the trace data
+                                       state = rmr_get_trace( msg, wbuf, 1024 );                               // should only copy upto the trace size; we'll check that
+                                       if( state > 128 || state < 1 ) {
+                                               fprintf( stderr, "trace data size listed unexpectedly long: %d\n", state );
+                                       }
+                                       if( sum( wbuf ) != atoi( data ) ) {
+                                               fprintf( stderr, "<RCVR> trace chk sum bad: computed=%d expected;%d len=%d (%s)\n", sum( wbuf ), atoi( data ), state, wbuf );
+                                               bad_tr++;
+                                       }
+                               }
+                               count++;                                                                        // messages received for stats output
+
+                               if( msg->mtype == 5 ) {                                         // send an ack; sender will count but not process, so data in message is moot
+                                       msg = rmr_rts_msg( mrc, msg );                                                          // we don't try to resend if this returns retry
+                                       rt_count = 1000;
+                                       while( rt_count > 0 && msg != NULL && msg->state == RMR_ERR_RETRY ) {           // to work right in nano we need this :(
+                                               if( ack_count < 1 ) {                                                                   // need to connect, so hard wait
+                                                       sleep( 1 );
+                                               }
+                                               rt_count--;
+                                               msg = rmr_rts_msg( mrc, msg );                                                  // we don't try to resend if this returns retry
+                                       }
+                                       if( msg && msg->state == RMR_OK ) {                                                     // if it eventually worked
+                                               ack_count++;
+                                       }
+                               }
+                       }
+               }
+
+               if( time( NULL ) > timeout ) {
+                       fprintf( stderr, "receiver timed out\n" );
+                       errors++;
+                       break;
+               }
+    }
+
+       fprintf( stderr, "<RCVR> [%s] %ld messages;  good=%ld  acked=%ld bad=%ld  bad-trace=%ld\n", !!(errors + bad + bad_tr) ? "FAIL" : "PASS", count, good, ack_count, bad, bad_tr );
+       sleep( 2 );                                                                     // let any outbound acks flow before closing
+
+       rmr_close( mrc );
+       return !!(errors + bad + bad_tr);                       // bad rc if any are !0
+}
+
diff --git a/test/app_test/rt.mask b/test/app_test/rt.mask
new file mode 100644 (file)
index 0000000..80d139f
--- /dev/null
@@ -0,0 +1,21 @@
+
+# This is a 'mask' such that the run command can generate with the 
+# host name for the sender.
+
+newrt|start
+rte|0|localhost:4560
+rte|1|localhost:4560
+rte|2|localhost:4560
+rte|3|localhost:4560
+rte|4|localhost:4560
+rte|5|localhost:4560
+rte|6|localhost:4560
+rte|7|localhost:4560
+rte|8|localhost:4560
+rte|9|localhost:4560
+rte|10|localhost:4560
+rte|11|localhost:4560
+rte|12|localhost:4560
+rte|13|localhost:4560
+rte|999|%%hostname%%:43086
+newrt|end
diff --git a/test/app_test/run_app_test.ksh b/test/app_test/run_app_test.ksh
new file mode 100644 (file)
index 0000000..7ad4934
--- /dev/null
@@ -0,0 +1,156 @@
+#!/usr/bin/env ksh
+# :vi ts=4 sw=4 noet :
+#==================================================================================
+#    Copyright (c) 2019 Nokia
+#    Copyright (c) 2018-2019 AT&T Intellectual Property.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#==================================================================================
+#
+
+# ---------------------------------------------------------------------------------
+#      Mnemonic:       run_app_test.ksh
+#      Abstract:       This is a simple script to set up and run the basic send/receive
+#                              processes for some library validation on top of nano/nng.
+#                              It should be possible to clone the repo, switch to this directory
+#                              and execute  'ksh run -B'  which will build RMr, make the sender and
+#                              recevier then  run the basic test.
+#
+#                              Example command line:
+#                                      ksh ./run               # default 10 messages at 1 msg/sec
+#                                      ksh ./run -N    # default but with nanomsg lib
+#                                      ksh ./run -d 100 -n 10000 # send 10k messages with 100ms delay between
+#
+#      Date:           22 April 2019
+#      Author:         E. Scott Daniels
+# ---------------------------------------------------------------------------------
+
+
+# The sender and receiver are run asynch. Their exit statuses are captured in a
+# file in order for the 'main' to pick them up easily.
+#
+function run_sender {
+       if (( $nano_sender ))
+       then
+               ./sender_nano $nmsg $delay
+       else
+               ./sender $nmsg $delay
+       fi
+       echo $? >/tmp/PID$$.src         # must communicate state back via file b/c asynch
+}
+
+function run_rcvr {
+       if (( $nano_receiver ))
+       then
+               ./receiver_nano $nmsg
+       else
+               ./receiver $nmsg
+       fi
+       echo $? >/tmp/PID$$.rrc
+}
+
+# ---------------------------------------------------------
+
+if [[ ! -f local.rt ]]         # we need the real host name in the local.rt; build one from mask if not there
+then
+       hn=$(hostname)
+       sed "s!%%hostname%%!$hn!" rt.mask >local.rt
+fi
+
+nmsg=10                                                # total number of messages to be exchanged (-n value changes)
+delay=1000000                          # microsec sleep between msg 1,000,000 == 1s
+nano_sender=0                          # start nano version if set (-N)
+nano_receiver=0
+wait=1
+rebuild=0
+
+while [[ $1 == -* ]]
+do
+       case $1 in 
+               -B)     rebuild=1;;
+               -d)     delay=$2; shift;;
+               -N)     nano_sender=1
+                       nano_receiver=1
+                       ;;
+               -n)     nmsg=$2; shift;;
+
+               *)      echo "unrecognised option: $1"
+                       echo "usage: $0 [-B] [-d micor-sec-delay] [-N] [-n num-msgs]"
+                       echo "  -B forces a rebuild which will use .build"
+                       exit 1
+                       ;;
+       esac
+
+       shift
+done
+
+if (( rebuild )) 
+then
+       build_path=../../.build
+
+       (
+               set -e
+               mkdir -p $build_path
+               cd ${build_path%/*}                             # cd barfs on ../../.build, so we do this
+               cd ${build_path##*/}
+               cmake ..
+               make package
+       )
+       if (( $? != 0 ))
+       then
+               echo "build failed"
+               exit 1
+       fi
+else
+       build_path=${BUILD_PATH:-"../../.build"}        # we prefer .build at the root level, but allow user option
+
+       if [[ ! -d $build_path ]]
+       then
+               echo "cannot find build in: $build_path"
+               echo "either create, and then build RMr, or set BUILD_PATH as an evironment var before running this"
+               exit 1
+       fi
+fi
+
+export LD_LIBRARY_PATH=$build_path:$build_path/lib
+export LIBRARY_PATH=$LD_LIBRARY_PATH
+export RMR_SEED_RT=${RMR_SEED_RT:-./local.rt}          # allow easy testing with different rt
+
+
+if [[ ! -f ./sender ]]
+then
+       if ! make >/dev/null 2>&1
+       then
+               echo "[FAIL] cannot find sender binary, and cannot make it.... humm?"
+               exit 1
+       fi
+fi
+
+run_rcvr &
+run_sender &
+
+wait
+head -1 /tmp/PID$$.rrc | read rrc
+head -1 /tmp/PID$$.src | read src
+
+if (( !! (src + rrc) ))
+then
+       echo "[FAIL] sender rc=$src  receiver rc=$rrc"
+else
+       echo "[PASS] sender rc=$src  receiver rc=$rrc"
+fi
+
+rm /tmp/PID$$.*
+
+exit $(( !! (src + rrc) ))
+
diff --git a/test/app_test/sender.c b/test/app_test/sender.c
new file mode 100644 (file)
index 0000000..89a0afc
--- /dev/null
@@ -0,0 +1,245 @@
+// :vim ts=4 sw=4 noet:
+/*
+==================================================================================
+    Copyright (c) 2019 Nokia
+    Copyright (c) 2018-2019 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+/*
+       Mnemonic:       sender.c
+       Abstract:       This is a simple sender which will send a series of messages. 
+                               It is expected that the first attempt(s) will fail if the receiver
+                               is not up and this does not start decrementing the number to
+                               send until it has a good send.  
+
+                               The process will check the receive queue and list received messages
+                               but pass/fail is not dependent on what comes back.
+
+                               If the receiver(s) do not become connectable in 20 sec this process
+                               will give up and fail. 
+
+
+                               Message types will vary between 1 and 10, so the route table must
+                               be set up to support those message types.  
+
+                               Message format is:
+                                       ck1 ck2|<msg-txt><nil>
+
+                               Ck1 is the simple check sum of the msg-text (NOT includeing <nil>)
+                               Ck2 is the simple check sum of the trace data which is a nil terminated
+                               series of bytes. 
+
+                               Parms:  argv[1] == nmsgs; argv[2] == delay; argv[3] == listen port
+
+                               Sender will send for at most 20 seconds, so if nmsgs and delay extend
+                               beyond that period the total number of messages sent will be less
+                               than n.
+
+       Date:           18 April 2019
+       Author:         E. Scott Daniels
+*/
+
+#include <unistd.h>
+#include <errno.h>
+#include <string.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <sys/epoll.h>
+#include <time.h>
+
+#include <rmr/rmr.h>
+
+static int sum( char* str ) {
+       int sum = 0;
+       int     i = 0;
+
+       while( *str ) {
+               sum += *(str++) + i++;
+       }
+
+       return sum % 255;
+}
+
+int main( int argc, char** argv ) {
+    void* mrc;                                                 // msg router context
+    struct epoll_event events[1];                      // list of events to give to epoll
+    struct epoll_event epe;                 // event definition for event to listen to
+    int     ep_fd = -1;                                                // epoll's file des (given to epoll_wait)
+    int rcv_fd;                                                // file des that NNG tickles -- give this to epoll to listen on
+       int nready;                                                             // number of events ready for receive
+       rmr_mbuf_t*             sbuf;                                   // send buffer
+       rmr_mbuf_t*             rbuf;                                   // received buffer
+       int     count = 0;
+       int     rt_count = 0;                                           // number of messages requiring a spin retry
+       int     rcvd_count = 0;
+       char*   listen_port = "43086";
+       int             mtype = 0;
+       int             stats_freq = 100;
+       int             successful = 0;                                 // set to true after we have a successful send
+       char    wbuf[1024];
+       char    trace[1024];
+       long    timeout = 0;
+       int             delay = 100000;                                 // usec between send attempts
+       int             nmsgs = 10;                                             // number of messages to send
+
+       if( argc > 1 ) {
+               nmsgs = atoi( argv[1] );
+       }
+       if( argc > 2 ) {
+               delay = atoi( argv[2] );
+       }
+       if( argc > 3 ) {
+               listen_port = argv[3];
+       }
+
+       fprintf( stderr, "<SNDR> listen port: %s; sending %d messages; delay=%d\n", listen_port, nmsgs, delay );
+
+    if( (mrc = rmr_init( listen_port, 1400, RMRFL_NONE )) == NULL ) {
+               fprintf( stderr, "<SNDR> unable to initialise RMr\n" );
+               exit( 1 );
+       }
+
+    if( (rcv_fd = rmr_get_rcvfd( mrc )) >= 0 ) {                       // epoll only available from NNG -- skip receive later if not NNG
+               if( rcv_fd < 0 ) {
+                       fprintf( stderr, "<SNDR> unable to set up polling fd\n" );
+                       exit( 1 );
+               }
+               if( (ep_fd = epoll_create1( 0 )) < 0 ) {
+                       fprintf( stderr, "<SNDR> [FAIL] unable to create epoll fd: %d\n", errno );
+                       exit( 1 );
+               }
+       epe.events = EPOLLIN;
+       epe.data.fd = rcv_fd;
+
+       if( epoll_ctl( ep_fd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 )  {
+                       fprintf( stderr, "<SNDR> [FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
+                       exit( 1 );
+               }
+       } else {
+               rmr_set_rtimeout( mrc, 0 );                     // for nano we must set the receive timeout to 0; non-blocking receive
+       }
+
+       sbuf = rmr_alloc_msg( mrc, 512 );       // alloc first send buffer; subsequent buffers allcoated on send
+       //sbuf = rmr_tralloc_msg( mrc, 512, 11, "xxxxxxxxxx" ); // alloc first send buffer; subsequent buffers allcoated on send
+       rbuf = NULL;                                            // don't need to alloc receive buffer
+
+       timeout = time( NULL ) + 20;            // give rmr 20s to find the route table (shouldn't need that much)
+       while( ! rmr_ready( mrc ) ) {           // must have a route table before we can send; wait til RMr says it has one
+               fprintf( stderr, "<SNDR> waiting for rmr to show ready\n" );
+               sleep( 1 );
+
+               if( time( NULL ) > timeout ) {
+                       fprintf( stderr, "<SNDR> giving up\n" );
+                       exit( 1 );
+               }
+       }
+       fprintf( stderr, "<SNDR> rmr is ready; starting to send\n" );
+       
+       timeout = time( NULL ) + 20;
+
+    while( count < nmsgs ) {                                                           // we send 10 messages after the first message is successful
+               snprintf( trace, 100, "%lld", (long long) time( NULL ) );
+               rmr_set_trace( sbuf, trace, strlen( trace ) + 1 );
+               snprintf( wbuf, 200, "count=%d tr=%s %d stand up and cheer!", count, trace, rand() );
+               snprintf( sbuf->payload, 300, "%d %d|%s", sum( wbuf ), sum( trace ), wbuf );
+
+               sbuf->mtype = mtype;                                                    // fill in the message bits
+               sbuf->len =  strlen( sbuf->payload ) + 1;               // our receiver likely wants a nice acsii-z string
+               sbuf->state = 0;
+               sbuf = rmr_send_msg( mrc, sbuf );                               // send it (send returns an empty payload on success, or the original payload on fail/retry)
+
+               switch( sbuf->state ) {
+                       case RMR_ERR_RETRY:
+                               rt_count++;
+                               while( sbuf->state == RMR_ERR_RETRY ) {                 // soft failure (device busy?) retry
+                                       sbuf = rmr_send_msg( mrc, sbuf );                       // retry send until it's good (simple test; real programmes should do better)
+                               }
+                               successful = 1;
+                               break;
+
+                       case RMR_OK:
+                               successful = 1;
+                               break;
+
+                       default:
+                               // some error (not connected likely), don't count this
+                               break;
+               }
+
+               if( successful ) {                              // once we have a message that was sent, start to increase things
+                       count++;
+                       mtype++;
+                       if( mtype > 10 ) {                      // if large number of sends don't require infinite rt entries :)
+                               mtype = 1;
+                       }
+               }
+
+               if( rcv_fd >= 0 ) {
+                       while( (nready = epoll_wait( ep_fd, events, 1, 0 )) > 0 ) {                     // if something ready to receive (non-blocking check)
+                               if( events[0].data.fd == rcv_fd ) {                                             // we only are waiting on 1 thing, so [0] is ok
+                                       errno = 0;
+                                       rbuf = rmr_rcv_msg( mrc, rbuf );
+                                       if( rbuf ) {
+                                               rcvd_count++;
+                                       }
+                               }
+                       }
+               } else {                                // nano, we will only pick up one at a time.
+                       if(     (rbuf = rmr_rcv_msg( mrc, rbuf ) ) != NULL ) {
+                               if( rbuf->state == RMR_OK ) {
+                                       rcvd_count++;
+                               }
+                       }
+               }
+
+               if( time( NULL ) > timeout ) {                                          // should only happen if we never connect or nmsg > what we can send in 20sec
+                       fprintf( stderr, "sender timeout\n" );
+                       break;
+               }
+
+               if( delay > 0 ) {
+                       usleep( delay );
+               }
+    }
+
+       
+       timeout = time( NULL ) + 2;                             // allow 2 seconds for the pipe to drain from the receiver
+       while( time( NULL ) < timeout );
+               if( rcv_fd >= 0 ) {
+                       while( (nready = epoll_wait( ep_fd, events, 1, 100 )) > 0 ) {                   // if something ready to receive (non-blocking check)
+                               if( events[0].data.fd == rcv_fd ) {                                             // we only are waiting on 1 thing, so [0] is ok
+                                       errno = 0;
+                                       rbuf = rmr_rcv_msg( mrc, rbuf );
+                                       if( rbuf ) {
+                                               rcvd_count++;
+                                               timeout = time( NULL ) + 2;
+                                       }
+                               }
+                       }
+               } else {                                // nano, we will only pick up one at a time.
+                       if(     (rbuf = rmr_torcv_msg( mrc, rbuf, 100 ) ) != NULL ) {
+                               if( rbuf->state == RMR_OK ) {
+                                       rcvd_count++;
+                               }
+                       }
+               }
+
+       fprintf( stderr, "<SNDR> [%s] sent %d messages   received %d acks retries=%d\n", count == nmsgs ? "PASS" : "FAIL",  count, rcvd_count, rt_count );
+       rmr_close( mrc );
+
+       return !( count == nmsgs );
+}
+