From 8761b5d1ab23c663814b8f83191e4a32a20e1429 Mon Sep 17 00:00:00 2001 From: "E. Scott Daniels" Date: Fri, 9 Aug 2019 15:37:48 -0400 Subject: [PATCH] Add example health check example programmes The example programs health check, and msg echo illustrate how a simple RMR based health check application might be implemented. Signed-off-by: E. Scott Daniels Change-Id: I4962db7f65c650c00b2a74e1e3c5e71a27cdedfc --- examples/Makefile | 5 + examples/README | 25 ++++ examples/health_check.c | 320 ++++++++++++++++++++++++++++++++++++++++++++++++ examples/msg_echo.c | 156 +++++++++++++++++++++++ 4 files changed, 506 insertions(+) create mode 100644 examples/health_check.c create mode 100644 examples/msg_echo.c diff --git a/examples/Makefile b/examples/Makefile index d44ee37..2572282 100644 --- a/examples/Makefile +++ b/examples/Makefile @@ -29,4 +29,9 @@ receiver: receiver.c sender: sender.c gcc $< -g -o $@ -lrmr_nng -lnng -lpthread -lm +health_check: health_check.c + gcc $< -g -o $@ -lrmr_nng -lnng -lpthread -lm + +msg_echo: msg_echo.c + gcc $< -g -o $@ -lrmr_nng -lnng -lpthread -lm diff --git a/examples/README b/examples/README index 4b63eee..7d523fd 100644 --- a/examples/README +++ b/examples/README @@ -4,3 +4,28 @@ various aspects of how a user programme can use RMr to send and/or receive messages. These programmes are fairly simple in nature, and in most cases error checking is not performed to keep the code simple. + + +Health Check + This is an example of how a health check process might + be implemented. It sends 1 or more messages to an RMR + application and waits for the response. The latency of + each round trip (mu-seconds) is written to the tty. + + +Message echoer + This is a simple process which returns the received message + back to the sender, optionally changing the message type + while leaving all other parts of the message unchanged. + This is a good verification for applications like the + health checker. + +Receiver + This is a simple receiver process which returns messages to + the sender when a specific type is sent. Messages are checked + for accuracy when run with the example sender. + +Sender + A small sender which puts in information that lets the receiver + confirm that the message was received correctly (simple checksum + on portions of the payload and RMR header data. diff --git a/examples/health_check.c b/examples/health_check.c new file mode 100644 index 0000000..b363b6e --- /dev/null +++ b/examples/health_check.c @@ -0,0 +1,320 @@ +// :vim ts=4 sw=4 noet: +/* +================================================================================== + Copyright (c) 2019 Nokia + Copyright (c) 2018-2019 AT&T Intellectual Property. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +================================================================================== +*/ + +/* + Mnemonic: health_check.c + Abstract: This is a simple programme which sends a 'health check' message to + an application and waits for a response. By default, the application + is assumed to be running on the local host, and listening on 4560, + but both host and port can be configured as needed. Connection is + made via a wormhole, so there is no need for a routing table. + + The application being checked is expected to recognise the health + check message type, and to return the message using the RMR return + to sender function after changing the message type to "health response," + and leaving the remainder of the payload _unchanged_. + + A timestamp is placed into the outbound payload, and the round trip + latency is reported (the reason the pinged application should not modify + the payload. + + + Command line options and parameters: + [-h host:port] target + [-n num-msgs] total number to send + [-t seconds] max timeout per message + + Route table: While we don't need a route table to do wormhole sends we + do need for RMR to initialise an empty one. To avoid having to have a + dummy table on disk somewhere, we'll create one and "point" RMR at it. + + Date: 9 August 2019 + Author: E. Scott Daniels +*/ + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +// include message types header + +#ifndef HEALTH_CHECK +#define HEALTH_CHECK 100 // message types +#define HEALTH_RESP 101 +#endif + +/* + Our message payload. +*/ +typedef struct mpl { + char msg[512]; // message for human consumption + struct timespec out_ts; // time this payload was sent +} mpl_t; + +// --------------------------------------------------------------------------- +/* + Very simple checksum over a buffer. +*/ +static int sum( unsigned char* buf, int len ) { + int sum = 0; + int i = 0; + unsigned char* last; + + last = buf + len; + while( buf < last ) { + sum += *(buf++) + i++; + } + + return sum % 255; +} + +/* + Compute the elapsed time between ts1 and ts2. + Returns mu-seconds. +*/ +static int elapsed( struct timespec* start_ts, struct timespec* end_ts ) { + long long start; + long long end; + int bin; + + start = ( start_ts->tv_sec * 1000000000) + start_ts->tv_nsec; + end = ( end_ts->tv_sec * 1000000000) + end_ts->tv_nsec; + + bin = (end - start) / 1000; // to mu-sec + //bin = (end - start); + + return bin; +} + +/* + See if my id string is in the buffer immediately after the first >. + Return 1 if so, 0 if not. +*/ +static int vet_received( char* me, char* buf ) { + char* ch; + + if( (ch = strchr( buf, '>' )) == NULL ) { + return 0; + } + + return strcmp( me, ch+1 ) == 0; +} + +/* + Create an empty route table and set an environment var for RMR to find. + This must be called before initialising RMR. +*/ +static void mk_rt( ) { + int fd; + char fnb[128]; + char* contents = "newrt|start\nnewrt|end\n"; + + snprintf( fnb, sizeof( fnb ), "/tmp/health_check.rt" ); + fd = open( fnb, O_CREAT | O_WRONLY, 0664 ); + if( fd < 0 ) { + fprintf( stderr, "[FAIL] could not create dummy route table: %s %s\n", fnb, strerror( errno ) ); + return; + } + + write( fd, contents, strlen( contents ) ); + if( (close( fd ) < 0 ) ) { + fprintf( stderr, "[FAIL] couldn't close dummy route table: %s: %s\n", fnb, strerror( errno ) ); + return; + } + + setenv( "RMR_SEED_RT", fnb, 0 ); // set it, but don't overwrite it +} + +int main( int argc, char** argv ) { + void* mrc; // msg router context + rmr_mbuf_t* mbuf; // message buffer + mpl_t* payload; // the payload in a message + int ai = 1; // arg index + long timeout; + long max_timeout = 5; // -t to overrride + char* target = "localhost:4560"; // address of target to ping + char* listen_port; // the port we open for "backhaul" connections (random) + char* tok; // pointer at token in a buffer + int i; + char wbuf[1024]; + char me[128]; // who I am to vet rts was actually from me + int rand_port = 0; // -r sets and causes us to generate a random listen port + int whid; // id of wormhole + int num2send = 1; // number of messages to send + int ep_fd = -1; // epoll's file des (given to epoll_wait) + int rcv_fd; // file des that NNG tickles -- give this to epoll to listen on + int nready; // number of events ready for receive + int count = 0; + int errors = 0; + int cksum; // computed simple checksum + struct timespec in_ts; // time we got response + struct epoll_event events[1]; // list of events to give to epoll + struct epoll_event epe; // event definition for event to listen to + + // ---- simple arg parsing ------ + while( ai < argc ) { + if( *argv[ai] == '-' ) { + switch( argv[ai][1] ) { + case 'h': // host port + ai++; + target = strdup( argv[ai] ); + break; + + case 'n': // num to send + ai++; + num2send = atoi( argv[ai] ); + break; + + case 'r': // generate random listen port + rand_port = 1; + ;; + + case 't': // timeout + ai++; + max_timeout = atoi( argv[ai] ); + break; + + default: + fprintf( stderr, "[FAIL] unrecognised option: %s\n", argv[ai] ); + exit( 1 ); + } + + ai++; + } else { + break; // not an option, leave with a1 @ first positional parm + } + } + + if( rand_port ) { + srand( time( NULL ) ); + snprintf( wbuf, sizeof( wbuf ), "%d", 43000 + (rand() % 1000) ); // random listen port + listen_port = strdup( wbuf ); + } else { + listen_port = "43086"; + } + + + mk_rt(); // create a dummy route table so we don't have errors/hang + + fprintf( stderr, "[INFO] listen port: %s; sending %d messages\n", listen_port, num2send ); + + if( (mrc = rmr_init( listen_port, 1400, 0 )) == NULL ) { // start without route table listener thread + fprintf( stderr, "[FAIL] unable to initialise RMr\n" ); + exit( 1 ); + } + fprintf( stderr, "[INFO] RMR initialised\n" ); + + if( (rcv_fd = rmr_get_rcvfd( mrc )) < 0 ) { // if we can't get an epoll FD, then we can't timeout; abort + fprintf( stderr, "[FAIL] unable to get an epoll FD\n" ); + exit( 1 ); + } + + if( (ep_fd = epoll_create1( 0 )) < 0 ) { + fprintf( stderr, "[FAIL] unable to create epoll fd: %d\n", errno ); + exit( 1 ); + } + epe.events = EPOLLIN; + epe.data.fd = rcv_fd; + + if( epoll_ctl( ep_fd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 ) { + fprintf( stderr, "[FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) ); + exit( 1 ); + } + + while( ! rmr_ready( mrc ) ) { + sleep( 1 ); + } + + mbuf = rmr_alloc_msg( mrc, sizeof( *payload ) + 100 ); // send buffer with a bit of padding + + fprintf( stderr, "[INFO] starting session with %s, starting to send\n", target ); + whid = rmr_wh_open( mrc, target ); // open a wormhole directly to the target + if( whid < 0 ) { + fprintf( stderr, "[FAIL] unable to connect to %s\n", target ); + exit( 2 ); + } + + fprintf( stderr, "[INFO] connected to %s, starting to send\n", target ); + rmr_set_stimeout( mrc, 3 ); // we let rmr retry failures for up to 3 "rounds" + + gethostname( wbuf, sizeof( wbuf ) ); + snprintf( me, sizeof( me ), "%s-%d", wbuf, getpid( ) ); + + errors = 0; + while( count < num2send ) { // we send n messages after the first message is successful + if( !mbuf ) { + fprintf( stderr, "[FAIL] mbuf is nil?\n" ); + exit( 1 ); + } + + payload = (mpl_t *) mbuf->payload; + + snprintf( wbuf, sizeof( payload->msg ), "%s count=%d %d", me, count, rand() ); + snprintf( mbuf->payload, 1024, "%d|%s", sum( wbuf , strlen( wbuf ) ), wbuf ); + + mbuf->mtype = HEALTH_CHECK; + mbuf->sub_id = -1; + mbuf->len = sizeof( *payload ); + mbuf->state = 0; + + clock_gettime( CLOCK_REALTIME, &payload->out_ts ); // mark time out + mbuf = rmr_wh_send_msg( mrc, whid, mbuf ); + + if( mbuf->state == RMR_OK ) { // good send, wait for response + nready = epoll_wait( ep_fd, events, 1, max_timeout * 1000 ); + if( nready > 0 ) { + clock_gettime( CLOCK_REALTIME, &in_ts ); // mark response received time + + mbuf = rmr_rcv_msg( mrc, mbuf ); + payload = (mpl_t *) mbuf->payload; + tok = strchr( payload->msg, '|' ); // find end of chksum + if( tok ) { + tok++; + cksum = sum( tok, strlen( tok ) ); + if( cksum != atoi( payload->msg ) ) { + fprintf( stderr, "[WRN] response to msg %d received, cksum mismatch; expected %d, got %d\n", + count+1, atoi( payload->msg ), cksum ); + } else { + fprintf( stderr, "[INFO] response to msg %d received, %d mu-sec\n", count+1, elapsed( &payload->out_ts, &in_ts ) ); + } + } + } else { + fprintf( stderr, "[ERR] timeout waiting for response to message %d\n", count+1 ); + errors++; + } + } else { + fprintf( stderr, "[ERR] send failed: %d\n", mbuf->state ); + } + + count++; + sleep( 1 ); + } + + rmr_wh_close( mrc, whid ); + + return errors = 0; +} + diff --git a/examples/msg_echo.c b/examples/msg_echo.c new file mode 100644 index 0000000..11254b8 --- /dev/null +++ b/examples/msg_echo.c @@ -0,0 +1,156 @@ +// :vim ts=4 sw=4 noet: +/* +================================================================================== + Copyright (c) 2019 Nokia + Copyright (c) 2018-2019 AT&T Intellectual Property. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +================================================================================== +*/ + +/* + Mnemonic: msg_echo.c + Abstract: This is a simple message receiver which will echo the received + message back to the sender using an RMR return to sender call. + All of the message will be left unchanged, though the message type + may be changed by supplying it on the command line as the first + positional parameter. + + Because this process uses the rts call in RMR, it does not need + a route table. However, RMR needs to have at least an empty table + in order to work properly. To avoid having the user make a dummy + table, we will create an empty one in /tmp and set the needed + environment var so the RMR initialisation process finds it. + + Date: 9 August 2019 + Author: E. Scott Daniels +*/ + +#include +#include +#include +#include +#include +#include +#include + +#include + +/* + Create an empty route table and set an environment var for RMR to find. + This must be called before initialising RMR. +*/ +static void mk_rt( ) { + int fd; + char fnb[128]; + char* contents = "newrt|start\nnewrt|end\n"; + + snprintf( fnb, sizeof( fnb ), "/tmp/msg_echo.rt" ); + fd = open( fnb, O_CREAT | O_WRONLY, 0664 ); + if( fd < 0 ) { + fprintf( stderr, "[FAIL] could not create dummy route table: %s %s\n", fnb, strerror( errno ) ); + return; + } + + write( fd, contents, strlen( contents ) ); + if( (close( fd ) < 0 ) ) { + fprintf( stderr, "[FAIL] couldn't close dummy route table: %s: %s\n", fnb, strerror( errno ) ); + return; + } + + setenv( "RMR_SEED_RT", fnb, 0 ); // set it, but don't overwrite it +} + +int main( int argc, char** argv ) { + void* mrc; // msg router context + rmr_mbuf_t* msg = NULL; // message received + int i; + int state; + int errors = 0; + char* listen_port = "4560"; + long timeout = 0; + char* data; // pointer at env data we sussed out + char wbuf[1024]; // we'll pull trace data into here, and use as general working buffer + char sbuf[128]; // short buffer + int mtype = -1; // if set on command line, we'll add to msg before rts + int ai = 1; // argument index + + data = getenv( "RMR_RTG_SVC" ); + if( data == NULL ) { + setenv( "RMR_RTG_SVC", "19289", 1 ); // set one that won't collide with the sender if on same host + } + + // ---- simple arg parsing ------ + while( ai < argc ) { + if( *argv[ai] == '-' ) { + switch( argv[ai][1] ) { + case 'p': // timeout + ai++; + listen_port = argv[ai]; + break; + + case 't': // rts message type + ai++; + mtype = atoi( argv[ai] ); + break; + + default: + fprintf( stderr, "[FAIL] unrecognised option: %s\n", argv[ai] ); + fprintf( stderr, "\nusage: %s [-p port] [-t msg-type]\n", argv[0] ); + exit( 1 ); + } + + ai++; + } else { + break; // not an option, leave with a1 @ first positional parm + } + } + + fprintf( stderr, " listening on port: %s will return messages with type: %d\n", listen_port, mtype ); + + mk_rt(); // make an empty rt + + mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_NONE ); // start your engines! + if( mrc == NULL ) { + fprintf( stderr, " ABORT: unable to initialise RMr\n" ); + exit( 1 ); + } + + timeout = time( NULL ) + 20; + while( ! rmr_ready( mrc ) ) { // wait for RMr to configure the route table + fprintf( stderr, " waiting for RMr to show ready\n" ); + sleep( 1 ); + + if( time( NULL ) > timeout ) { + fprintf( stderr, " giving up\n" ); + exit( 1 ); + } + } + fprintf( stderr, " rmr now shows ready, listening begins\n" ); + + while( 1 ) { // listen until the cows come home, pigs fly... + msg = rmr_rcv_msg( mrc, msg ); + + if( msg && msg->state == RMR_OK ) { + if( mtype >= 0 ) { + msg->mtype = mtype; + msg->sub_id = RMR_VOID_SUBID; + } + + msg = rmr_rts_msg( mrc, msg ); + } + } + + return 0; // unreachable, but some compilers swak if this isn't here. +} + -- 2.16.6