Fix RMR routing statistic data printout crash
[ric-plt/lib/rmr.git] / src / rmr / common / src / rtc_static.c
1 // : vi ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019-2021 Nokia
5         Copyright (c) 2018-2021 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rt_collector.c
23         Abstract:       The route table collector is started as a separate pthread and
24                                 is responsible for listening for route table updates from a
25                                 route manager or route table generator process.
26
27         Author:         E. Scott Daniels
28         Date:           29 November 2018 (extracted to common 13 March 2019)
29 */
30
31 #ifndef _rt_collector_c
32 #define _rt_collector_c
33
34 #include <stdio.h>
35 #include <stdlib.h>
36 #include <netdb.h>
37 #include <errno.h>
38 #include <string.h>
39 #include <errno.h>
40 #include <fcntl.h>
41 #include <sys/types.h>
42 #include <sys/stat.h>
43 #include <unistd.h>
44
45 #include <RIC_message_types.h>          // needed for RMR/Rt Mgr msg types
46
47 // ---- local constants ------------------
48                                                                                 // flags
49 #define RTCFL_HAVE_UPDATE       0x01            // an update from RM was received
50
51 #define MAX_RTC_BUF     5 * 1024                        // max buffer size we'll expect is 4k, add some fudge room
52
53 // ------------------------------------------------------------------------------------------------
54
55 /*
56         Opens the vlevel control file if needed and reads the vlevel from it.
57         The file is rewound if already open so that external updates are captured.
58         The current level is returnd; 0 on error.
59
60         The environment variable (ENV_VERBOSE_FILE) is used to supply the file to
61         open and read. If missing, we will try /tmp/rmr.v.  We will try to open the file
62         on each call if not alrady open; this allows the value to be supplied after
63         start which helps debugging.
64
65         If close_file is true, then we will close the open vfd and return 0;
66 */
67 extern int refresh_vlevel( int close_file ) {
68         static int vfd = -1;
69
70         char*   eptr;
71         char    wbuf[128];                      // read buffer; MUST be 11 or greater
72         int             vlevel = 0;
73
74         if( close_file ) {
75                 if( vfd >= 0 ) {
76                         close( vfd );
77                         vfd = -1;
78                 }
79                 return 0;
80         }
81
82         if( vfd < 0 ) {                         // attempt to find/open on all calls if not open
83                 if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
84                         vfd = open( eptr, O_RDONLY );
85                 } else {
86                         vfd = open( "/tmp/rmr.v", O_RDONLY );
87                 }
88                 if( vfd < 0 ) {
89                         return 0;
90                 }
91         }
92
93         memset( wbuf, 0, sizeof( char ) * 11 );                 // ensure what we read will be nil terminated
94         if( lseek( vfd, 0, SEEK_SET ) == 0 && read( vfd, wbuf, 10 ) > 0 ) {
95                 vlevel = atoi( wbuf );
96         }
97
98         return vlevel;
99 }
100
101 /*
102         Loop forever (assuming we're running in a pthread reading the static table
103         every minute or so.
104 */
105 static void* rtc_file( void* vctx ) {
106         uta_ctx_t*      ctx;                                    // context user has -- where we pin the route table
107         char*   eptr;
108         int             vlevel = 0;                                     // how chatty we should be 0== no nattering allowed
109         char    wbuf[256];
110
111
112         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
113                 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: context passed in was nil\n" );
114                 return NULL;
115         }
116
117         ctx->flags |= CFL_NO_RTACK;                             // no attempt to ack when reading from a file
118         while( 1 ) {
119                 vlevel = refresh_vlevel( 0 );
120                 read_static_rt( ctx, vlevel );                                          // refresh from the file
121
122                 if( ctx->shutdown != 0 ) {                                                      // allow for graceful termination and unit testing
123                         refresh_vlevel( 1 );                                                            // close the verbose file if open
124                         return NULL;
125                 }
126
127                 if( ctx->rtable_ready ) {
128                         sleep( 60 );
129                 } else {
130                         sleep( 1 );                                                                             // check every second until we have a good one
131                 }
132         }
133 }
134
135 /*
136         Rtc_parse_msg parses a single message from the route manager. We allow multiple, newline terminated,
137         records in each message; it is required that the last record in the message be complete (we do not
138         reconstruct records split over multiple messages).  For each record, we call the record parser
139         to parse and add the information to the table being built.
140
141         This function was broken from the main rtc() function in order to be able to unit test it. Without
142         this as a standalone funciton, it was impossible to simulate a message arriving on the RTC's private
143         context.
144
145         To reduce malloc/free cycles, we allocate a static work buffer and expand it when needed; in other
146         words, this is not thread safe but it shouldn't need to be.
147 */
148 static void rtc_parse_msg( uta_ctx_t *ctx, uta_ctx_t* pvt_cx, rmr_mbuf_t* msg, int vlevel,  int* flags ) {
149         static  unsigned char* pbuf = NULL;
150         static  int pbuf_size = 0;
151
152         unsigned char* payload;
153         unsigned char* curr;
154         unsigned char* nextr;
155         int mlen;
156
157         payload = msg->payload;
158         mlen = msg->len;                                        // usable bytes in the payload
159
160         if( DEBUG > 1 || (vlevel > 0) ) rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: received rt message type=%d len=%d\n", msg->mtype, (int) mlen );
161         switch( msg->mtype ) {
162                 case RMRRM_TABLE_DATA:
163                         if( (*flags & RTCFL_HAVE_UPDATE) == 0 ) {
164                                 *flags |= RTCFL_HAVE_UPDATE;
165                                 rmr_vlog( RMR_VL_INFO, "message flow from route manager starts\n" );
166                         }
167
168                         if( pbuf_size <= mlen ) {
169                                 if( pbuf ) {
170                                         free( pbuf );
171                                 }
172                                 if( mlen < 512 ) {
173                                         pbuf_size = 1024;
174                                 } else {
175                                         pbuf_size = mlen * 2;
176                                 }
177                                 pbuf = (char *) malloc( sizeof( char ) * pbuf_size );
178                         }
179                         memcpy( pbuf, payload, mlen );
180                         pbuf[mlen] = 0;                                                                         // don't depend on sender making this a legit string
181                         if( vlevel > 1 ) {
182                                 rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: rt message: (%s)\n", pbuf );
183                         }
184
185                         curr = pbuf;
186                         while( curr ) {                                                                         // loop over each record in the buffer
187                                 nextr = strchr( (char *) curr, '\n' );                  // allow multiple newline records, find end of current and mark
188
189                                 if( nextr ) {
190                                         *(nextr++) = 0;
191                                 }
192
193                                 if( vlevel > 1 ) {
194                                         rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc_parse_msg: snarf_fd=%d processing (%s)\n", ctx ? ctx->snarf_rt_fd : -99, curr );
195                                 }
196                                 parse_rt_rec( ctx, pvt_cx, curr, vlevel, msg );         // parse record and add to in progress table; ack using rts to msg
197
198                                 curr = nextr;
199                         }
200
201                         msg->len = 0;                           // force back into the listen loop
202                         break;
203
204                 default:
205                         rmr_vlog( RMR_VL_WARN, "rmr_rtc: invalid message type=%d len=%d\n", msg->mtype, (int) msg->len );
206                         break;
207         }
208 }
209
210 /*
211         Route Table Collector
212         A side thread which either attempts to connect and request a table
213         from the Route Manager, or opens a port and listens for Route Manager
214         to push table updates.
215
216         It may do other things along the way (latency measurements, alarms,
217         respond to RMR pings, etc.).
218
219         The behaviour with respect to listening for Route Manager updates vs
220         the initiation of the connection and sending a request depends on the
221         value of the ENV_RTG_ADDR (RMR_RTG_SVC)  environment variable. If
222         host:port, or IP:port, is given, then we assume that we make the connection
223         and send a request for the table (request mode).  If the variable is just
224         a port, then we assume Route Manager will connect and push updates (original
225         method).
226
227         If the variable is not defined, the default behaviour, in order to be
228         backwards compatable, depends on the presence of the ENV_CTL_PORT
229         (RMR_CTL_PORT) variable (new with the support for requesting a table).
230
231
232         ENV_CTL_PORT    ENV_RTG_ADDR    Behaviour
233         unset                   unset                   Open default CTL port (DEF_CTL_PORT) and
234                                                                         wait for Rt Mgr to push tables
235
236         set                             unset                   Use the default Rt Mgr wellknown addr
237                                                                         and port (DEF_RTG_WK_ADDR) to connect
238                                                                         and request a table. The control port
239                                                                         used is the value set by ENV_CTL_PORT.
240
241         unset                   set                             As described above. The default control
242                                                                         port (DEF_CTL_PORT) is used.
243
244         When we are running in request mode, then we will send the RMR message
245         RMRRM_REFRESH to this address (wormhole) as a request for the route manager
246         to send a new table. We will attempt to connect and send requests until
247         we have a table. Calls to rmr_ready() will report FALSE until a table is
248         loaded _unless_ a seed table was given.
249
250         Route table information is expected to arrive on RMR messages with type
251         RMRRM_TABLE_DATA.  There is NOT a specific message type for each possible
252         table record, so the payload is as it appears in the seed file or as
253         delivered in old versions.  It may take several RMRRM_TABLE_DATA messages
254         to completely supply a new table or table update. See the header for parse_rt_rec
255         in common for a description of possible message contents.
256
257         Buffers received from the route table generator can contain multiple newline terminated
258         records, but each buffer must be less than 4K in length, and the last record in a
259         buffer may NOT be split across buffers.
260
261         Other chores:
262         In addition to the primary task of getting, vetting, and installing a new route table, or
263         updates to the existing table, this thread will periodically cause the send counts for each
264         endpoint known to be written to standard error. The frequency is once every 180 seconds, and
265         more frequently if verbose mode (see ENV_VERBOSE_FILE) is > 0.
266 */
267 static void* rtc( void* vctx ) {
268         uta_ctx_t*      ctx;                                    // context user has -- where we pin the route table
269         uta_ctx_t*      pvt_cx;                                 // private context for session with rtg
270         rmr_mbuf_t*     msg = NULL;                             // message from rtg
271         route_table_t* rt;                                      // the routing table that will be traversed to print statistics
272         char*   my_port;                                        // the port number that we will listen on (4561 has been the default for this)
273         char*   rtg_addr;                                       // host:port address of route table generator (route manager)
274         char*   daddr;                                          // duplicated rtg address string to parse/trash
275         size_t  buf_size;                                       // nng needs var pointer not just size?
276         int             i;
277         long    blabber = 0;                            // time of last blabber so we don't flood if rtg goes bad
278         int             cstate = -1;                            // connection state to rtg
279         int             state;                                          // processing state of some nng function
280         char*   tokens[128];
281         char    wbuf[128];
282         int             ntoks;
283         int             vlevel = 0;                                     // how chatty we should be 0== no nattering allowed
284         char*   eptr;
285         int             epfd = -1;                                      // fd for epoll so we can multi-task
286         struct  epoll_event events[1];          // list of events to give to epoll; we only have one we care about
287         struct  epoll_event epe;                        // event definition for event to listen to
288         int             count_delay = 30;                       // number of seconds between writing count info; initially every 30s
289         int             bump_freq = 0;                          // time at which we will bump count frequency to every 5 minutes
290         int             flags = 0;
291         int             rt_req_freq = DEF_RTREQ_FREQ;   // request frequency (sec) when wanting a new table
292         int             nxt_rt_req = 0;                                 // time of next request
293
294
295         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
296                 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: context passed in was nil\n" );
297                 return NULL;
298         }
299
300         vlevel = refresh_vlevel( 0 );
301
302         if( (eptr = getenv( ENV_RTREQ_FREA )) != NULL ) {
303                 rt_req_freq = atoi( eptr );
304                 if( rt_req_freq < 1 || rt_req_freq > 300 ) {
305                         rt_req_freq = DEF_RTREQ_FREQ;
306                         rmr_vlog( RMR_VL_WARN, "rmr_rtc: RT request frequency (%d) out of range (1-300), using default (%d)\n", rt_req_freq, DEF_RTREQ_FREQ );
307                 }
308         }
309         rmr_vlog( RMR_VL_INFO, "rmr_rtc: RT request frequency set to: %d seconds", rt_req_freq );
310
311         ctx->flags |= CFL_NO_RTACK;                             // don't ack when reading from a file
312         read_static_rt( ctx, vlevel );                  // seed the route table if one provided
313         ctx->flags &= ~CFL_NO_RTACK;
314         ctx->flags &= ~CFL_FULLRT;                              // even though rmr-ready goes true, the seed doesn't count as a full RT from route generator
315
316
317         my_port = getenv( ENV_CTL_PORT );                               // default port to listen on (likely 4561)
318         if( my_port == NULL || ! *my_port ) {                   // if undefined, then go with default
319                 my_port = DEF_CTL_PORT;
320                 daddr = DEF_CTL_PORT;                                           // backwards compat; if ctl port not hard defined, default is to listen
321         } else {
322                 daddr = DEF_RTG_WK_ADDR;                                        // if ctl port is defined, then default changes to connecting to well known RM addr
323         }
324
325         if( (rtg_addr = getenv( ENV_RTG_ADDR )) == NULL || ! *rtg_addr ) {              // undefined, use default set above
326                 rtg_addr = daddr;
327         }
328
329         daddr = strdup( rtg_addr );                                                                     // dup to destroy during parse
330
331         ntoks = uta_tokenise( daddr, tokens, 120, ':' );                        // should be host:ip of rt mgr (could be port only which we assume is old listen port)
332         switch( ntoks ) {
333                 case 0:                                                                 // should not happen, but prevent accidents and allow default to ignore additional tokens
334                         break;
335
336                 case 1:
337                         my_port = tokens[0];                    // just port -- assume backlevel environment where we just listen
338                         flags |= RTCFL_HAVE_UPDATE;             // prevent sending update reqests
339                         break;
340
341                 default:
342                         if( strcmp( tokens[0], "tcp" ) == 0 ) {                 // old school nng tcp:xxxx so we listen on xxx
343                                 flags |= RTCFL_HAVE_UPDATE;                                     // and signal not to try to request an update
344                                 my_port = tokens[1];
345                         } else {
346                                 // rtg_addr points at rt mgr address and my port set from env or default stands as is
347                         }
348                         break;
349         }
350
351         if( (pvt_cx = init( my_port, MAX_RTC_BUF, FL_NOTHREAD )) == NULL ) {                            // open a private context (no RT listener!)
352                 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: unable to initialise listen port for RTG (pvt_cx)\n" );
353
354                 while( TRUE ) {                                                                                         // no listen port, just dump counts now and then
355                         sleep( count_delay );
356                         rt_epcounts( ctx->rtable, ctx->my_name );
357                 }
358
359                 return NULL;
360         }
361
362         ctx->rtg_whid = -1;
363
364         cycle_snarfed_rt( ctx );                                // cause the nrt to be opened
365
366         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rtc thread is running and listening; listening for rtg conns on %s\n", my_port );
367
368         bump_freq = time( NULL ) + 300;                         // after 5 minutes we decrease the count frequency
369         blabber = 0;
370         while( 1 ) {                                                                                                            // until the cows return, pigs fly, or somesuch event likely not to happen
371                 while( msg == NULL || msg->len <= 0 ) {                                                 // until we actually have something from the other side
372                         if( (flags & RTCFL_HAVE_UPDATE) == 0 && time( NULL ) >= nxt_rt_req ) {                  // no route table updated from rt mgr; request one
373                                 if( ctx->rtg_whid < 0 ) {
374                                         ctx->rtg_whid = rmr_wh_open( pvt_cx, rtg_addr );
375                                 }
376                                 send_update_req( pvt_cx, ctx );
377                                 nxt_rt_req = time( NULL ) + rt_req_freq;
378                         }
379
380                         msg = rmr_torcv_msg( pvt_cx, msg, 1000 );
381
382                         if( time( NULL ) > blabber  ) {
383                                 vlevel = refresh_vlevel( 0 );
384                                 blabber = time( NULL ) + count_delay;                           // set next time to blabber, then do so
385                                 if( blabber > bump_freq ) {
386                                         count_delay = 300;
387                                 }
388                                 if( vlevel >= 0 ) {                                                                             // allow it to be forced off with -n in verbose file
389                                         rt = get_rt( ctx );                                                                     // get active route table and up ref count
390                                         rt_epcounts( rt, ctx->my_name );
391                                         release_rt( ctx, rt );                                                          // dec safely the ref counter
392                                 }
393                         }
394
395                         if( ctx->shutdown != 0 ) {
396                                 break;                                                  // mostly for unit test, but allows a forced stop
397                         }
398
399                                                                                                 // extra housekeeping chores can be added here...
400                         alarm_if_drops( ctx, pvt_cx );          // send an alarm if message are dropping, clear if we set one and thtings are better
401                 }
402
403                 vlevel = refresh_vlevel( 0 );                   // ensure it's fresh when we get a message
404
405                 if( msg != NULL && msg->len > 0 ) {
406                         rtc_parse_msg( ctx, pvt_cx, msg, vlevel, &flags );
407                 }
408
409                 if( ctx->shutdown ) {           // mostly for testing, but allows user app to close us down if rmr_*() function sets this
410                         return NULL;
411                 }
412
413         }
414
415         return NULL;    // unreachable, but some compilers don't see that and complain.
416 }
417
418 #ifndef SI95_BUILD
419 // this is nng specific inas much as we allow raw (non-RMR) messages
420
421 /*
422         NOTE:   This is the original rtc code when we supported "raw" nano/nng messages
423                         from the route manger.  It is deprecated in favour of managing all RM-RMR
424                         communications via an RMR session.
425
426                         The rtc() function above is the new and preferred function regardless
427                         of transport.
428
429         -----------------------------------------------------------------------------------
430         Route Table Collector
431         A side thread which opens a socket and subscribes to a routing table generator.
432         It may do other things along the way (latency measurements?).
433
434         The pointer is a pointer to the context.
435
436         Listens for records from the route table generation publisher, expecting
437         one of the following, newline terminated, ASCII records:
438                 rte|msg-type||]name:port,name:port,...;name:port,...                    // route table entry with one or more groups of endpoints
439                 new|start                                                               // start of new table
440                 new|end                                                                 // end of new table; complete
441
442                 Name must be a host name which can be looked up via gethostbyname() (DNS).
443
444                 Multiple endpoints (name:port) may be given separated by a comma; an endpoint is selected using round robin
445                         for each message of the type that is sent.
446
447                 Multiple endpoint groups can be given as a comma separated list of endpoints, separated by semicolons:
448                                 group1n1:port,group1n2:port,group1n3:port;group2n1:port,group2n2:port
449
450                 If multiple groups are given, when send() is called for the cooresponding message type,
451                 the message will be sent to one endpoint in each group.
452
453                 msg-type is the numeric message type (e.g. 102). If it is given as n,name then it is assumed
454                 that the entry applies only to the instance running with the hostname 'name.'
455
456         Buffers received from the route table generator can contain multiple newline terminated
457         records, but each buffer must be less than 4K in length, and the last record in a
458         buffer may NOT be split across buffers.
459
460         Other chores:
461         In addition to the primary task of getting, vetting, and installing a new route table, or
462         updates to the existing table, this thread will periodically cause the send counts for each
463         endpoint known to be written to standard error. The frequency is once every 180 seconds, and
464         more frequently if verbose mode (see ENV_VERBOSE_FILE) is > 0.
465 */
466 static void* raw_rtc( void* vctx ) {
467         uta_ctx_t*      ctx;                                    // context user has -- where we pin the route table
468         uta_ctx_t*      pvt_cx;                                 // private context for session with rtg
469         rmr_mbuf_t*     msg = NULL;                             // message from rtg
470         route_table_t* rt;                                      // the routing table that will be traversed to print statistics
471         char*   payload;                                        // payload in the message
472         size_t  mlen;
473         char*   port;                                           // a port number we listen/connect to
474         char*   fport;                                          // pointer to the real buffer to free
475         size_t  buf_size;                                       // nng needs var pointer not just size?
476         char*   nextr;                                          // pointer at next record in the message
477         char*   curr;                                           // current record
478         int             i;
479         long    blabber = 0;                            // time of last blabber so we don't flood if rtg goes bad
480         int             cstate = -1;                            // connection state to rtg
481         int             state;                                          // processing state of some nng function
482         char*   tokens[128];
483         char    wbuf[128];
484         char*   pbuf = NULL;
485         int             pbuf_size = 0;                          // number allocated in pbuf
486         int             ntoks;
487         int             raw_interface = 1;                      // rtg is using raw NNG/Nano not RMr to send updates
488         int             vlevel = 0;                                     // how chatty we should be 0== no nattering allowed
489         char*   eptr;
490         int             epfd = -1;                                      // fd for epoll so we can multi-task
491         struct  epoll_event events[1];          // list of events to give to epoll; we only have one we care about
492         struct  epoll_event epe;                        // event definition for event to listen to
493         int             rcv_fd = -1;                            // pollable file des from NNG to use for timeout
494         int             count_delay = 30;                       // number of seconds between writing count info; initially every 30s
495         int             bump_freq = 0;                          // time at which we will bump count frequency to every 5 minutes
496
497
498         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
499                 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: context passed in was nil\n" );
500                 return NULL;
501         }
502
503         vlevel = refresh_vlevel( 0 );
504         read_static_rt( ctx, vlevel );                                          // seed the route table if one provided
505
506         if( (port = getenv( ENV_RTG_PORT )) == NULL || ! *port ) {              // port we need to open to listen for RTG connections
507                 port = strdup( DEF_RTG_PORT );
508         } else {
509                 port = strdup( port );
510         }
511
512         fport = port;           // must hold to free
513
514         ntoks = uta_tokenise( port, tokens, 120, ':' );                 // assume tcp:port, but it could be port or old style host:port
515         switch( ntoks ) {
516                 case 1:
517                                 port = tokens[0];                       // just the port
518                                 break;
519
520                 case 2:
521                                 port = tokens[1];                       // tcp:port or :port
522                                 break;
523
524                 default:
525                                 port = DEF_RTG_PORT;            // this shouldn't happen, but parnioia is good
526                                 break;
527         }
528
529         if( (pvt_cx = init( port, MAX_RTG_MSG_SZ, FL_NOTHREAD )) == NULL ) {                            // open a private context
530                 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: unable to initialise listen port for RTG (pvt_cx)\n" );
531
532                 while( TRUE ) {                                                                                         // no listen port, just dump counts now and then
533                         sleep( count_delay );
534                         rt_epcounts( ctx->rtable, ctx->my_name );
535                 }
536
537                 free( fport );                                  // parinoid free and return
538                 return NULL;
539         }
540
541         if( (rcv_fd = rmr_get_rcvfd( pvt_cx )) >= 0 ) {            // get the epoll fd for the rtg socket
542                 if( rcv_fd < 0 ) {
543                         rmr_vlog( RMR_VL_WARN, "cannot get epoll fd for rtg session; stats will generate only after update from rt manager\n" );
544                 } else {
545                         if( (epfd = epoll_create1( 0 )) < 0 ) {
546                                 rmr_vlog( RMR_VL_WARN, "stats will generate only after rt manager update; unable to create epoll fd for rtg session: %s\n", strerror( errno ) );
547                                 rcv_fd = -1;
548                         } else {
549                                 epe.events = EPOLLIN;
550                                 epe.data.fd = rcv_fd;
551
552                                 if( epoll_ctl( epfd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 )  {
553                                         rmr_vlog( RMR_VL_WARN, "stats will generate only after rt manager update; unable to init epoll_ctl: %s\n", strerror( errno ) );
554                                         rcv_fd = -1;
555                                 }
556                         }
557                 }
558         }
559
560         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rtc thread is running and listening; listening for rtg conns on %s\n", port );
561         free( fport );
562
563         // future:  if we need to register with the rtg, then build a message and send it through a wormhole here
564
565         bump_freq = time( NULL ) + 300;                         // after 5 minutes we decrease the count frequency
566         blabber = 0;
567         while( 1 ) {                    // until the cows return, pigs fly, or somesuch event likely not to happen
568                 while( msg == NULL || msg->len <= 0 ) {                                                 // until we actually have something from the other side
569                         if( rcv_fd < 0 || epoll_wait( epfd, events, 1, 1000 ) > 0 )  {  // skip epoll if init failed, else block for max 1 sec
570                                 if( raw_interface ) {
571                                         msg = (rmr_mbuf_t *) rcv_payload( pvt_cx, msg );                // receive from non-RMr sender
572                                 } else {
573                                         msg = rmr_rcv_msg( pvt_cx, msg );               // receive from an RMr sender
574                                 }
575                         } else {                                                                                                        // no msg, do extra tasks
576                                 if( msg != NULL ) {                                                                             // if we were working with a message; ensure no len
577                                         msg->len = 0;
578                                         msg->state = RMR_ERR_TIMEOUT;
579                                 }
580                         }
581
582                         if( time( NULL ) > blabber  ) {
583                                 vlevel = refresh_vlevel( 0 );
584                                 if( vlevel >= 0 ) {                                                                             // allow it to be forced off with -n in verbose file
585                                         blabber = time( NULL ) + count_delay;                           // set next time to blabber, then do so
586                                         if( blabber > bump_freq ) {
587                                                 count_delay = 300;
588                                         }
589                                         rt = get_rt( ctx );                                                                     // get active route table and up ref count
590                                         rt_epcounts( rt, ctx->my_name );
591                                         release_rt( ctx, rt );                                                          // dec safely the ref counter
592                                 }
593                         }
594
595                         alarm_if_drops( ctx );                          // send an alarm if message are dropping, clear if we set one and thtings are better
596                 }
597
598                 vlevel = refresh_vlevel( 0 );                   // ensure it's fresh when we get a message
599
600                 if( msg != NULL && msg->len > 0 ) {
601                         payload = msg->payload;
602                         mlen = msg->len;                                        // usable bytes in the payload
603                         if( vlevel > 1 ) {
604                                 rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: received rt message; %d bytes (%s)\n", (int) mlen, msg->payload );
605                         } else {
606                                 if( DEBUG > 1 || (vlevel > 0) ) rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: received rt message; %d bytes\n", (int) mlen );
607                         }
608
609                         if( pbuf_size <= mlen ) {
610                                 if( pbuf ) {
611                                         free( pbuf );
612                                 }
613                                 if( mlen < 512 ) {
614                                         pbuf_size = 512;
615                                 } else {
616                                         pbuf_size = mlen * 2;
617                                 }
618                                 pbuf = (char *) malloc( sizeof( char ) * pbuf_size );
619                         }
620                         memcpy( pbuf, payload, mlen );
621                         pbuf[mlen] = 0;                                                                         // don't depend on sender making this a legit string
622
623                         curr = pbuf;
624                         while( curr ) {                                                         // loop over each record in the buffer
625                                 nextr = strchr( curr, '\n' );                   // allow multiple newline records, find end of current and mark
626
627                                 if( nextr ) {
628                                         *(nextr++) = 0;
629                                 }
630
631                                 if( vlevel > 1 ) {
632                                         rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: processing (%s)\n", curr );
633                                 }
634                                 if( raw_interface ) {
635                                         parse_rt_rec( ctx, NULL, curr, vlevel, NULL );          // nil pvt to parser as we can't ack messages
636                                 } else {
637                                         parse_rt_rec( ctx, pvt_cx, curr, vlevel, msg );         // parse record and add to in progress table
638                                 }
639
640                                 curr = nextr;
641                         }
642
643                         if( ctx->shutdown ) {           // mostly for testing, but allows user app to close us down if rmr_*() function sets this
644                                 break;
645                         }
646
647                         msg->len = 0;                           // force back into the listen loop
648                 }
649         }
650
651         return NULL;    // unreachable, but some compilers don't see that and complain.
652 }
653 #endif
654
655
656 #endif