Add example health check example programmes 76/676/1 1.1.1
authorE. Scott Daniels <daniels@research.att.com>
Fri, 9 Aug 2019 19:37:48 +0000 (15:37 -0400)
committerE. Scott Daniels <daniels@research.att.com>
Fri, 9 Aug 2019 19:37:48 +0000 (15:37 -0400)
The example programs health check, and msg echo illustrate
how a simple RMR based health check application might be
implemented.

Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Change-Id: I4962db7f65c650c00b2a74e1e3c5e71a27cdedfc

examples/Makefile
examples/README
examples/health_check.c [new file with mode: 0644]
examples/msg_echo.c [new file with mode: 0644]

index d44ee37..2572282 100644 (file)
@@ -29,4 +29,9 @@ receiver: receiver.c
 sender: sender.c
        gcc $< -g -o $@ -lrmr_nng -lnng -lpthread -lm
 
+health_check: health_check.c
+       gcc $< -g -o $@ -lrmr_nng -lnng -lpthread -lm
+
+msg_echo: msg_echo.c
+       gcc $< -g -o $@ -lrmr_nng -lnng -lpthread -lm
 
index 4b63eee..7d523fd 100644 (file)
@@ -4,3 +4,28 @@ various aspects of how a user programme can use RMr to send
 and/or receive messages.  These programmes are fairly simple
 in nature, and in most cases error checking is not performed
 to keep the code simple.  
