cc4730e00c470cbadd27e75f9fb74864c42506ca
[ric-plt/lib/rmr.git] / src / rmr / common / src / rtc_static.c
1 // : vi ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019-2020 Nokia
5         Copyright (c) 2018-2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rt_collector.c
23         Abstract:       The route table collector is started as a separate pthread and
24                                 is responsible for listening for route table updates from a
25                                 route manager or route table generator process.
26
27         Author:         E. Scott Daniels
28         Date:           29 November 2018 (extracted to common 13 March 2019)
29 */
30
31 #ifndef _rt_collector_c
32 #define _rt_collector_c
33
34 #include <stdio.h>
35 #include <stdlib.h>
36 #include <netdb.h>
37 #include <errno.h>
38 #include <string.h>
39 #include <errno.h>
40 #include <fcntl.h>
41 #include <sys/types.h>
42 #include <sys/stat.h>
43 #include <unistd.h>
44
45 #include <RIC_message_types.h>          // needed for RMR/Rt Mgr msg types
46
47 // ---- local constants ------------------
48                                                                                 // flags
49 #define RTCFL_HAVE_UPDATE       0x01            // an update from RM was received
50
51 #define MAX_RTC_BUF     5 * 1024                        // max buffer size we'll expect is 4k, add some fudge room
52
53 // ------------------------------------------------------------------------------------------------
54
55 /*
56         Opens the vlevel control file if needed and reads the vlevel from it.
57         The file is rewound if already open so that external updates are captured.
58         The current level is returnd; 0 on error.
59
60         The environment variable (ENV_VERBOSE_FILE) is used to supply the file to
61         open and read. If missing, we will try /tmp/rmr.v.  We will try to open the file
62         on each call if not alrady open; this allows the value to be supplied after
63         start which helps debugging.
64
65         If close_file is true, then we will close the open vfd and return 0;
66 */
67 extern int refresh_vlevel( int close_file ) {
68         static int vfd = -1;
69
70         char*   eptr;
71         char    wbuf[128];                      // read buffer; MUST be 11 or greater
72         int             vlevel = 0;
73
74         if( close_file ) {
75                 if( vfd >= 0 ) {
76                         close( vfd );
77                         vfd = -1;
78                 }
79                 return 0;
80         }
81
82         if( vfd < 0 ) {                         // attempt to find/open on all calls if not open
83                 if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
84                         vfd = open( eptr, O_RDONLY );
85                 } else {
86                         vfd = open( "/tmp/rmr.v", O_RDONLY );
87                 }
88                 if( vfd < 0 ) {
89                         return 0;
90                 }
91         }
92
93         memset( wbuf, 0, sizeof( char ) * 11 );                 // ensure what we read will be nil terminated
94         if( lseek( vfd, 0, SEEK_SET ) == 0 && read( vfd, wbuf, 10 ) > 0 ) {
95                 vlevel = atoi( wbuf );
96         }
97
98         return vlevel;
99 }
100
101 /*
102         Loop forever (assuming we're running in a pthread reading the static table
103         every minute or so.
104 */
105 static void* rtc_file( void* vctx ) {
106         uta_ctx_t*      ctx;                                    // context user has -- where we pin the route table
107         char*   eptr;
108         int             vlevel = 0;                                     // how chatty we should be 0== no nattering allowed
109         char    wbuf[256];
110
111
112         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
113                 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: context passed in was nil\n" );
114                 return NULL;
115         }
116
117         ctx->flags |= CFL_NO_RTACK;                             // no attempt to ack when reading from a file
118         while( 1 ) {
119                 vlevel = refresh_vlevel( 0 );
120                 read_static_rt( ctx, vlevel );                                          // seed the route table if one provided
121
122                 if( ctx->shutdown != 0 ) {                                                      // allow for graceful termination and unit testing
123                         refresh_vlevel( 1 );                                                            // close the verbose file if open
124                         return NULL;
125                 }
126                 sleep( 60 );
127         }
128 }
129
130 /*
131         Rtc_parse_msg parses a single message from the route manager. We allow multiple, newline terminated,
132         records in each message; it is required that the last record in the message be complete (we do not
133         reconstruct records split over multiple messages).  For each record, we call the record parser
134         to parse and add the information to the table being built.
135
136         This function was broken from the main rtc() function in order to be able to unit test it. Without
137         this as a standalone funciton, it was impossible to simulate a message arriving on the RTC's private
138         context.
139
140         To reduce malloc/free cycles, we allocate a static work buffer and expand it when needed; in other
141         words, this is not thread safe but it shouldn't need to be.
142 */
143 static void rtc_parse_msg( uta_ctx_t *ctx, uta_ctx_t* pvt_cx, rmr_mbuf_t* msg, int vlevel,  int* flags ) {
144         static  unsigned char* pbuf = NULL;
145         static  int pbuf_size = 0;
146
147         unsigned char* payload;
148         unsigned char* curr;
149         unsigned char* nextr;
150         int mlen;
151
152         payload = msg->payload;
153         mlen = msg->len;                                        // usable bytes in the payload
154
155         if( DEBUG > 1 || (vlevel > 0) ) rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: received rt message type=%d len=%d\n", msg->mtype, (int) mlen );
156         switch( msg->mtype ) {
157                 case RMRRM_TABLE_DATA:
158                         if( (*flags & RTCFL_HAVE_UPDATE) == 0 ) {
159                                 *flags |= RTCFL_HAVE_UPDATE;
160                                 rmr_vlog( RMR_VL_INFO, "message flow from route manager starts\n" );
161                         }
162
163                         if( pbuf_size <= mlen ) {
164                                 if( pbuf ) {
165                                         free( pbuf );
166                                 }
167                                 if( mlen < 512 ) {
168                                         pbuf_size = 1024;
169                                 } else {
170                                         pbuf_size = mlen * 2;
171                                 }
172                                 pbuf = (char *) malloc( sizeof( char ) * pbuf_size );
173                         }
174                         memcpy( pbuf, payload, mlen );
175                         pbuf[mlen] = 0;                                                                         // don't depend on sender making this a legit string
176                         if( vlevel > 1 ) {
177                                 rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: rt message: (%s)\n", pbuf );
178                         }
179
180                         curr = pbuf;
181                         while( curr ) {                                                                         // loop over each record in the buffer
182                                 nextr = strchr( (char *) curr, '\n' );                  // allow multiple newline records, find end of current and mark
183
184                                 if( nextr ) {
185                                         *(nextr++) = 0;
186                                 }
187
188                                 if( vlevel > 1 ) {
189                                         rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc_parse_msg: processing (%s)\n", curr );
190                                 }
191                                 parse_rt_rec( ctx, pvt_cx, curr, vlevel, msg );         // parse record and add to in progress table; ack using rts to msg
192
193                                 curr = nextr;
194                         }
195
196                         msg->len = 0;                           // force back into the listen loop
197                         break;
198
199                 default:
200                         rmr_vlog( RMR_VL_WARN, "rmr_rtc: invalid message type=%d len=%d\n", msg->mtype, (int) msg->len );
201                         break;
202         }
203 }
204
205 /*
206         Route Table Collector
207         A side thread which either attempts to connect and request a table
208         from the Route Manager, or opens a port and listens for Route Manager
209         to push table updates.
210
211         It may do other things along the way (latency measurements, alarms,
212         respond to RMR pings, etc.).
213
214         The behaviour with respect to listening for Route Manager updates vs
215         the initiation of the connection and sending a request depends on the
216         value of the ENV_RTG_ADDR (RMR_RTG_SVC)  environment variable. If
217         host:port, or IP:port, is given, then we assume that we make the connection
218         and send a request for the table (request mode).  If the variable is just
219         a port, then we assume Route Manager will connect and push updates (original
220         method).
221
222         If the variable is not defined, the default behaviour, in order to be
223         backwards compatable, depends on the presence of the ENV_CTL_PORT
224         (RMR_CTL_PORT) variable (new with the support for requesting a table).
225
226
227         ENV_CTL_PORT    ENV_RTG_ADDR    Behaviour
228         unset                   unset                   Open default CTL port (DEF_CTL_PORT) and
229                                                                         wait for Rt Mgr to push tables
230
231         set                             unset                   Use the default Rt Mgr wellknown addr
232                                                                         and port (DEF_RTG_WK_ADDR) to connect
233                                                                         and request a table. The control port
234                                                                         used is the value set by ENV_CTL_PORT.
235
236         unset                   set                             As described above. The default control
237                                                                         port (DEF_CTL_PORT) is used.
238
239         When we are running in request mode, then we will send the RMR message
240         RMRRM_REFRESH to this address (wormhole) as a request for the route manager
241         to send a new table. We will attempt to connect and send requests until
242         we have a table. Calls to rmr_ready() will report FALSE until a table is
243         loaded _unless_ a seed table was given.
244
245         Route table information is expected to arrive on RMR messages with type
246         RMRRM_TABLE_DATA.  There is NOT a specific message type for each possible
247         table record, so the payload is as it appears in the seed file or as
248         delivered in old versions.  It may take several RMRRM_TABLE_DATA messages
249         to completely supply a new table or table update. See the header for parse_rt_rec
250         in common for a description of possible message contents.
251
252         Buffers received from the route table generator can contain multiple newline terminated
253         records, but each buffer must be less than 4K in length, and the last record in a
254         buffer may NOT be split across buffers.
255
256         Other chores:
257         In addition to the primary task of getting, vetting, and installing a new route table, or
258         updates to the existing table, this thread will periodically cause the send counts for each
259         endpoint known to be written to standard error. The frequency is once every 180 seconds, and
260         more frequently if verbose mode (see ENV_VERBOSE_FILE) is > 0.
261 */
262 static void* rtc( void* vctx ) {
263         uta_ctx_t*      ctx;                                    // context user has -- where we pin the route table
264         uta_ctx_t*      pvt_cx;                                 // private context for session with rtg
265         rmr_mbuf_t*     msg = NULL;                             // message from rtg
266         char*   my_port;                                        // the port number that we will listen on (4561 has been the default for this)
267         char*   rtg_addr;                                       // host:port address of route table generator (route manager)
268         char*   daddr;                                          // duplicated rtg address string to parse/trash
269         size_t  buf_size;                                       // nng needs var pointer not just size?
270         int             i;
271         long    blabber = 0;                            // time of last blabber so we don't flood if rtg goes bad
272         int             cstate = -1;                            // connection state to rtg
273         int             state;                                          // processing state of some nng function
274         char*   tokens[128];
275         char    wbuf[128];
276         int             ntoks;
277         int             vlevel = 0;                                     // how chatty we should be 0== no nattering allowed
278         char*   eptr;
279         int             epfd = -1;                                      // fd for epoll so we can multi-task
280         struct  epoll_event events[1];          // list of events to give to epoll; we only have one we care about
281         struct  epoll_event epe;                        // event definition for event to listen to
282         int             count_delay = 30;                       // number of seconds between writing count info; initially every 30s
283         int             bump_freq = 0;                          // time at which we will bump count frequency to every 5 minutes
284         int             flags = 0;
285         int             rt_req_freq = DEF_RTREQ_FREQ;   // request frequency (sec) when wanting a new table
286         int             nxt_rt_req = 0;                                 // time of next request
287
288
289         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
290                 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: context passed in was nil\n" );
291                 return NULL;
292         }
293
294         vlevel = refresh_vlevel( 0 );
295
296         if( (eptr = getenv( ENV_RTREQ_FREA )) != NULL ) {
297                 rt_req_freq = atoi( eptr );
298                 if( rt_req_freq < 1 || rt_req_freq > 300 ) {
299                         rt_req_freq = DEF_RTREQ_FREQ;
300                         rmr_vlog( RMR_VL_WARN, "rmr_rtc: RT request frequency (%d) out of range (1-300), using default", DEF_RTREQ_FREQ );
301                 }
302         }
303         rmr_vlog( RMR_VL_INFO, "rmr_rtc: RT request frequency set to: %d seconds", rt_req_freq );
304
305         ctx->flags |= CFL_NO_RTACK;                             // don't ack when reading from a file
306         read_static_rt( ctx, vlevel );                  // seed the route table if one provided
307         ctx->flags &= ~CFL_NO_RTACK;
308
309
310         my_port = getenv( ENV_CTL_PORT );                               // default port to listen on (likely 4561)
311         if( my_port == NULL || ! *my_port ) {                   // if undefined, then go with default
312                 my_port = DEF_CTL_PORT;
313                 daddr = DEF_CTL_PORT;                                           // backwards compat; if ctl port not hard defined, default is to listen
314         } else {
315                 daddr = DEF_RTG_WK_ADDR;                                        // if ctl port is defined, then default changes to connecting to well known RM addr
316         }
317
318         if( (rtg_addr = getenv( ENV_RTG_ADDR )) == NULL || ! *rtg_addr ) {              // undefined, use default set above
319                 rtg_addr = daddr;
320         }
321
322         daddr = strdup( rtg_addr );                                                                     // dup to destroy during parse
323
324         ntoks = uta_tokenise( daddr, tokens, 120, ':' );                        // should be host:ip of rt mgr (could be port only which we assume is old listen port)
325         switch( ntoks ) {
326                 case 0:                                                                 // should not happen, but prevent accidents and allow default to ignore additional tokens
327                         break;
328
329                 case 1:
330                         my_port = tokens[0];                    // just port -- assume backlevel environment where we just listen
331                         flags |= RTCFL_HAVE_UPDATE;             // prevent sending update reqests
332                         break;
333
334                 default:
335                         if( strcmp( tokens[0], "tcp" ) == 0 ) {                 // old school nng tcp:xxxx so we listen on xxx
336                                 flags |= RTCFL_HAVE_UPDATE;                                     // and signal not to try to request an update
337                                 my_port = tokens[1];
338                         } else {
339                                 // rtg_addr points at rt mgr address and my port set from env or default stands as is
340                         }
341                         break;
342         }
343
344         if( (pvt_cx = init( my_port, MAX_RTC_BUF, FL_NOTHREAD )) == NULL ) {                            // open a private context (no RT listener!)
345                 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: unable to initialise listen port for RTG (pvt_cx)\n" );
346
347                 while( TRUE ) {                                                                                         // no listen port, just dump counts now and then
348                         sleep( count_delay );
349                         rt_epcounts( ctx->rtable, ctx->my_name );
350                 }
351
352                 return NULL;
353         }
354
355         ctx->rtg_whid = -1;
356
357         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rtc thread is running and listening; listening for rtg conns on %s\n", my_port );
358
359         bump_freq = time( NULL ) + 300;                         // after 5 minutes we decrease the count frequency
360         blabber = 0;
361         while( 1 ) {                                                                                                            // until the cows return, pigs fly, or somesuch event likely not to happen
362                 while( msg == NULL || msg->len <= 0 ) {                                                 // until we actually have something from the other side
363                         if( (flags & RTCFL_HAVE_UPDATE) == 0 && time( NULL ) >= nxt_rt_req ) {                  // no route table updated from rt mgr; request one
364                                 if( ctx->rtg_whid < 0 ) {
365                                         ctx->rtg_whid = rmr_wh_open( pvt_cx, rtg_addr );
366                                 }
367                                 send_update_req( pvt_cx, ctx );
368                                 nxt_rt_req = time( NULL ) + rt_req_freq;
369                         }
370
371                         msg = rmr_torcv_msg( pvt_cx, msg, 1000 );
372
373                         if( time( NULL ) > blabber  ) {
374                                 vlevel = refresh_vlevel( 0 );
375                                 blabber = time( NULL ) + count_delay;                           // set next time to blabber, then do so
376                                 if( blabber > bump_freq ) {
377                                         count_delay = 300;
378                                 }
379                                 if( vlevel >= 0 ) {                                                                             // allow it to be forced off with -n in verbose file
380                                         rt_epcounts( ctx->rtable, ctx->my_name );
381                                 }
382                         }
383
384                         if( ctx->shutdown != 0 ) {
385                                 break;                                                  // mostly for unit test, but allows a forced stop
386                         }
387
388                                                                                                 // extra housekeeping chores can be added here...
389                         alarm_if_drops( ctx, pvt_cx );          // send an alarm if message are dropping, clear if we set one and thtings are better
390                 }
391
392                 vlevel = refresh_vlevel( 0 );                   // ensure it's fresh when we get a message
393
394                 if( msg != NULL && msg->len > 0 ) {
395                         rtc_parse_msg( ctx, pvt_cx, msg, vlevel, &flags );
396                 }
397
398                 if( ctx->shutdown ) {           // mostly for testing, but allows user app to close us down if rmr_*() function sets this
399                         return NULL;
400                 }
401
402         }
403
404         return NULL;    // unreachable, but some compilers don't see that and complain.
405 }
406
407 #ifndef SI95_BUILD
408 // this is nng specific inas much as we allow raw (non-RMR) messages
409
410 /*
411         NOTE:   This is the original rtc code when we supported "raw" nano/nng messages
412                         from the route manger.  It is deprecated in favour of managing all RM-RMR
413                         communications via an RMR session.
414
415                         The rtc() function above is the new and preferred function regardless
416                         of transport.
417
418         -----------------------------------------------------------------------------------
419         Route Table Collector
420         A side thread which opens a socket and subscribes to a routing table generator.
421         It may do other things along the way (latency measurements?).
422
423         The pointer is a pointer to the context.
424
425         Listens for records from the route table generation publisher, expecting
426         one of the following, newline terminated, ASCII records:
427                 rte|msg-type||]name:port,name:port,...;name:port,...                    // route table entry with one or more groups of endpoints
428                 new|start                                                               // start of new table
429                 new|end                                                                 // end of new table; complete
430
431                 Name must be a host name which can be looked up via gethostbyname() (DNS).
432
433                 Multiple endpoints (name:port) may be given separated by a comma; an endpoint is selected using round robin
434                         for each message of the type that is sent.
435
436                 Multiple endpoint groups can be given as a comma separated list of endpoints, separated by semicolons:
437                                 group1n1:port,group1n2:port,group1n3:port;group2n1:port,group2n2:port
438
439                 If multiple groups are given, when send() is called for the cooresponding message type,
440                 the message will be sent to one endpoint in each group.
441
442                 msg-type is the numeric message type (e.g. 102). If it is given as n,name then it is assumed
443                 that the entry applies only to the instance running with the hostname 'name.'
444
445         Buffers received from the route table generator can contain multiple newline terminated
446         records, but each buffer must be less than 4K in length, and the last record in a
447         buffer may NOT be split across buffers.
448
449         Other chores:
450         In addition to the primary task of getting, vetting, and installing a new route table, or
451         updates to the existing table, this thread will periodically cause the send counts for each
452         endpoint known to be written to standard error. The frequency is once every 180 seconds, and
453         more frequently if verbose mode (see ENV_VERBOSE_FILE) is > 0.
454 */
455 static void* raw_rtc( void* vctx ) {
456         uta_ctx_t*      ctx;                                    // context user has -- where we pin the route table
457         uta_ctx_t*      pvt_cx;                                 // private context for session with rtg
458         rmr_mbuf_t*     msg = NULL;                             // message from rtg
459         char*   payload;                                        // payload in the message
460         size_t  mlen;
461         char*   port;                                           // a port number we listen/connect to
462         char*   fport;                                          // pointer to the real buffer to free
463         size_t  buf_size;                                       // nng needs var pointer not just size?
464         char*   nextr;                                          // pointer at next record in the message
465         char*   curr;                                           // current record
466         int             i;
467         long    blabber = 0;                            // time of last blabber so we don't flood if rtg goes bad
468         int             cstate = -1;                            // connection state to rtg
469         int             state;                                          // processing state of some nng function
470         char*   tokens[128];
471         char    wbuf[128];
472         char*   pbuf = NULL;
473         int             pbuf_size = 0;                          // number allocated in pbuf
474         int             ntoks;
475         int             raw_interface = 1;                      // rtg is using raw NNG/Nano not RMr to send updates
476         int             vlevel = 0;                                     // how chatty we should be 0== no nattering allowed
477         char*   eptr;
478         int             epfd = -1;                                      // fd for epoll so we can multi-task
479         struct  epoll_event events[1];          // list of events to give to epoll; we only have one we care about
480         struct  epoll_event epe;                        // event definition for event to listen to
481         int             rcv_fd = -1;                            // pollable file des from NNG to use for timeout
482         int             count_delay = 30;                       // number of seconds between writing count info; initially every 30s
483         int             bump_freq = 0;                          // time at which we will bump count frequency to every 5 minutes
484
485
486         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
487                 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: context passed in was nil\n" );
488                 return NULL;
489         }
490
491         vlevel = refresh_vlevel( 0 );
492         read_static_rt( ctx, vlevel );                                          // seed the route table if one provided
493
494         if( (port = getenv( ENV_RTG_PORT )) == NULL || ! *port ) {              // port we need to open to listen for RTG connections
495                 port = strdup( DEF_RTG_PORT );
496         } else {
497                 port = strdup( port );
498         }
499
500         fport = port;           // must hold to free
501
502         ntoks = uta_tokenise( port, tokens, 120, ':' );                 // assume tcp:port, but it could be port or old style host:port
503         switch( ntoks ) {
504                 case 1:
505                                 port = tokens[0];                       // just the port
506                                 break;
507
508                 case 2:
509                                 port = tokens[1];                       // tcp:port or :port
510                                 break;
511
512                 default:
513                                 port = DEF_RTG_PORT;            // this shouldn't happen, but parnioia is good
514                                 break;
515         }
516
517         if( (pvt_cx = init( port, MAX_RTG_MSG_SZ, FL_NOTHREAD )) == NULL ) {                            // open a private context
518                 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: unable to initialise listen port for RTG (pvt_cx)\n" );
519
520                 while( TRUE ) {                                                                                         // no listen port, just dump counts now and then
521                         sleep( count_delay );
522                         rt_epcounts( ctx->rtable, ctx->my_name );
523                 }
524
525                 free( fport );                                  // parinoid free and return
526                 return NULL;
527         }
528
529         if( (rcv_fd = rmr_get_rcvfd( pvt_cx )) >= 0 ) {            // get the epoll fd for the rtg socket
530                 if( rcv_fd < 0 ) {
531                         rmr_vlog( RMR_VL_WARN, "cannot get epoll fd for rtg session; stats will generate only after update from rt manager\n" );
532                 } else {
533                         if( (epfd = epoll_create1( 0 )) < 0 ) {
534                                 rmr_vlog( RMR_VL_WARN, "stats will generate only after rt manager update; unable to create epoll fd for rtg session: %s\n", strerror( errno ) );
535                                 rcv_fd = -1;
536                         } else {
537                                 epe.events = EPOLLIN;
538                                 epe.data.fd = rcv_fd;
539
540                                 if( epoll_ctl( epfd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 )  {
541                                         rmr_vlog( RMR_VL_WARN, "stats will generate only after rt manager update; unable to init epoll_ctl: %s\n", strerror( errno ) );
542                                         rcv_fd = -1;
543                                 }
544                         }
545                 }
546         }
547
548         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rtc thread is running and listening; listening for rtg conns on %s\n", port );
549         free( fport );
550
551         // future:  if we need to register with the rtg, then build a message and send it through a wormhole here
552
553         bump_freq = time( NULL ) + 300;                         // after 5 minutes we decrease the count frequency
554         blabber = 0;
555         while( 1 ) {                    // until the cows return, pigs fly, or somesuch event likely not to happen
556                 while( msg == NULL || msg->len <= 0 ) {                                                 // until we actually have something from the other side
557                         if( rcv_fd < 0 || epoll_wait( epfd, events, 1, 1000 ) > 0 )  {  // skip epoll if init failed, else block for max 1 sec
558                                 if( raw_interface ) {
559                                         msg = (rmr_mbuf_t *) rcv_payload( pvt_cx, msg );                // receive from non-RMr sender
560                                 } else {
561                                         msg = rmr_rcv_msg( pvt_cx, msg );               // receive from an RMr sender
562                                 }
563                         } else {                                                                                                        // no msg, do extra tasks
564                                 if( msg != NULL ) {                                                                             // if we were working with a message; ensure no len
565                                         msg->len = 0;
566                                         msg->state = RMR_ERR_TIMEOUT;
567                                 }
568                         }
569
570                         if( time( NULL ) > blabber  ) {
571                                 vlevel = refresh_vlevel( 0 );
572                                 if( vlevel >= 0 ) {                                                                             // allow it to be forced off with -n in verbose file
573                                         blabber = time( NULL ) + count_delay;                           // set next time to blabber, then do so
574                                         if( blabber > bump_freq ) {
575                                                 count_delay = 300;
576                                         }
577                                         rt_epcounts( ctx->rtable, ctx->my_name );
578                                 }
579                         }
580
581                         alarm_if_drops( ctx );                          // send an alarm if message are dropping, clear if we set one and thtings are better
582                 }
583
584                 vlevel = refresh_vlevel( 0 );                   // ensure it's fresh when we get a message
585
586                 if( msg != NULL && msg->len > 0 ) {
587                         payload = msg->payload;
588                         mlen = msg->len;                                        // usable bytes in the payload
589                         if( vlevel > 1 ) {
590                                 rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: received rt message; %d bytes (%s)\n", (int) mlen, msg->payload );
591                         } else {
592                                 if( DEBUG > 1 || (vlevel > 0) ) rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: received rt message; %d bytes\n", (int) mlen );
593                         }
594
595                         if( pbuf_size <= mlen ) {
596                                 if( pbuf ) {
597                                         free( pbuf );
598                                 }
599                                 if( mlen < 512 ) {
600                                         pbuf_size = 512;
601                                 } else {
602                                         pbuf_size = mlen * 2;
603                                 }
604                                 pbuf = (char *) malloc( sizeof( char ) * pbuf_size );
605                         }
606                         memcpy( pbuf, payload, mlen );
607                         pbuf[mlen] = 0;                                                                         // don't depend on sender making this a legit string
608
609                         curr = pbuf;
610                         while( curr ) {                                                         // loop over each record in the buffer
611                                 nextr = strchr( curr, '\n' );                   // allow multiple newline records, find end of current and mark
612
613                                 if( nextr ) {
614                                         *(nextr++) = 0;
615                                 }
616
617                                 if( vlevel > 1 ) {
618                                         rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: processing (%s)\n", curr );
619                                 }
620                                 if( raw_interface ) {
621                                         parse_rt_rec( ctx, NULL, curr, vlevel, NULL );          // nil pvt to parser as we can't ack messages
622                                 } else {
623                                         parse_rt_rec( ctx, pvt_cx, curr, vlevel, msg );         // parse record and add to in progress table
624                                 }
625
626                                 curr = nextr;
627                         }
628
629                         if( ctx->shutdown ) {           // mostly for testing, but allows user app to close us down if rmr_*() function sets this
630                                 break;
631                         }
632
633                         msg->len = 0;                           // force back into the listen loop
634                 }
635         }
636
637         return NULL;    // unreachable, but some compilers don't see that and complain.
638 }
639 #endif
640
641
642 #endif