Fix block of dynamic route table load
[ric-plt/lib/rmr.git] / examples / health_check.c
1 // :vim ts=4 sw=4 noet:
2 /*
3 ==================================================================================
4         Copyright (c) 2019 Nokia
5         Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       health_check.c
23         Abstract:       This is a simple programme which sends a 'health check' message to
24                                 an application and waits for a response. By default, the application
25                                 is assumed to be running on the local host, and listening on 4560,
26                                 but both host and port can be configured as needed. Connection is 
27                                 made via a wormhole, so there is no need for a routing table.
28
29                                 The application being checked is expected to recognise the health
30                                 check message type, and to return the message using the RMR return
31                                 to sender function after changing the message type to  "health response,"
32                                 and leaving the remainder of the payload _unchanged_.  
33
34                                 A timestamp is placed into the outbound payload, and the round trip
35                                 latency is reported (the reason the pinged application should not modify
36                                 the payload.
37
38
39                                 Command line options and parameters:
40                                         [-h host:port]          target
41                                         [-n num-msgs]           total number to send
42                                         [-t seconds]            max timeout per message
43
44                                 Route table:  While we don't need a route table to do wormhole sends we
45                                 do need for RMR to initialise an empty one. To avoid having to have a
46                                 dummy table on disk somewhere, we'll create one and "point" RMR at it.
47
48         Date:           9 August 2019
49         Author:         E. Scott Daniels
50 */
51
52 #include <unistd.h>
53 #include <errno.h>
54 #include <string.h>
55 #include <stdio.h>
56 #include <stdlib.h>
57 #include <sys/epoll.h>
58 #include <time.h>
59 #include <fcntl.h>
60
61 #include <rmr/rmr.h>
62 // include message types header
63
64 #ifndef HEALTH_CHECK
65 #define HEALTH_CHECK    100             // message types
66 #define HEALTH_RESP             101
67 #endif
68
69 /*
70         Our message payload.
71 */
72 typedef struct mpl {
73         char    msg[512];                               // message for human consumption
74         struct  timespec out_ts;                // time this payload was sent
75 } mpl_t;
76
77 // ---------------------------------------------------------------------------
78 /*
79         Very simple checksum over a buffer.
80 */
81 static int sum( unsigned char* buf, int len ) {
82         int sum = 0;
83         int     i = 0;
84         unsigned char*  last;
85
86         last = buf + len;
87         while( buf < last ) {
88                 sum += *(buf++) + i++;
89         }
90
91         return sum % 255;
92 }
93
94 /*
95         Compute the elapsed time between ts1 and ts2.
96         Returns mu-seconds.
97 */
98 static int elapsed( struct timespec* start_ts, struct timespec* end_ts ) {
99         long long start;
100         long long end;
101         int bin;
102
103         start = ( start_ts->tv_sec * 1000000000) + start_ts->tv_nsec;
104         end = ( end_ts->tv_sec * 1000000000) + end_ts->tv_nsec;
105
106         bin = (end - start) / 1000;                     // to mu-sec
107         //bin = (end - start);
108
109         return bin;
110 }
111
112 /*
113         See if my id string is in the buffer immediately after the first >.
114         Return 1 if so, 0 if not.
115 */
116 static int vet_received( char* me, char* buf ) {
117         char*   ch;
118
119         if( (ch = strchr( buf, '>' )) == NULL ) {
120                 return 0;
121         }
122
123         return strcmp( me, ch+1 ) == 0;
124 }
125
126 /*
127         Create an empty route table and set an environment var for RMR to find.
128         This must be called before initialising RMR.
129 */
130 static void mk_rt( ) {
131         int     fd;
132         char    fnb[128];
133         char*   contents = "newrt|start\nnewrt|end\n";
134
135         snprintf( fnb, sizeof( fnb ), "/tmp/health_check.rt" );
136         fd = open( fnb, O_CREAT | O_WRONLY, 0664 );
137         if( fd < 0 ) {
138                 fprintf( stderr, "[FAIL] could not create dummy route table: %s %s\n", fnb, strerror( errno ) );
139                 return;
140         }
141
142         write( fd, contents, strlen( contents ) );
143         if( (close( fd ) < 0 ) ) {
144                 fprintf( stderr, "[FAIL] couldn't close dummy route table: %s: %s\n", fnb, strerror( errno ) );
145                 return;
146         }
147
148         setenv( "RMR_SEED_RT", fnb, 0 );                // set it, but don't overwrite it
149 }
150
151 int main( int argc, char** argv ) {
152         void* mrc;                                                      // msg router context
153         rmr_mbuf_t*             mbuf;                                   // message buffer
154         mpl_t*  payload;                                                // the payload in a message
155         int             ai = 1;                                                 // arg index
156         long    timeout;
157         long    max_timeout = 5;                                // -t to overrride
158         char*   target = "localhost:4560";              // address of target to ping
159         char*   listen_port;                                    // the port we open for "backhaul" connections (random)
160         char*   tok;                                                    // pointer at token in a buffer
161         int             i;
162         char    wbuf[1024];
163         char    me[128];                                                // who I am to vet rts was actually from me
164         int             rand_port = 0;                                  // -r sets and causes us to generate a random listen port
165         int             whid;                                                   // id of wormhole
166         int             num2send = 1;                                   // number of messages to send
167         int     ep_fd = -1;                                             // epoll's file des (given to epoll_wait)
168         int             rcv_fd;                                                 // file des that NNG tickles -- give this to epoll to listen on
169         int             nready;                                                 // number of events ready for receive
170         int             count = 0;
171         int             errors = 0;
172         int             cksum;                                                  // computed simple checksum
173         struct  timespec in_ts;                                 // time we got response
174         struct  epoll_event events[1];                  // list of events to give to epoll
175         struct  epoll_event epe;                                // event definition for event to listen to
176
177         // ---- simple arg parsing ------
178         while( ai < argc ) {
179                 if( *argv[ai] == '-' ) {
180                         switch( argv[ai][1] ) {
181                                 case 'h':                                       // host port
182                                         ai++;
183                                         target = strdup( argv[ai] );
184                                         break;
185
186                                 case 'n':                                       // num to send
187                                         ai++;
188                                         num2send = atoi( argv[ai] );
189                                         break;
190
191                                 case 'r':                                       // generate random listen port
192                                         rand_port = 1;
193                                         ;;
194
195                                 case 't':                                       // timeout
196                                         ai++;
197                                         max_timeout = atoi( argv[ai] );
198                                         break;
199
200                                 default:
201                                         fprintf( stderr, "[FAIL] unrecognised option: %s\n", argv[ai] );
202                                         exit( 1 );
203                         }
204
205                         ai++;
206                 } else {
207                         break;          // not an option, leave with a1 @ first positional parm
208                 }
209         }
210
211         if( rand_port ) {
212                 srand( time( NULL ) );
213                 snprintf( wbuf, sizeof( wbuf ), "%d", 43000 + (rand() % 1000) );                        // random listen port
214                 listen_port = strdup( wbuf );
215         } else {
216                 listen_port = "43086";
217         }
218
219
220         mk_rt();                                // create a dummy route table so we don't have errors/hang
221
222         fprintf( stderr, "[INFO] listen port: %s; sending %d messages\n", listen_port, num2send );
223
224         if( (mrc = rmr_init( listen_port, 1400, 0 )) == NULL ) {                // start without route table listener thread
225                 fprintf( stderr, "[FAIL] unable to initialise RMr\n" );
226                 exit( 1 );
227         }
228         fprintf( stderr, "[INFO] RMR initialised\n" );
229
230         if( (rcv_fd = rmr_get_rcvfd( mrc )) < 0 ) {                     // if we can't get an epoll FD, then we can't timeout; abort
231                 fprintf( stderr, "[FAIL] unable to get an epoll FD\n" );
232                 exit( 1 );
233         }
234
235         if( (ep_fd = epoll_create1( 0 )) < 0 ) {
236                 fprintf( stderr, "[FAIL] unable to create epoll fd: %d\n", errno );
237                 exit( 1 );
238         }
239         epe.events = EPOLLIN;
240         epe.data.fd = rcv_fd;
241
242         if( epoll_ctl( ep_fd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 )  {
243                 fprintf( stderr, "[FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
244                 exit( 1 );
245         }
246
247         while( ! rmr_ready( mrc ) ) {
248                 sleep( 1 );
249         }
250
251         mbuf = rmr_alloc_msg( mrc, sizeof( *payload ) + 100 );          // send buffer with a bit of padding
252
253         fprintf( stderr, "[INFO] starting session with %s, starting to send\n", target );
254         whid = rmr_wh_open( mrc, target );                                                              // open a wormhole directly to the target
255         if( whid < 0 ) {
256                 fprintf( stderr, "[FAIL] unable to connect to %s\n", target );
257                 exit( 2 );
258         }
259
260         fprintf( stderr, "[INFO] connected to %s, starting to send\n", target );
261         rmr_set_stimeout( mrc, 3 );                                                                     // we let rmr retry failures for up to 3 "rounds"
262
263         gethostname( wbuf, sizeof( wbuf ) );
264         snprintf( me, sizeof( me ), "%s-%d", wbuf, getpid( ) );
265
266         errors = 0;
267         while( count < num2send ) {                                                             // we send n messages after the first message is successful
268                 if( !mbuf ) {
269                         fprintf( stderr, "[FAIL] mbuf is nil?\n" );
270                         exit( 1 );
271                 }
272
273                 payload = (mpl_t *) mbuf->payload;
274
275                 snprintf( wbuf, sizeof( payload->msg ), "%s count=%d %d", me, count, rand() );
276                 snprintf( mbuf->payload, 1024, "%d|%s", sum( wbuf , strlen( wbuf ) ), wbuf );
277
278                 mbuf->mtype = HEALTH_CHECK;
279                 mbuf->sub_id = -1;
280                 mbuf->len =  sizeof( *payload );
281                 mbuf->state = 0;
282
283                 clock_gettime( CLOCK_REALTIME, &payload->out_ts );              // mark time out
284                 mbuf = rmr_wh_send_msg( mrc, whid, mbuf );
285
286                 if( mbuf->state == RMR_OK ) {                                                   // good send, wait for response
287                         nready = epoll_wait( ep_fd, events, 1, max_timeout * 1000 );
288                         if( nready > 0 ) {
289                                 clock_gettime( CLOCK_REALTIME, &in_ts );                // mark response received time
290
291                                 mbuf = rmr_rcv_msg( mrc, mbuf );
292                                 payload = (mpl_t *) mbuf->payload;
293                                 tok = strchr( payload->msg, '|' );                              // find end of chksum
294                                 if( tok ) {
295                                         tok++;
296                                         cksum = sum( tok, strlen( tok ) );
297                                         if( cksum != atoi( payload->msg ) ) {
298                                                 fprintf( stderr, "[WRN] response to msg %d received, cksum mismatch; expected %d, got %d\n", 
299                                                         count+1, atoi( payload->msg ), cksum );
300                                         } else {
301                                                 fprintf( stderr, "[INFO] response to msg %d received, %d mu-sec\n",  count+1, elapsed( &payload->out_ts, &in_ts ) );
302                                         }
303                                 }
304                         } else {
305                                 fprintf( stderr, "[ERR] timeout waiting for response to message %d\n", count+1 );
306                                 errors++;
307                         }
308                 } else {
309                         fprintf( stderr, "[ERR] send failed: %d\n", mbuf->state );
310                 }
311
312                 count++;
313                 sleep( 1 );
314         }
315
316         rmr_wh_close( mrc, whid );
317
318         return errors = 0;
319 }
320