+
+
+Health Check
+       This is an example of how a health check process might
+       be implemented. It sends 1 or more messages to an RMR
+       application and waits for the response. The latency of
+       each round trip (mu-seconds) is written to the tty.
+       
+
+Message echoer
+       This is a simple process which returns the received message
+       back to the sender, optionally changing the message type
+       while leaving all other parts of the message unchanged.
+       This is a good verification for applications like the
+       health checker.
+
+Receiver
+       This is a simple receiver process which returns messages to
+       the sender when a specific type is sent.  Messages are checked
+       for accuracy when run with the example sender.
+
+Sender
+       A small sender which puts in information that lets the receiver
+       confirm that the message was received correctly (simple checksum
+       on portions of the payload and RMR header data.
diff --git a/examples/health_check.c b/examples/health_check.c
new file mode 100644 (file)
index 0000000..b363b6e
--- /dev/null
@@ -0,0 +1,320 @@
+// :vim ts=4 sw=4 noet:
+/*
+==================================================================================
+       Copyright (c) 2019 Nokia
+       Copyright (c) 2018-2019 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+          http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+/*
+       Mnemonic:       health_check.c
+       Abstract:       This is a simple programme which sends a 'health check' message to
+                               an application and waits for a response. By default, the application
+                               is assumed to be running on the local host, and listening on 4560,
+                               but both host and port can be configured as needed. Connection is 
+                               made via a wormhole, so there is no need for a routing table.
+
+                               The application being checked is expected to recognise the health
+                               check message type, and to return the message using the RMR return
+                               to sender function after changing the message type to  "health response,"
+                               and leaving the remainder of the payload _unchanged_.  
+
+                               A timestamp is placed into the outbound payload, and the round trip
+                               latency is reported (the reason the pinged application should not modify
+                               the payload.
+
+
+                               Command line options and parameters:
+                                       [-h host:port]          target
+                                       [-n num-msgs]           total number to send
+                                       [-t seconds]            max timeout per message
+
+                               Route table:  While we don't need a route table to do wormhole sends we
+                               do need for RMR to initialise an empty one. To avoid having to have a
+                               dummy table on disk somewhere, we'll create one and "point" RMR at it.
+
+       Date:           9 August 2019
+       Author:         E. Scott Daniels
+*/
+
+#include <unistd.h>
+#include <errno.h>
+#include <string.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <sys/epoll.h>
+#include <time.h>
+#include <fcntl.h>
+
+#include <rmr/rmr.h>
+// include message types header
+
+#ifndef HEALTH_CHECK
+#define HEALTH_CHECK   100             // message types
+#define HEALTH_RESP            101
+#endif
+
+/*
+       Our message payload.
+*/
+typedef struct mpl {
+       char    msg[512];                               // message for human consumption
+       struct  timespec out_ts;                // time this payload was sent
+} mpl_t;
+
+// ---------------------------------------------------------------------------
+/*
+       Very simple checksum over a buffer.
+*/
+static int sum( unsigned char* buf, int len ) {
+       int sum = 0;
+       int     i = 0;
+       unsigned char*  last;
+
+       last = buf + len;
+       while( buf < last ) {
+               sum += *(buf++) + i++;
+       }
+
+       return sum % 255;
+}
+
+/*
+       Compute the elapsed time between ts1 and ts2.
+       Returns mu-seconds.
+*/
+static int elapsed( struct timespec* start_ts, struct timespec* end_ts ) {
+       long long start;
+       long long end;
+       int bin;
+
+       start = ( start_ts->tv_sec * 1000000000) + start_ts->tv_nsec;
+       end = ( end_ts->tv_sec * 1000000000) + end_ts->tv_nsec;
+
+       bin = (end - start) / 1000;                     // to mu-sec
+       //bin = (end - start);
+
+       return bin;
+}
+
+/*
+       See if my id string is in the buffer immediately after the first >.
+       Return 1 if so, 0 if not.
+*/
+static int vet_received( char* me, char* buf ) {
+       char*   ch;
+
+       if( (ch = strchr( buf, '>' )) == NULL ) {
+               return 0;
+       }
+
+       return strcmp( me, ch+1 ) == 0;
+}
+
+/*
+       Create an empty route table and set an environment var for RMR to find.
+       This must be called before initialising RMR.
+*/
+static void mk_rt( ) {
+       int     fd;
+       char    fnb[128];
+       char*   contents = "newrt|start\nnewrt|end\n";
+
+       snprintf( fnb, sizeof( fnb ), "/tmp/health_check.rt" );
+       fd = open( fnb, O_CREAT | O_WRONLY, 0664 );
+       if( fd < 0 ) {
+               fprintf( stderr, "[FAIL] could not create dummy route table: %s %s\n", fnb, strerror( errno ) );
+               return;
+       }
+
+       write( fd, contents, strlen( contents ) );
+       if( (close( fd ) < 0 ) ) {
+               fprintf( stderr, "[FAIL] couldn't close dummy route table: %s: %s\n", fnb, strerror( errno ) );
+               return;
+       }
+
+       setenv( "RMR_SEED_RT", fnb, 0 );                // set it, but don't overwrite it
+}
+
+int main( int argc, char** argv ) {
+       void* mrc;                                                      // msg router context
+       rmr_mbuf_t*             mbuf;                                   // message buffer
+       mpl_t*  payload;                                                // the payload in a message
+       int             ai = 1;                                                 // arg index
+       long    timeout;
+       long    max_timeout = 5;                                // -t to overrride
+       char*   target = "localhost:4560";              // address of target to ping
+       char*   listen_port;                                    // the port we open for "backhaul" connections (random)
+       char*   tok;                                                    // pointer at token in a buffer
+       int             i;
+       char    wbuf[1024];
+       char    me[128];                                                // who I am to vet rts was actually from me
+       int             rand_port = 0;                                  // -r sets and causes us to generate a random listen port
+       int             whid;                                                   // id of wormhole
+       int             num2send = 1;                                   // number of messages to send
+       int     ep_fd = -1;                                             // epoll's file des (given to epoll_wait)
+       int             rcv_fd;                                                 // file des that NNG tickles -- give this to epoll to listen on
+       int             nready;                                                 // number of events ready for receive
+       int             count = 0;
+       int             errors = 0;
+       int             cksum;                                                  // computed simple checksum
+       struct  timespec in_ts;                                 // time we got response
+       struct  epoll_event events[1];                  // list of events to give to epoll
+       struct  epoll_event epe;                                // event definition for event to listen to
+
+       // ---- simple arg parsing ------
+       while( ai < argc ) {
+               if( *argv[ai] == '-' ) {
+                       switch( argv[ai][1] ) {
+                               case 'h':                                       // host port
+                                       ai++;
+                                       target = strdup( argv[ai] );
+                                       break;
+
+                               case 'n':                                       // num to send
+                                       ai++;
+                                       num2send = atoi( argv[ai] );
+                                       break;
+
+                               case 'r':                                       // generate random listen port
+                                       rand_port = 1;
+                                       ;;
+
+                               case 't':                                       // timeout
+                                       ai++;
+                                       max_timeout = atoi( argv[ai] );
+                                       break;
+
+                               default:
+                                       fprintf( stderr, "[FAIL] unrecognised option: %s\n", argv[ai] );
+                                       exit( 1 );
+                       }
+
+                       ai++;
+               } else {
+                       break;          // not an option, leave with a1 @ first positional parm
+               }
+       }
+
+       if( rand_port ) {
+               srand( time( NULL ) );
+               snprintf( wbuf, sizeof( wbuf ), "%d", 43000 + (rand() % 1000) );                        // random listen port
+               listen_port = strdup( wbuf );
+       } else {
+               listen_port = "43086";
+       }
+
+
+       mk_rt();                                // create a dummy route table so we don't have errors/hang
+
+       fprintf( stderr, "[INFO] listen port: %s; sending %d messages\n", listen_port, num2send );
+
+       if( (mrc = rmr_init( listen_port, 1400, 0 )) == NULL ) {                // start without route table listener thread
+               fprintf( stderr, "[FAIL] unable to initialise RMr\n" );
+               exit( 1 );
+       }
+       fprintf( stderr, "[INFO] RMR initialised\n" );
+
+       if( (rcv_fd = rmr_get_rcvfd( mrc )) < 0 ) {                     // if we can't get an epoll FD, then we can't timeout; abort
+               fprintf( stderr, "[FAIL] unable to get an epoll FD\n" );
+               exit( 1 );
+       }
+
+       if( (ep_fd = epoll_create1( 0 )) < 0 ) {
+               fprintf( stderr, "[FAIL] unable to create epoll fd: %d\n", errno );
+               exit( 1 );
+       }
+       epe.events = EPOLLIN;
+       epe.data.fd = rcv_fd;
+
+       if( epoll_ctl( ep_fd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 )  {
+               fprintf( stderr, "[FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
+               exit( 1 );
+       }
+
+       while( ! rmr_ready( mrc ) ) {
+               sleep( 1 );
+       }
+
+       mbuf = rmr_alloc_msg( mrc, sizeof( *payload ) + 100 );          // send buffer with a bit of padding
+
+       fprintf( stderr, "[INFO] starting session with %s, starting to send\n", target );
+       whid = rmr_wh_open( mrc, target );                                                              // open a wormhole directly to the target
+       if( whid < 0 ) {
+               fprintf( stderr, "[FAIL] unable to connect to %s\n", target );
+               exit( 2 );
+       }
+
+       fprintf( stderr, "[INFO] connected to %s, starting to send\n", target );
+       rmr_set_stimeout( mrc, 3 );                                                                     // we let rmr retry failures for up to 3 "rounds"
+
+       gethostname( wbuf, sizeof( wbuf ) );
+       snprintf( me, sizeof( me ), "%s-%d", wbuf, getpid( ) );
+
+       errors = 0;
+       while( count < num2send ) {                                                             // we send n messages after the first message is successful
+               if( !mbuf ) {
+                       fprintf( stderr, "[FAIL] mbuf is nil?\n" );
+                       exit( 1 );
+               }
+
+               payload = (mpl_t *) mbuf->payload;
+
+               snprintf( wbuf, sizeof( payload->msg ), "%s count=%d %d", me, count, rand() );
+               snprintf( mbuf->payload, 1024, "%d|%s", sum( wbuf , strlen( wbuf ) ), wbuf );
+
+               mbuf->mtype = HEALTH_CHECK;
+               mbuf->sub_id = -1;
+               mbuf->len =  sizeof( *payload );
+               mbuf->state = 0;
+
+               clock_gettime( CLOCK_REALTIME, &payload->out_ts );              // mark time out
+               mbuf = rmr_wh_send_msg( mrc, whid, mbuf );
+
+               if( mbuf->state == RMR_OK ) {                                                   // good send, wait for response
+                       nready = epoll_wait( ep_fd, events, 1, max_timeout * 1000 );
+                       if( nready > 0 ) {
+                               clock_gettime( CLOCK_REALTIME, &in_ts );                // mark response received time
+
+                               mbuf = rmr_rcv_msg( mrc, mbuf );
+                               payload = (mpl_t *) mbuf->payload;
+                               tok = strchr( payload->msg, '|' );                              // find end of chksum
+                               if( tok ) {
+                                       tok++;
+                                       cksum = sum( tok, strlen( tok ) );
+                                       if( cksum != atoi( payload->msg ) ) {
+                                               fprintf( stderr, "[WRN] response to msg %d received, cksum mismatch; expected %d, got %d\n", 
+                                                       count+1, atoi( payload->msg ), cksum );
+                                       } else {
+                                               fprintf( stderr, "[INFO] response to msg %d received, %d mu-sec\n",  count+1, elapsed( &payload->out_ts, &in_ts ) );
+                                       }
+                               }
+                       } else {
+                               fprintf( stderr, "[ERR] timeout waiting for response to message %d\n", count+1 );
+                               errors++;
+                       }
+               } else {
+                       fprintf( stderr, "[ERR] send failed: %d\n", mbuf->state );
+               }
+
+               count++;
+               sleep( 1 );
+       }
+
+       rmr_wh_close( mrc, whid );
+
+       return errors = 0;
+}
+
diff --git a/examples/msg_echo.c b/examples/msg_echo.c
new file mode 100644 (file)
index 0000000..11254b8
--- /dev/null
@@ -0,0 +1,156 @@
+// :vim ts=4 sw=4 noet:
+/*
+==================================================================================
+       Copyright (c) 2019 Nokia
+       Copyright (c) 2018-2019 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+          http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+/*
+       Mnemonic:       msg_echo.c
+       Abstract:       This is a simple message receiver which will echo the received
+                               message back to the sender using an RMR return to sender call.
+                               All of the message will be left unchanged, though the message type
+                               may be changed by supplying it on the command line as the first 
+                               positional parameter.
+
+                               Because this process uses the rts call in RMR, it does not need
+                               a route table. However, RMR needs to have at least an empty table
+                               in order to work properly. To avoid having the user make a dummy
+                               table, we will create an empty one in /tmp and set the needed 
+                               environment var so the RMR initialisation process finds it.
+
+       Date:           9 August 2019
+       Author:         E. Scott Daniels
+*/
+
+#include <unistd.h>
+#include <errno.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <time.h>
+#include <string.h>
+#include <fcntl.h>
+
+#include <rmr/rmr.h>
+
+/*
+       Create an empty route table and set an environment var for RMR to find.
+       This must be called before initialising RMR.
+*/
+static void mk_rt( ) {
+       int     fd;
+       char    fnb[128];
+       char*   contents = "newrt|start\nnewrt|end\n";
+
+       snprintf( fnb, sizeof( fnb ), "/tmp/msg_echo.rt" );
+       fd = open( fnb, O_CREAT | O_WRONLY, 0664 );
+       if( fd < 0 ) {
+               fprintf( stderr, "[FAIL] could not create dummy route table: %s %s\n", fnb, strerror( errno ) );
+               return;
+       }
+
+       write( fd, contents, strlen( contents ) );
+       if( (close( fd ) < 0 ) ) {
+               fprintf( stderr, "[FAIL] couldn't close dummy route table: %s: %s\n", fnb, strerror( errno ) );
+               return;
+       }
+
+       setenv( "RMR_SEED_RT", fnb, 0 );                // set it, but don't overwrite it
+}
+
+int main( int argc, char** argv ) {
+       void* mrc;                                              // msg router context
+       rmr_mbuf_t* msg = NULL;                         // message received
+       int i;
+       int             state;
+       int             errors = 0;
+       char*   listen_port = "4560";
+       long timeout = 0;
+       char*   data;                                           // pointer at env data we sussed out
+       char    wbuf[1024];                                     // we'll pull trace data into here, and use as general working buffer
+       char    sbuf[128];                                      // short buffer
+       int             mtype = -1;                                     // if set on command line, we'll add to msg before rts
+       int             ai = 1;                                         // argument index
+
+       data = getenv( "RMR_RTG_SVC" );
+       if( data == NULL ) {
+               setenv( "RMR_RTG_SVC", "19289", 1 );            // set one that won't collide with the sender if on same host
+       }
+
+       // ---- simple arg parsing ------
+       while( ai < argc ) {
+               if( *argv[ai] == '-' ) {
+                       switch( argv[ai][1] ) {
+                               case 'p':                                       // timeout
+                                       ai++;
+                                       listen_port = argv[ai];
+                                       break;
+
+                               case 't':                                       // rts message type
+                                       ai++;
+                                       mtype = atoi( argv[ai] );
+                                       break;
+
+                               default:
+                                       fprintf( stderr, "[FAIL] unrecognised option: %s\n", argv[ai] );
+                                       fprintf( stderr, "\nusage: %s [-p port] [-t msg-type]\n", argv[0] );
+                                       exit( 1 );
+                       }
+
+                       ai++;
+               } else {
+                       break;          // not an option, leave with a1 @ first positional parm
+               }
+       }
+
+       fprintf( stderr, "<ECHO> listening on port: %s will return messages with type: %d\n", listen_port, mtype );
+       
+       mk_rt();                                                        // make an empty rt
+
+       mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_NONE );   // start your engines!
+       if( mrc == NULL ) {
+               fprintf( stderr, "<ECHO> ABORT:  unable to initialise RMr\n" );
+               exit( 1 );
+       }
+
+       timeout = time( NULL ) + 20;
+       while( ! rmr_ready( mrc ) ) {                                                           // wait for RMr to configure the route table
+               fprintf( stderr, "<ECHO> waiting for RMr to show ready\n" );
+               sleep( 1 );
+
+               if( time( NULL ) > timeout ) {
+                       fprintf( stderr, "<ECHO> giving up\n" );
+                       exit( 1 );
+               }
+       }
+       fprintf( stderr, "<ECHO> rmr now shows ready, listening begins\n" );
+
+       while( 1 ) {                                                    // listen until the cows come home, pigs fly...
+               msg = rmr_rcv_msg( mrc, msg );
+
+               if( msg && msg->state == RMR_OK ) {
+                       if( mtype >= 0 ) {
+                               msg->mtype = mtype;
+                               msg->sub_id = RMR_VOID_SUBID;
+                       }
+
+                       msg = rmr_rts_msg( mrc, msg );
+               }
+       }
+
+       return  0;              // unreachable, but some compilers swak if this isn't here.
+}
+