9aca09218c7f62412afc8c0d9a342ba37e0f8aa3
[ric-plt/lib/rmr.git] / src / rmr / common / src / rtc_static.c
1 // : vi ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019-2020 Nokia
5         Copyright (c) 2018-2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rt_collector.c
23         Abstract:       The route table collector is started as a separate pthread and
24                                 is responsible for listening for route table updates from a
25                                 route manager or route table generator process.
26
27         Author:         E. Scott Daniels
28         Date:           29 November 2018 (extracted to common 13 March 2019)
29 */
30
31 #ifndef _rt_collector_c
32 #define _rt_collector_c
33
34 #include <stdio.h>
35 #include <stdlib.h>
36 #include <netdb.h>
37 #include <errno.h>
38 #include <string.h>
39 #include <errno.h>
40 #include <fcntl.h>
41 #include <sys/types.h>
42 #include <sys/stat.h>
43 #include <unistd.h>
44
45 #include <RIC_message_types.h>          // needed for RMR/Rt Mgr msg types
46
47 // ---- local constants ------------------
48                                                                                 // flags
49 #define RTCFL_HAVE_UPDATE       0x01            // an update from RM was received
50
51 #define MAX_RTC_BUF     5 * 1024                        // max buffer size we'll expect is 4k, add some fudge room
52
53 // ------------------------------------------------------------------------------------------------
54
55 /*
56         Loop forever (assuming we're running in a pthread reading the static table
57         every minute or so.
58 */
59 static void* rtc_file( void* vctx ) {
60         uta_ctx_t*      ctx;                                    // context user has -- where we pin the route table
61         char*   eptr;
62         int             vfd = -1;                                       // verbose file des if we have one
63         int             vlevel = 0;                                     // how chatty we should be 0== no nattering allowed
64         char    wbuf[256];
65
66
67         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
68                 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: context passed in was nil\n" );
69                 return NULL;
70         }
71
72         if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
73                 vfd = open( eptr, O_RDONLY );
74         }
75
76         ctx->flags |= CFL_NO_RTACK;                             // no attempt to ack when reading from a file
77         while( 1 ) {
78                 if( vfd >= 0 ) {
79                         wbuf[0] = 0;
80                         lseek( vfd, 0, 0 );
81                         if( read( vfd, wbuf, 10 ) > 0 ) {
82                                 vlevel = atoi( wbuf );
83                         }
84                 }
85
86                 read_static_rt( ctx, vlevel );                                          // seed the route table if one provided
87
88                 if( ctx->shutdown != 0 ) {                                                      // allow for graceful termination and unit testing
89                         return NULL;
90                 }
91                 sleep( 60 );
92         }
93 }
94
95 static int refresh_vlevel( int vfd ) {
96         int vlevel = 0;
97         char    rbuf[128];
98
99         if( vfd >= 0 ) {                                        // if file is open, read current value
100                 rbuf[0] = 0;
101                 lseek( vfd, 0, 0 );
102                 if( read( vfd, rbuf, 10 ) > 0 ) {
103                         vlevel = atoi( rbuf );
104                 }
105         }
106
107         return vlevel;
108 }
109
110 /*
111         Rtc_parse_msg parses a single message from the route manager. We allow multiple, newline terminated,
112         records in each message; it is required that the last record in the message be complete (we do not
113         reconstruct records split over multiple messages).  For each record, we call the record parser
114         to parse and add the information to the table being built.
115
116         This function was broken from the main rtc() function in order to be able to unit test it. Without
117         this as a standalone funciton, it was impossible to simulate a message arriving on the RTC's private
118         context.
119
120         To reduce malloc/free cycles, we allocate a static work buffer and expand it when needed; in other
121         words, this is not thread safe but it shouldn't need to be.
122 */
123 static void rtc_parse_msg( uta_ctx_t *ctx, uta_ctx_t* pvt_cx, rmr_mbuf_t* msg, int vlevel,  int* flags ) {
124         static  unsigned char* pbuf = NULL;
125         static  int pbuf_size = 0;
126
127         unsigned char* payload;
128         unsigned char* curr;
129         unsigned char* nextr;
130         int mlen;
131
132         payload = msg->payload;
133         mlen = msg->len;                                        // usable bytes in the payload
134
135         if( DEBUG > 1 || (vlevel > 0) ) rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: received rt message type=%d len=%d\n", msg->mtype, (int) mlen );
136         switch( msg->mtype ) {
137                 case RMRRM_TABLE_DATA:
138                         if( (*flags & RTCFL_HAVE_UPDATE) == 0 ) {
139                                 *flags |= RTCFL_HAVE_UPDATE;
140                                 rmr_vlog( RMR_VL_INFO, "message flow from route manager starts\n" );
141                         }
142
143                         if( pbuf_size <= mlen ) {
144                                 if( pbuf ) {
145                                         free( pbuf );
146                                 }
147                                 if( mlen < 512 ) {
148                                         pbuf_size = 1024;
149                                 } else {
150                                         pbuf_size = mlen * 2;
151                                 }
152                                 pbuf = (char *) malloc( sizeof( char ) * pbuf_size );
153                         }
154                         memcpy( pbuf, payload, mlen );
155                         pbuf[mlen] = 0;                                                                         // don't depend on sender making this a legit string
156                         if( vlevel > 1 ) {
157                                 rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: rt message: (%s)\n", pbuf );
158                         }
159
160                         curr = pbuf;
161                         while( curr ) {                                                                         // loop over each record in the buffer
162                                 nextr = strchr( (char *) curr, '\n' );                  // allow multiple newline records, find end of current and mark
163
164                                 if( nextr ) {
165                                         *(nextr++) = 0;
166                                 }
167
168                                 if( vlevel > 1 ) {
169                                         rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: processing (%s)\n", curr );
170                                 }
171                                 parse_rt_rec( ctx, pvt_cx, curr, vlevel, msg );         // parse record and add to in progress table; ack using rts to msg
172
173                                 curr = nextr;
174                         }
175
176                         msg->len = 0;                           // force back into the listen loop
177                         break;
178
179                 default:
180                         rmr_vlog( RMR_VL_WARN, "rmr_rtc: invalid message type=%d len=%d\n", msg->mtype, (int) msg->len );
181                         break;
182         }
183 }
184
185 /*
186         Route Table Collector
187         A side thread which either attempts to connect and request a table
188         from the Route Manager, or opens a port and listens for Route Manager
189         to push table updates.
190
191         It may do other things along the way (latency measurements, alarms,
192         respond to RMR pings, etc.).
193
194         The behaviour with respect to listening for Route Manager updates vs
195         the initiation of the connection and sending a request depends on the
196         value of the ENV_RTG_ADDR (RMR_RTG_SVC)  environment variable. If
197         host:port, or IP:port, is given, then we assume that we make the connection
198         and send a request for the table (request mode).  If the variable is just
199         a port, then we assume Route Manager will connect and push updates (original
200         method).
201
202         If the variable is not defined, the default behaviour, in order to be
203         backwards compatable, depends on the presence of the ENV_CTL_PORT
204         (RMR_CTL_PORT) variable (new with the support for requesting a table).
205
206
207         ENV_CTL_PORT    ENV_RTG_ADDR    Behaviour
208         unset                   unset                   Open default CTL port (DEF_CTL_PORT) and
209                                                                         wait for Rt Mgr to push tables
210
211         set                             unset                   Use the default Rt Mgr wellknown addr
212                                                                         and port (DEF_RTG_WK_ADDR) to connect
213                                                                         and request a table. The control port
214                                                                         used is the value set by ENV_CTL_PORT.
215
216         unset                   set                             As described above. The default control
217                                                                         port (DEF_CTL_PORT) is used.
218
219         When we are running in request mode, then we will send the RMR message
220         RMRRM_REFRESH to this address (wormhole) as a request for the route manager
221         to send a new table. We will attempt to connect and send requests until
222         we have a table. Calls to rmr_ready() will report FALSE until a table is
223         loaded _unless_ a seed table was given.
224
225         Route table information is expected to arrive on RMR messages with type
226         RMRRM_TABLE_DATA.  There is NOT a specific message type for each possible
227         table record, so the payload is as it appears in the seed file or as
228         delivered in old versions.  It may take several RMRRM_TABLE_DATA messages
229         to completely supply a new table or table update. See the header for parse_rt_rec
230         in common for a description of possible message contents.
231
232         Buffers received from the route table generator can contain multiple newline terminated
233         records, but each buffer must be less than 4K in length, and the last record in a
234         buffer may NOT be split across buffers.
235
236         Other chores:
237         In addition to the primary task of getting, vetting, and installing a new route table, or
238         updates to the existing table, this thread will periodically cause the send counts for each
239         endpoint known to be written to standard error. The frequency is once every 180 seconds, and
240         more frequently if verbose mode (see ENV_VERBOSE_FILE) is > 0.
241 */
242 static void* rtc( void* vctx ) {
243         uta_ctx_t*      ctx;                                    // context user has -- where we pin the route table
244         uta_ctx_t*      pvt_cx;                                 // private context for session with rtg
245         rmr_mbuf_t*     msg = NULL;                             // message from rtg
246         char*   my_port;                                        // the port number that we will listen on (4561 has been the default for this)
247         char*   rtg_addr;                                       // host:port address of route table generator (route manager)
248         char*   daddr;                                          // duplicated rtg address string to parse/trash
249         size_t  buf_size;                                       // nng needs var pointer not just size?
250         int             i;
251         long    blabber = 0;                            // time of last blabber so we don't flood if rtg goes bad
252         int             cstate = -1;                            // connection state to rtg
253         int             state;                                          // processing state of some nng function
254         char*   tokens[128];
255         char    wbuf[128];
256         int             ntoks;
257         int             vfd = -1;                                       // verbose file des if we have one
258         int             vlevel = 0;                                     // how chatty we should be 0== no nattering allowed
259         char*   eptr;
260         int             epfd = -1;                                      // fd for epoll so we can multi-task
261         struct  epoll_event events[1];          // list of events to give to epoll; we only have one we care about
262         struct  epoll_event epe;                        // event definition for event to listen to
263         int             count_delay = 30;                       // number of seconds between writing count info; initially every 30s
264         int             bump_freq = 0;                          // time at which we will bump count frequency to every 5 minutes
265         int             flags = 0;
266         int             rt_req_freq = DEF_RTREQ_FREQ;   // request frequency (sec) when wanting a new table
267         int             nxt_rt_req = 0;                                 // time of next request
268
269
270         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
271                 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: context passed in was nil\n" );
272                 return NULL;
273         }
274
275         if( (ctx->ephash = rmr_sym_alloc( RT_SIZE )) == NULL ) {                // master hash table for endpoints (each rt will reference this)
276                 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: unable to allocate an endpoint hash table\n" );
277                 return NULL;
278         }
279
280         if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
281                 vfd = open( eptr, O_RDONLY );
282                 vlevel = refresh_vlevel( vfd );
283         }
284
285         if( (eptr = getenv( ENV_RTREQ_FREA )) != NULL ) {
286                 rt_req_freq = atoi( eptr );
287                 if( rt_req_freq < 1 || rt_req_freq > 300 ) {
288                         rt_req_freq = DEF_RTREQ_FREQ;
289                         rmr_vlog( RMR_VL_WARN, "rmr_rtc: RT request frequency (%d) out of range (1-300), using default", DEF_RTREQ_FREQ );
290                 }
291         }
292         rmr_vlog( RMR_VL_INFO, "rmr_rtc: RT request frequency set to: %d seconds", rt_req_freq );
293
294         ctx->flags |= CFL_NO_RTACK;                             // don't ack when reading from a file
295         read_static_rt( ctx, vlevel );                  // seed the route table if one provided
296         ctx->flags &= ~CFL_NO_RTACK;
297
298
299         my_port = getenv( ENV_CTL_PORT );                               // default port to listen on (likely 4561)
300         if( my_port == NULL || ! *my_port ) {                   // if undefined, then go with default
301                 my_port = DEF_CTL_PORT;
302                 daddr = DEF_CTL_PORT;                                           // backwards compat; if ctl port not hard defined, default is to listen
303         } else {
304                 daddr = DEF_RTG_WK_ADDR;                                        // if ctl port is defined, then default changes to connecting to well known RM addr
305         }
306
307         if( (rtg_addr = getenv( ENV_RTG_ADDR )) == NULL || ! *rtg_addr ) {              // undefined, use default set above
308                 rtg_addr = daddr;
309         }
310
311         daddr = strdup( rtg_addr );                                                                     // dup to destroy during parse
312
313         ntoks = uta_tokenise( daddr, tokens, 120, ':' );                        // should be host:ip of rt mgr (could be port only which we assume is old listen port)
314         switch( ntoks ) {
315                 case 0:                                                                 // should not happen, but prevent accidents and allow default to ignore additional tokens
316                         break;
317
318                 case 1:
319                         my_port = tokens[0];                    // just port -- assume backlevel environment where we just listen
320                         flags |= RTCFL_HAVE_UPDATE;             // prevent sending update reqests
321                         break;
322
323                 default:
324                         if( strcmp( tokens[0], "tcp" ) == 0 ) {                 // old school nng tcp:xxxx so we listen on xxx
325                                 flags |= RTCFL_HAVE_UPDATE;                                     // and signal not to try to request an update
326                                 my_port = tokens[1];
327                         } else {
328                                 // rtg_addr points at rt mgr address and my port set from env or default stands as is
329                         }
330                         break;
331         }
332
333         if( (pvt_cx = init( my_port, MAX_RTC_BUF, FL_NOTHREAD )) == NULL ) {                            // open a private context (no RT listener!)
334                 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: unable to initialise listen port for RTG (pvt_cx)\n" );
335
336                 while( TRUE ) {                                                                                         // no listen port, just dump counts now and then
337                         sleep( count_delay );
338                         rt_epcounts( ctx->rtable, ctx->my_name );
339                 }
340
341                 return NULL;
342         }
343
344         ctx->rtg_whid = -1;
345
346         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rtc thread is running and listening; listening for rtg conns on %s\n", my_port );
347
348         bump_freq = time( NULL ) + 300;                         // after 5 minutes we decrease the count frequency
349         blabber = 0;
350         while( 1 ) {                                                                                                            // until the cows return, pigs fly, or somesuch event likely not to happen
351                 while( msg == NULL || msg->len <= 0 ) {                                                 // until we actually have something from the other side
352                         if( (flags & RTCFL_HAVE_UPDATE) == 0 && time( NULL ) >= nxt_rt_req ) {                  // no route table updated from rt mgr; request one
353                                 if( ctx->rtg_whid < 0 ) {
354                                         ctx->rtg_whid = rmr_wh_open( pvt_cx, rtg_addr );
355                                 }
356                                 send_update_req( pvt_cx, ctx );
357                                 nxt_rt_req = time( NULL ) + rt_req_freq;
358                         }
359
360                         msg = rmr_torcv_msg( pvt_cx, msg, 1000 );
361
362                         if( time( NULL ) > blabber  ) {
363                                 vlevel = refresh_vlevel( vfd );
364                                 if( vlevel >= 0 ) {                                                                             // allow it to be forced off with -n in verbose file
365                                         blabber = time( NULL ) + count_delay;                           // set next time to blabber, then do so
366                                         if( blabber > bump_freq ) {
367                                                 count_delay = 300;
368                                         }
369                                         rt_epcounts( ctx->rtable, ctx->my_name );
370                                 }
371                         }
372
373                         if( ctx->shutdown != 0 ) {
374                                 break;                                                  // mostly for unit test, but allows a forced stop
375                         }
376                 }
377
378                 vlevel = refresh_vlevel( vfd );                 // ensure it's fresh when we get a message
379
380                 if( msg != NULL && msg->len > 0 ) {
381                         rtc_parse_msg( ctx, pvt_cx, msg, vlevel, &flags );
382                 }
383
384                 if( ctx->shutdown ) {           // mostly for testing, but allows user app to close us down if rmr_*() function sets this
385                         return NULL;
386                 }
387
388         }
389
390         return NULL;    // unreachable, but some compilers don't see that and complain.
391 }
392
393 #ifndef SI95_BUILD
394 // this is nng specific inas much as we allow raw (non-RMR) messages
395
396 /*
397         NOTE:   This is the original rtc code when we supported "raw" nano/nng messages
398                         from the route manger.  It is deprecated in favour of managing all RM-RMR
399                         communications via an RMR session.
400
401                         The rtc() function above is the new and preferred function regardless
402                         of transport.
403
404         -----------------------------------------------------------------------------------
405         Route Table Collector
406         A side thread which opens a socket and subscribes to a routing table generator.
407         It may do other things along the way (latency measurements?).
408
409         The pointer is a pointer to the context.
410
411         Listens for records from the route table generation publisher, expecting
412         one of the following, newline terminated, ASCII records:
413                 rte|msg-type||]name:port,name:port,...;name:port,...                    // route table entry with one or more groups of endpoints
414                 new|start                                                               // start of new table
415                 new|end                                                                 // end of new table; complete
416
417                 Name must be a host name which can be looked up via gethostbyname() (DNS).
418
419                 Multiple endpoints (name:port) may be given separated by a comma; an endpoint is selected using round robin
420                         for each message of the type that is sent.
421
422                 Multiple endpoint groups can be given as a comma separated list of endpoints, separated by semicolons:
423                                 group1n1:port,group1n2:port,group1n3:port;group2n1:port,group2n2:port
424
425                 If multiple groups are given, when send() is called for the cooresponding message type,
426                 the message will be sent to one endpoint in each group.
427
428                 msg-type is the numeric message type (e.g. 102). If it is given as n,name then it is assumed
429                 that the entry applies only to the instance running with the hostname 'name.'
430
431         Buffers received from the route table generator can contain multiple newline terminated
432         records, but each buffer must be less than 4K in length, and the last record in a
433         buffer may NOT be split across buffers.
434
435         Other chores:
436         In addition to the primary task of getting, vetting, and installing a new route table, or
437         updates to the existing table, this thread will periodically cause the send counts for each
438         endpoint known to be written to standard error. The frequency is once every 180 seconds, and
439         more frequently if verbose mode (see ENV_VERBOSE_FILE) is > 0.
440 */
441 static void* raw_rtc( void* vctx ) {
442         uta_ctx_t*      ctx;                                    // context user has -- where we pin the route table
443         uta_ctx_t*      pvt_cx;                                 // private context for session with rtg
444         rmr_mbuf_t*     msg = NULL;                             // message from rtg
445         char*   payload;                                        // payload in the message
446         size_t  mlen;
447         char*   port;                                           // a port number we listen/connect to
448         char*   fport;                                          // pointer to the real buffer to free
449         size_t  buf_size;                                       // nng needs var pointer not just size?
450         char*   nextr;                                          // pointer at next record in the message
451         char*   curr;                                           // current record
452         int             i;
453         long    blabber = 0;                            // time of last blabber so we don't flood if rtg goes bad
454         int             cstate = -1;                            // connection state to rtg
455         int             state;                                          // processing state of some nng function
456         char*   tokens[128];
457         char    wbuf[128];
458         char*   pbuf = NULL;
459         int             pbuf_size = 0;                          // number allocated in pbuf
460         int             ntoks;
461         int             raw_interface = 1;                      // rtg is using raw NNG/Nano not RMr to send updates
462         int             vfd = -1;                                       // verbose file des if we have one
463         int             vlevel = 0;                                     // how chatty we should be 0== no nattering allowed
464         char*   eptr;
465         int             epfd = -1;                                      // fd for epoll so we can multi-task
466         struct  epoll_event events[1];          // list of events to give to epoll; we only have one we care about
467         struct  epoll_event epe;                        // event definition for event to listen to
468         int             rcv_fd = -1;                            // pollable file des from NNG to use for timeout
469         int             count_delay = 30;                       // number of seconds between writing count info; initially every 30s
470         int             bump_freq = 0;                          // time at which we will bump count frequency to every 5 minutes
471
472
473         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
474                 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: context passed in was nil\n" );
475                 return NULL;
476         }
477
478         if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
479                 vfd = open( eptr, O_RDONLY );
480                 vlevel = refresh_vlevel( vfd );
481         }
482
483         read_static_rt( ctx, vlevel );                                          // seed the route table if one provided
484
485         if( (port = getenv( ENV_RTG_PORT )) == NULL || ! *port ) {              // port we need to open to listen for RTG connections
486                 port = strdup( DEF_RTG_PORT );
487         } else {
488                 port = strdup( port );
489         }
490
491 /*
492         this test is now done in init and this function is started _only_ if the value was 1
493         if( (curr = getenv( ENV_RTG_RAW )) != NULL ) {
494                 raw_interface = atoi( curr ) > 0;                               // if > 0 we assume that rtg messages are NOT coming from an RMr based process
495         }
496 */
497
498         fport = port;           // must hold to free
499
500         ntoks = uta_tokenise( port, tokens, 120, ':' );                 // assume tcp:port, but it could be port or old style host:port
501         switch( ntoks ) {
502                 case 1:
503                                 port = tokens[0];                       // just the port
504                                 break;
505
506                 case 2:
507                                 port = tokens[1];                       // tcp:port or :port
508                                 break;
509
510                 default:
511                                 port = DEF_RTG_PORT;            // this shouldn't happen, but parnioia is good
512                                 break;
513         }
514
515         if( (pvt_cx = init( port, MAX_RTG_MSG_SZ, FL_NOTHREAD )) == NULL ) {                            // open a private context
516                 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: unable to initialise listen port for RTG (pvt_cx)\n" );
517
518                 while( TRUE ) {                                                                                         // no listen port, just dump counts now and then
519                         sleep( count_delay );
520                         rt_epcounts( ctx->rtable, ctx->my_name );
521                 }
522
523                 free( fport );                                  // parinoid free and return
524                 return NULL;
525         }
526
527         if( (rcv_fd = rmr_get_rcvfd( pvt_cx )) >= 0 ) {            // get the epoll fd for the rtg socket
528                 if( rcv_fd < 0 ) {
529                         rmr_vlog( RMR_VL_WARN, "cannot get epoll fd for rtg session; stats will generate only after update from rt manager\n" );
530                 } else {
531                         if( (epfd = epoll_create1( 0 )) < 0 ) {
532                                 rmr_vlog( RMR_VL_WARN, "stats will generate only after rt manager update; unable to create epoll fd for rtg session: %s\n", strerror( errno ) );
533                                 rcv_fd = -1;
534                         } else {
535                                 epe.events = EPOLLIN;
536                                 epe.data.fd = rcv_fd;
537
538                                 if( epoll_ctl( epfd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 )  {
539                                         rmr_vlog( RMR_VL_WARN, "stats will generate only after rt manager update; unable to init epoll_ctl: %s\n", strerror( errno ) );
540                                         rcv_fd = -1;
541                                 }
542                         }
543                 }
544         }
545
546         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rtc thread is running and listening; listening for rtg conns on %s\n", port );
547         free( fport );
548
549         // future:  if we need to register with the rtg, then build a message and send it through a wormhole here
550
551         bump_freq = time( NULL ) + 300;                         // after 5 minutes we decrease the count frequency
552         blabber = 0;
553         while( 1 ) {                    // until the cows return, pigs fly, or somesuch event likely not to happen
554                 while( msg == NULL || msg->len <= 0 ) {                                                 // until we actually have something from the other side
555                         if( rcv_fd < 0 || epoll_wait( epfd, events, 1, 1000 ) > 0 )  {  // skip epoll if init failed, else block for max 1 sec
556                                 if( raw_interface ) {
557                                         msg = (rmr_mbuf_t *) rcv_payload( pvt_cx, msg );                // receive from non-RMr sender
558                                 } else {
559                                         msg = rmr_rcv_msg( pvt_cx, msg );               // receive from an RMr sender
560                                 }
561                         } else {                                                                                                        // no msg, do extra tasks
562                                 if( msg != NULL ) {                                                                             // if we were working with a message; ensure no len
563                                         msg->len = 0;
564                                         msg->state = RMR_ERR_TIMEOUT;
565                                 }
566                         }
567
568                         if( time( NULL ) > blabber  ) {
569                                 vlevel = refresh_vlevel( vfd );
570                                 if( vlevel >= 0 ) {                                                                             // allow it to be forced off with -n in verbose file
571                                         blabber = time( NULL ) + count_delay;                           // set next time to blabber, then do so
572                                         if( blabber > bump_freq ) {
573                                                 count_delay = 300;
574                                         }
575                                         rt_epcounts( ctx->rtable, ctx->my_name );
576                                 }
577                         }
578                 }
579
580                 vlevel = refresh_vlevel( vfd );                 // ensure it's fresh when we get a message
581
582                 if( msg != NULL && msg->len > 0 ) {
583                         payload = msg->payload;
584                         mlen = msg->len;                                        // usable bytes in the payload
585                         if( vlevel > 1 ) {
586                                 rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: received rt message; %d bytes (%s)\n", (int) mlen, msg->payload );
587                         } else {
588                                 if( DEBUG > 1 || (vlevel > 0) ) rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: received rt message; %d bytes\n", (int) mlen );
589                         }
590
591                         if( pbuf_size <= mlen ) {
592                                 if( pbuf ) {
593                                         free( pbuf );
594                                 }
595                                 if( mlen < 512 ) {
596                                         pbuf_size = 512;
597                                 } else {
598                                         pbuf_size = mlen * 2;
599                                 }
600                                 pbuf = (char *) malloc( sizeof( char ) * pbuf_size );
601                         }
602                         memcpy( pbuf, payload, mlen );
603                         pbuf[mlen] = 0;                                                                         // don't depend on sender making this a legit string
604
605                         curr = pbuf;
606                         while( curr ) {                                                         // loop over each record in the buffer
607                                 nextr = strchr( curr, '\n' );                   // allow multiple newline records, find end of current and mark
608
609                                 if( nextr ) {
610                                         *(nextr++) = 0;
611                                 }
612
613                                 if( vlevel > 1 ) {
614                                         rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: processing (%s)\n", curr );
615                                 }
616                                 if( raw_interface ) {
617                                         parse_rt_rec( ctx, NULL, curr, vlevel, NULL );          // nil pvt to parser as we can't ack messages
618                                 } else {
619                                         parse_rt_rec( ctx, pvt_cx, curr, vlevel, msg );         // parse record and add to in progress table
620                                 }
621
622                                 curr = nextr;
623                         }
624
625                         if( ctx->shutdown ) {           // mostly for testing, but allows user app to close us down if rmr_*() function sets this
626                                 break;
627                         }
628
629                         msg->len = 0;                           // force back into the listen loop
630                 }
631         }
632
633         return NULL;    // unreachable, but some compilers don't see that and complain.
634 }
635 #endif
636
637
638 #endif