From e8a5b2c912d4be9cc93bc52ad7a460b57321c5fd Mon Sep 17 00:00:00 2001 From: "E. Scott Daniels" Date: Mon, 22 Apr 2019 17:04:10 +0000 Subject: [PATCH] test(app): Add test application base Added two simple test applications (a sender and receiver) with a driver script to test RMr message passing funcitons. These apps verify correctness of messages sent at either high or low message rates. Change-Id: I0b1b2c752e7c01b1b510cb44582532bc1f276901 Signed-off-by: E. Scott Daniels --- test/app_test/.gitignore | 3 + test/app_test/Makefile | 57 ++++++++++ test/app_test/README | 12 ++ test/app_test/receiver.c | 198 +++++++++++++++++++++++++++++++++ test/app_test/rt.mask | 21 ++++ test/app_test/run_app_test.ksh | 156 ++++++++++++++++++++++++++ test/app_test/sender.c | 245 +++++++++++++++++++++++++++++++++++++++++ 7 files changed, 692 insertions(+) create mode 100644 test/app_test/.gitignore create mode 100644 test/app_test/Makefile create mode 100644 test/app_test/README create mode 100644 test/app_test/receiver.c create mode 100644 test/app_test/rt.mask create mode 100644 test/app_test/run_app_test.ksh create mode 100644 test/app_test/sender.c diff --git a/test/app_test/.gitignore b/test/app_test/.gitignore new file mode 100644 index 0000000..f744f2d --- /dev/null +++ b/test/app_test/.gitignore @@ -0,0 +1,3 @@ +local.rt +*- +*.o diff --git a/test/app_test/Makefile b/test/app_test/Makefile new file mode 100644 index 0000000..cd6933e --- /dev/null +++ b/test/app_test/Makefile @@ -0,0 +1,57 @@ +#================================================================================== +# Copyright (c) 2019 Nokia +# Copyright (c) 2018-2019 AT&T Intellectual Property. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +#================================================================================== +# + +# NOTE: this makefile assumes that RMr has been built using the directory .build +# at the top most repo directory (e.g. ../../.build). It can be changed +# if you need to by adding "build_path=" to the make command line. + +.EXPORT_ALL_VARIABLES: +.ONESHELL: +.SHELLFLAGS = -e +SHELL = /bin/ksh + +build_path ?= ../../.build +header_path := $(shell find ../../.build -name 'rmr.h' |head -1 | sed 's!/rmr/.*!!' ) + +C_INCLUDE_PATH := $(header_path) +LD_LIBRARY_PATH=$(build_path):$(build_path)/lib +LIBRARY_PATH = $(LD_LIBRARY_PATH) + +# These programmes are designed to test some basic application level functions +# from the perspective of two communicating processes. + + +.PHONY: all +all: sender receiver sender_nano receiver_nano + +receiver_nano: receiver.c + gcc -I $${C_INCLUDE_PATH:-.} $< -g -o $@ -lrmr -lnanomsg -lpthread -lm + +receiver: receiver.c + gcc -I $${C_INCLUDE_PATH:-.} $< -g -o $@ -lrmr_nng -lnng -lpthread -lm + +sender_nano: sender.c + gcc -I $${C_INCLUDE_PATH:-.} $< -g -o $@ -lrmr -lnanomsg -lpthread -lm + +sender: sender.c + gcc -I $${C_INCLUDE_PATH:-.} $< -g -o $@ -lrmr_nng -lnng -lpthread -lm + + +.PHONY: clean +clean: + rm -f sender sender_nano receiver receiver_nano *.o diff --git a/test/app_test/README b/test/app_test/README new file mode 100644 index 0000000..8db94a4 --- /dev/null +++ b/test/app_test/README @@ -0,0 +1,12 @@ + +This directory contains various sender/receiver applications that can be +driven together to test message exchange through the RMr library. +The run_app_test.ksh script should support building the library, building +a sender and receiver, then running a short test to verify. + +The basic sender and receiver +Sender and receiver perform lightweight check sums on the message body +and on trace data to verify that the messages are being reeived correctly. +The receiver acks message type 5, so the reciver should receive acks at +about the volume of 1/10th the number of messages sent. + diff --git a/test/app_test/receiver.c b/test/app_test/receiver.c new file mode 100644 index 0000000..ade69b3 --- /dev/null +++ b/test/app_test/receiver.c @@ -0,0 +1,198 @@ +// :vim ts=4 sw=4 noet: +/* +================================================================================== + Copyright (c) 2019 Nokia + Copyright (c) 2018-2019 AT&T Intellectual Property. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +================================================================================== +*/ + +/* + Mnemonic: rmr_rcvr.c + Abstract: This is a very simple receiver that does nothing but listen + for messages and write stats every so often to the tty. + + The receiver expects messages which have some trace information + and a message format of: + ck1 ck2| + + ck1 is a simple checksum of the message text (NOT including the + nil at the end of the string. + + ck2 is a simple checksum of the trace data which for the purposes + of testing is assumed to have a terminating nil to keep this simple. + + Good messages are messages where both computed checksums match + the ck1 and ck2 values. + + The receiver will send an 'ack' message back to the sender for + all type 5 messages received. + + The sender and receiver can be run on the same host/container + or on different hosts. The route table is the key to setting + things up properly. See the sender code for rt information. + + Define these environment variables to have some control: + RMR_SEED_RT -- path to the static routing table + RMR_RTG_SVC -- port to listen for RTG connections + + Date: 18 April 2019 + Author: E. Scott Daniels +*/ + +#include +#include +#include +#include +#include +#include + +#include + +static int sum( char* str ) { + int sum = 0; + int i = 0; + + while( *str ) { + sum += *(str++) + i++; + } + + return sum % 255; +} + +/* + Split the message at the first sep and return a pointer to the first + character after. +*/ +static char* split( char* str, char sep ) { + char* s; + + s = strchr( str, sep ); + + if( s ) { + return s+1; + } + + fprintf( stderr, " no pipe in message: (%s)\n", str ); + return NULL; +} + +int main( int argc, char** argv ) { + void* mrc; // msg router context + rmr_mbuf_t* msg = NULL; // message received + int i; + int state; + int errors = 0; + char* listen_port = "4560"; + long count = 0; // total received + long good = 0; // good palyload buffers + long bad = 0; // payload buffers which were not correct + long bad_tr = 0; // trace buffers that were not correct + long timeout = 0; + char* data; + char wbuf[1024]; // we'll pull trace data into here + int nmsgs = 10; // number of messages to stop after (argv[1] overrides) + int rt_count = 0; // retry count + long ack_count = 0; // number of acks sent + + data = getenv( "RMR_RTG_SVC" ); + if( data == NULL ) { + setenv( "RMR_RTG_SVC", "19289", 1 ); // set one that won't collide with the sender if on same host + } + + if( argc > 1 ) { + nmsgs = atoi( argv[1] ); + } + if( argc > 2 ) { + listen_port = argv[2]; + } + + fprintf( stderr, " listening on port: %s for a max of %d messages\n", listen_port, nmsgs ); + + mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_NONE ); // start your engines! + if( mrc == NULL ) { + fprintf( stderr, " ABORT: unable to initialise RMr\n" ); + exit( 1 ); + } + + timeout = time( NULL ) + 20; + while( ! rmr_ready( mrc ) ) { // wait for RMr to load a route table + fprintf( stderr, " waiting for RMr to show ready\n" ); + sleep( 1 ); + + if( time( NULL ) > timeout ) { + fprintf( stderr, " giving up\n" ); + exit( 1 ); + } + } + fprintf( stderr, " rmr now shows ready, listening begins\n" ); + + timeout = time( NULL ) + 20; + while( count < nmsgs ) { + msg = rmr_torcv_msg( mrc, msg, 1000 ); // wait for about 1s so that if sender never starts we eventually escape + + if( msg ) { + if( msg->state == RMR_OK ) { + if( (data = split( msg->payload, '|' )) != NULL ) { + if( sum( data ) == atoi( (char *) msg->payload ) ) { + good++; + } else { + fprintf( stderr, " chk sum bad: computed=%d expected;%d (%s)\n", sum( data ), atoi( msg->payload ), data ); + bad++; + } + } + + if( (data = split( msg->payload, ' ' )) != NULL ) { // data will point to the chksum for the trace data + state = rmr_get_trace( msg, wbuf, 1024 ); // should only copy upto the trace size; we'll check that + if( state > 128 || state < 1 ) { + fprintf( stderr, "trace data size listed unexpectedly long: %d\n", state ); + } + if( sum( wbuf ) != atoi( data ) ) { + fprintf( stderr, " trace chk sum bad: computed=%d expected;%d len=%d (%s)\n", sum( wbuf ), atoi( data ), state, wbuf ); + bad_tr++; + } + } + count++; // messages received for stats output + + if( msg->mtype == 5 ) { // send an ack; sender will count but not process, so data in message is moot + msg = rmr_rts_msg( mrc, msg ); // we don't try to resend if this returns retry + rt_count = 1000; + while( rt_count > 0 && msg != NULL && msg->state == RMR_ERR_RETRY ) { // to work right in nano we need this :( + if( ack_count < 1 ) { // need to connect, so hard wait + sleep( 1 ); + } + rt_count--; + msg = rmr_rts_msg( mrc, msg ); // we don't try to resend if this returns retry + } + if( msg && msg->state == RMR_OK ) { // if it eventually worked + ack_count++; + } + } + } + } + + if( time( NULL ) > timeout ) { + fprintf( stderr, "receiver timed out\n" ); + errors++; + break; + } + } + + fprintf( stderr, " [%s] %ld messages; good=%ld acked=%ld bad=%ld bad-trace=%ld\n", !!(errors + bad + bad_tr) ? "FAIL" : "PASS", count, good, ack_count, bad, bad_tr ); + sleep( 2 ); // let any outbound acks flow before closing + + rmr_close( mrc ); + return !!(errors + bad + bad_tr); // bad rc if any are !0 +} + diff --git a/test/app_test/rt.mask b/test/app_test/rt.mask new file mode 100644 index 0000000..80d139f --- /dev/null +++ b/test/app_test/rt.mask @@ -0,0 +1,21 @@ + +# This is a 'mask' such that the run command can generate with the +# host name for the sender. + +newrt|start +rte|0|localhost:4560 +rte|1|localhost:4560 +rte|2|localhost:4560 +rte|3|localhost:4560 +rte|4|localhost:4560 +rte|5|localhost:4560 +rte|6|localhost:4560 +rte|7|localhost:4560 +rte|8|localhost:4560 +rte|9|localhost:4560 +rte|10|localhost:4560 +rte|11|localhost:4560 +rte|12|localhost:4560 +rte|13|localhost:4560 +rte|999|%%hostname%%:43086 +newrt|end diff --git a/test/app_test/run_app_test.ksh b/test/app_test/run_app_test.ksh new file mode 100644 index 0000000..7ad4934 --- /dev/null +++ b/test/app_test/run_app_test.ksh @@ -0,0 +1,156 @@ +#!/usr/bin/env ksh +# :vi ts=4 sw=4 noet : +#================================================================================== +# Copyright (c) 2019 Nokia +# Copyright (c) 2018-2019 AT&T Intellectual Property. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +#================================================================================== +# + +# --------------------------------------------------------------------------------- +# Mnemonic: run_app_test.ksh +# Abstract: This is a simple script to set up and run the basic send/receive +# processes for some library validation on top of nano/nng. +# It should be possible to clone the repo, switch to this directory +# and execute 'ksh run -B' which will build RMr, make the sender and +# recevier then run the basic test. +# +# Example command line: +# ksh ./run # default 10 messages at 1 msg/sec +# ksh ./run -N # default but with nanomsg lib +# ksh ./run -d 100 -n 10000 # send 10k messages with 100ms delay between +# +# Date: 22 April 2019 +# Author: E. Scott Daniels +# --------------------------------------------------------------------------------- + + +# The sender and receiver are run asynch. Their exit statuses are captured in a +# file in order for the 'main' to pick them up easily. +# +function run_sender { + if (( $nano_sender )) + then + ./sender_nano $nmsg $delay + else + ./sender $nmsg $delay + fi + echo $? >/tmp/PID$$.src # must communicate state back via file b/c asynch +} + +function run_rcvr { + if (( $nano_receiver )) + then + ./receiver_nano $nmsg + else + ./receiver $nmsg + fi + echo $? >/tmp/PID$$.rrc +} + +# --------------------------------------------------------- + +if [[ ! -f local.rt ]] # we need the real host name in the local.rt; build one from mask if not there +then + hn=$(hostname) + sed "s!%%hostname%%!$hn!" rt.mask >local.rt +fi + +nmsg=10 # total number of messages to be exchanged (-n value changes) +delay=1000000 # microsec sleep between msg 1,000,000 == 1s +nano_sender=0 # start nano version if set (-N) +nano_receiver=0 +wait=1 +rebuild=0 + +while [[ $1 == -* ]] +do + case $1 in + -B) rebuild=1;; + -d) delay=$2; shift;; + -N) nano_sender=1 + nano_receiver=1 + ;; + -n) nmsg=$2; shift;; + + *) echo "unrecognised option: $1" + echo "usage: $0 [-B] [-d micor-sec-delay] [-N] [-n num-msgs]" + echo " -B forces a rebuild which will use .build" + exit 1 + ;; + esac + + shift +done + +if (( rebuild )) +then + build_path=../../.build + + ( + set -e + mkdir -p $build_path + cd ${build_path%/*} # cd barfs on ../../.build, so we do this + cd ${build_path##*/} + cmake .. + make package + ) + if (( $? != 0 )) + then + echo "build failed" + exit 1 + fi +else + build_path=${BUILD_PATH:-"../../.build"} # we prefer .build at the root level, but allow user option + + if [[ ! -d $build_path ]] + then + echo "cannot find build in: $build_path" + echo "either create, and then build RMr, or set BUILD_PATH as an evironment var before running this" + exit 1 + fi +fi + +export LD_LIBRARY_PATH=$build_path:$build_path/lib +export LIBRARY_PATH=$LD_LIBRARY_PATH +export RMR_SEED_RT=${RMR_SEED_RT:-./local.rt} # allow easy testing with different rt + + +if [[ ! -f ./sender ]] +then + if ! make >/dev/null 2>&1 + then + echo "[FAIL] cannot find sender binary, and cannot make it.... humm?" + exit 1 + fi +fi + +run_rcvr & +run_sender & + +wait +head -1 /tmp/PID$$.rrc | read rrc +head -1 /tmp/PID$$.src | read src + +if (( !! (src + rrc) )) +then + echo "[FAIL] sender rc=$src receiver rc=$rrc" +else + echo "[PASS] sender rc=$src receiver rc=$rrc" +fi + +rm /tmp/PID$$.* + +exit $(( !! (src + rrc) )) + diff --git a/test/app_test/sender.c b/test/app_test/sender.c new file mode 100644 index 0000000..89a0afc --- /dev/null +++ b/test/app_test/sender.c @@ -0,0 +1,245 @@ +// :vim ts=4 sw=4 noet: +/* +================================================================================== + Copyright (c) 2019 Nokia + Copyright (c) 2018-2019 AT&T Intellectual Property. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +================================================================================== +*/ + +/* + Mnemonic: sender.c + Abstract: This is a simple sender which will send a series of messages. + It is expected that the first attempt(s) will fail if the receiver + is not up and this does not start decrementing the number to + send until it has a good send. + + The process will check the receive queue and list received messages + but pass/fail is not dependent on what comes back. + + If the receiver(s) do not become connectable in 20 sec this process + will give up and fail. + + + Message types will vary between 1 and 10, so the route table must + be set up to support those message types. + + Message format is: + ck1 ck2| + + Ck1 is the simple check sum of the msg-text (NOT includeing ) + Ck2 is the simple check sum of the trace data which is a nil terminated + series of bytes. + + Parms: argv[1] == nmsgs; argv[2] == delay; argv[3] == listen port + + Sender will send for at most 20 seconds, so if nmsgs and delay extend + beyond that period the total number of messages sent will be less + than n. + + Date: 18 April 2019 + Author: E. Scott Daniels +*/ + +#include +#include +#include +#include +#include +#include +#include + +#include + +static int sum( char* str ) { + int sum = 0; + int i = 0; + + while( *str ) { + sum += *(str++) + i++; + } + + return sum % 255; +} + +int main( int argc, char** argv ) { + void* mrc; // msg router context + struct epoll_event events[1]; // list of events to give to epoll + struct epoll_event epe; // event definition for event to listen to + int ep_fd = -1; // epoll's file des (given to epoll_wait) + int rcv_fd; // file des that NNG tickles -- give this to epoll to listen on + int nready; // number of events ready for receive + rmr_mbuf_t* sbuf; // send buffer + rmr_mbuf_t* rbuf; // received buffer + int count = 0; + int rt_count = 0; // number of messages requiring a spin retry + int rcvd_count = 0; + char* listen_port = "43086"; + int mtype = 0; + int stats_freq = 100; + int successful = 0; // set to true after we have a successful send + char wbuf[1024]; + char trace[1024]; + long timeout = 0; + int delay = 100000; // usec between send attempts + int nmsgs = 10; // number of messages to send + + if( argc > 1 ) { + nmsgs = atoi( argv[1] ); + } + if( argc > 2 ) { + delay = atoi( argv[2] ); + } + if( argc > 3 ) { + listen_port = argv[3]; + } + + fprintf( stderr, " listen port: %s; sending %d messages; delay=%d\n", listen_port, nmsgs, delay ); + + if( (mrc = rmr_init( listen_port, 1400, RMRFL_NONE )) == NULL ) { + fprintf( stderr, " unable to initialise RMr\n" ); + exit( 1 ); + } + + if( (rcv_fd = rmr_get_rcvfd( mrc )) >= 0 ) { // epoll only available from NNG -- skip receive later if not NNG + if( rcv_fd < 0 ) { + fprintf( stderr, " unable to set up polling fd\n" ); + exit( 1 ); + } + if( (ep_fd = epoll_create1( 0 )) < 0 ) { + fprintf( stderr, " [FAIL] unable to create epoll fd: %d\n", errno ); + exit( 1 ); + } + epe.events = EPOLLIN; + epe.data.fd = rcv_fd; + + if( epoll_ctl( ep_fd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 ) { + fprintf( stderr, " [FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) ); + exit( 1 ); + } + } else { + rmr_set_rtimeout( mrc, 0 ); // for nano we must set the receive timeout to 0; non-blocking receive + } + + sbuf = rmr_alloc_msg( mrc, 512 ); // alloc first send buffer; subsequent buffers allcoated on send + //sbuf = rmr_tralloc_msg( mrc, 512, 11, "xxxxxxxxxx" ); // alloc first send buffer; subsequent buffers allcoated on send + rbuf = NULL; // don't need to alloc receive buffer + + timeout = time( NULL ) + 20; // give rmr 20s to find the route table (shouldn't need that much) + while( ! rmr_ready( mrc ) ) { // must have a route table before we can send; wait til RMr says it has one + fprintf( stderr, " waiting for rmr to show ready\n" ); + sleep( 1 ); + + if( time( NULL ) > timeout ) { + fprintf( stderr, " giving up\n" ); + exit( 1 ); + } + } + fprintf( stderr, " rmr is ready; starting to send\n" ); + + timeout = time( NULL ) + 20; + + while( count < nmsgs ) { // we send 10 messages after the first message is successful + snprintf( trace, 100, "%lld", (long long) time( NULL ) ); + rmr_set_trace( sbuf, trace, strlen( trace ) + 1 ); + snprintf( wbuf, 200, "count=%d tr=%s %d stand up and cheer!", count, trace, rand() ); + snprintf( sbuf->payload, 300, "%d %d|%s", sum( wbuf ), sum( trace ), wbuf ); + + sbuf->mtype = mtype; // fill in the message bits + sbuf->len = strlen( sbuf->payload ) + 1; // our receiver likely wants a nice acsii-z string + sbuf->state = 0; + sbuf = rmr_send_msg( mrc, sbuf ); // send it (send returns an empty payload on success, or the original payload on fail/retry) + + switch( sbuf->state ) { + case RMR_ERR_RETRY: + rt_count++; + while( sbuf->state == RMR_ERR_RETRY ) { // soft failure (device busy?) retry + sbuf = rmr_send_msg( mrc, sbuf ); // retry send until it's good (simple test; real programmes should do better) + } + successful = 1; + break; + + case RMR_OK: + successful = 1; + break; + + default: + // some error (not connected likely), don't count this + break; + } + + if( successful ) { // once we have a message that was sent, start to increase things + count++; + mtype++; + if( mtype > 10 ) { // if large number of sends don't require infinite rt entries :) + mtype = 1; + } + } + + if( rcv_fd >= 0 ) { + while( (nready = epoll_wait( ep_fd, events, 1, 0 )) > 0 ) { // if something ready to receive (non-blocking check) + if( events[0].data.fd == rcv_fd ) { // we only are waiting on 1 thing, so [0] is ok + errno = 0; + rbuf = rmr_rcv_msg( mrc, rbuf ); + if( rbuf ) { + rcvd_count++; + } + } + } + } else { // nano, we will only pick up one at a time. + if( (rbuf = rmr_rcv_msg( mrc, rbuf ) ) != NULL ) { + if( rbuf->state == RMR_OK ) { + rcvd_count++; + } + } + } + + if( time( NULL ) > timeout ) { // should only happen if we never connect or nmsg > what we can send in 20sec + fprintf( stderr, "sender timeout\n" ); + break; + } + + if( delay > 0 ) { + usleep( delay ); + } + } + + + timeout = time( NULL ) + 2; // allow 2 seconds for the pipe to drain from the receiver + while( time( NULL ) < timeout ); + if( rcv_fd >= 0 ) { + while( (nready = epoll_wait( ep_fd, events, 1, 100 )) > 0 ) { // if something ready to receive (non-blocking check) + if( events[0].data.fd == rcv_fd ) { // we only are waiting on 1 thing, so [0] is ok + errno = 0; + rbuf = rmr_rcv_msg( mrc, rbuf ); + if( rbuf ) { + rcvd_count++; + timeout = time( NULL ) + 2; + } + } + } + } else { // nano, we will only pick up one at a time. + if( (rbuf = rmr_torcv_msg( mrc, rbuf, 100 ) ) != NULL ) { + if( rbuf->state == RMR_OK ) { + rcvd_count++; + } + } + } + + fprintf( stderr, " [%s] sent %d messages received %d acks retries=%d\n", count == nmsgs ? "PASS" : "FAIL", count, rcvd_count, rt_count ); + rmr_close( mrc ); + + return !( count == nmsgs ); +} + -- 2.16.6