2deef8ee02cb1584fb8668fd9ab8befcf02a67f4
[ric-plt/lib/rmr.git] / src / rmr / common / src / rtc_static.c
1 // : vi ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019-2020 Nokia
5         Copyright (c) 2018-2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rt_collector.c
23         Abstract:       The route table collector is started as a separate pthread and
24                                 is responsible for listening for route table updates from a
25                                 route manager or route table generator process.
26
27         Author:         E. Scott Daniels
28         Date:           29 November 2018 (extracted to common 13 March 2019)
29 */
30
31 #ifndef _rt_collector_c
32 #define _rt_collector_c
33
34 #include <stdio.h>
35 #include <stdlib.h>
36 #include <netdb.h>
37 #include <errno.h>
38 #include <string.h>
39 #include <errno.h>
40 #include <fcntl.h>
41 #include <sys/types.h>
42 #include <sys/stat.h>
43 #include <unistd.h>
44
45 #include <RIC_message_types.h>          // needed for RMR/Rt Mgr msg types
46
47 // ---- local constants ------------------
48                                                                                 // flags
49 #define RTCFL_HAVE_UPDATE       0x01            // an update from RM was received
50
51 #define MAX_RTC_BUF     5 * 1024                        // max buffer size we'll expect is 4k, add some fudge room
52
53 // ------------------------------------------------------------------------------------------------
54
55 /*
56         Loop forever (assuming we're running in a pthread reading the static table
57         every minute or so.
58 */
59 static void* rtc_file( void* vctx ) {
60         uta_ctx_t*      ctx;                                    // context user has -- where we pin the route table
61         char*   eptr;
62         int             vfd = -1;                                       // verbose file des if we have one
63         int             vlevel = 0;                                     // how chatty we should be 0== no nattering allowed
64         char    wbuf[256];
65
66
67         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
68                 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: context passed in was nil\n" );
69                 return NULL;
70         }
71
72         if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
73                 vfd = open( eptr, O_RDONLY );
74         }
75
76         ctx->flags |= CFL_NO_RTACK;                             // no attempt to ack when reading from a file
77         while( 1 ) {
78                 if( vfd >= 0 ) {
79                         wbuf[0] = 0;
80                         lseek( vfd, 0, 0 );
81                         read( vfd, wbuf, 10 );
82                         vlevel = atoi( wbuf );
83                 }
84
85                 read_static_rt( ctx, vlevel );                                          // seed the route table if one provided
86
87                 if( ctx->shutdown != 0 ) {                                                      // allow for graceful termination and unit testing
88                         return NULL;
89                 }
90                 sleep( 60 );
91         }
92 }
93
94 static int refresh_vlevel( int vfd ) {
95         int vlevel = 0;
96         char    rbuf[128];
97
98         if( vfd >= 0 ) {                                        // if file is open, read current value
99                 rbuf[0] = 0;
100                 lseek( vfd, 0, 0 );
101                 read( vfd, rbuf, 10 );
102                 vlevel = atoi( rbuf );
103         }
104
105         return vlevel;
106 }
107
108 /*
109         Rtc_parse_msg parses a single message from the route manager. We allow multiple, newline terminated,
110         records in each message; it is required that the last record in the message be complete (we do not
111         reconstruct records split over multiple messages).  For each record, we call the record parser
112         to parse and add the information to the table being built.
113
114         This function was broken from the main rtc() function in order to be able to unit test it. Without
115         this as a standalone funciton, it was impossible to simulate a message arriving on the RTC's private
116         context.
117
118         To reduce malloc/free cycles, we allocate a static work buffer and expand it when needed; in other
119         words, this is not thread safe but it shouldn't need to be.
120 */
121 static void rtc_parse_msg( uta_ctx_t *ctx, uta_ctx_t* pvt_cx, rmr_mbuf_t* msg, int vlevel,  int* flags ) {
122         static  unsigned char* pbuf = NULL;
123         static  int pbuf_size = 0;
124
125         unsigned char* payload;
126         unsigned char* curr;
127         unsigned char* nextr;
128         int mlen;
129
130         payload = msg->payload;
131         mlen = msg->len;                                        // usable bytes in the payload
132
133         if( DEBUG > 1 || (vlevel > 0) ) rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: received rt message type=%d len=%d\n", msg->mtype, (int) mlen );
134         switch( msg->mtype ) {
135                 case RMRRM_TABLE_DATA:
136                         if( (*flags & RTCFL_HAVE_UPDATE) == 0 ) {
137                                 *flags |= RTCFL_HAVE_UPDATE;
138                                 rmr_vlog( RMR_VL_INFO, "message flow from route manager starts\n" );
139                         }
140
141                         if( pbuf_size <= mlen ) {
142                                 if( pbuf ) {
143                                         free( pbuf );
144                                 }
145                                 if( mlen < 512 ) {
146                                         pbuf_size = 1024;
147                                 } else {
148                                         pbuf_size = mlen * 2;
149                                 }
150                                 pbuf = (char *) malloc( sizeof( char ) * pbuf_size );
151                         }
152                         memcpy( pbuf, payload, mlen );
153                         pbuf[mlen] = 0;                                                                         // don't depend on sender making this a legit string
154                         if( vlevel > 1 ) {
155                                 rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: rt message: (%s)\n", pbuf );
156                         }
157
158                         curr = pbuf;
159                         while( curr ) {                                                                         // loop over each record in the buffer
160                                 nextr = strchr( (char *) curr, '\n' );                  // allow multiple newline records, find end of current and mark
161
162                                 if( nextr ) {
163                                         *(nextr++) = 0;
164                                 }
165
166                                 if( vlevel > 1 ) {
167                                         rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: processing (%s)\n", curr );
168                                 }
169                                 parse_rt_rec( ctx, pvt_cx, curr, vlevel, msg );         // parse record and add to in progress table; ack using rts to msg
170
171                                 curr = nextr;
172                         }
173
174                         msg->len = 0;                           // force back into the listen loop
175                         break;
176
177                 default:
178                         rmr_vlog( RMR_VL_WARN, "rmr_rtc: invalid message type=%d len=%d\n", msg->mtype, (int) msg->len );
179                         break;
180         }
181 }
182
183 /*
184         Route Table Collector
185         A side thread which either attempts to connect and request a table
186         from the Route Manager, or opens a port and listens for Route Manager
187         to push table updates.
188
189         It may do other things along the way (latency measurements, alarms,
190         respond to RMR pings, etc.).
191
192         The behaviour with respect to listening for Route Manager updates vs
193         the initiation of the connection and sending a request depends on the
194         value of the ENV_RTG_ADDR (RMR_RTG_SVC)  environment variable. If
195         host:port, or IP:port, is given, then we assume that we make the connection
196         and send a request for the table (request mode).  If the variable is just
197         a port, then we assume Route Manager will connect and push updates (original
198         method).
199
200         If the variable is not defined, the default behaviour, in order to be
201         backwards compatable, depends on the presence of the ENV_CTL_PORT
202         (RMR_CTL_PORT) variable (new with the support for requesting a table).
203
204
205         ENV_CTL_PORT    ENV_RTG_ADDR    Behaviour
206         unset                   unset                   Open default CTL port (DEF_CTL_PORT) and
207                                                                         wait for Rt Mgr to push tables
208
209         set                             unset                   Use the default Rt Mgr wellknown addr
210                                                                         and port (DEF_RTG_WK_ADDR) to connect
211                                                                         and request a table. The control port
212                                                                         used is the value set by ENV_CTL_PORT.
213
214         unset                   set                             As described above. The default control
215                                                                         port (DEF_CTL_PORT) is used.
216
217         When we are running in request mode, then we will send the RMR message
218         RMRRM_REFRESH to this address (wormhole) as a request for the route manager
219         to send a new table. We will attempt to connect and send requests until
220         we have a table. Calls to rmr_ready() will report FALSE until a table is
221         loaded _unless_ a seed table was given.
222
223         Route table information is expected to arrive on RMR messages with type
224         RMRRM_TABLE_DATA.  There is NOT a specific message type for each possible
225         table record, so the payload is as it appears in the seed file or as
226         delivered in old versions.  It may take several RMRRM_TABLE_DATA messages
227         to completely supply a new table or table update. See the header for parse_rt_rec
228         in common for a description of possible message contents.
229
230         Buffers received from the route table generator can contain multiple newline terminated
231         records, but each buffer must be less than 4K in length, and the last record in a
232         buffer may NOT be split across buffers.
233
234         Other chores:
235         In addition to the primary task of getting, vetting, and installing a new route table, or
236         updates to the existing table, this thread will periodically cause the send counts for each
237         endpoint known to be written to standard error. The frequency is once every 180 seconds, and
238         more frequently if verbose mode (see ENV_VERBOSE_FILE) is > 0.
239 */
240 static void* rtc( void* vctx ) {
241         uta_ctx_t*      ctx;                                    // context user has -- where we pin the route table
242         uta_ctx_t*      pvt_cx;                                 // private context for session with rtg
243         rmr_mbuf_t*     msg = NULL;                             // message from rtg
244         char*   my_port;                                        // the port number that we will listen on (4561 has been the default for this)
245         char*   rtg_addr;                                       // host:port address of route table generator (route manager)
246         char*   daddr;                                          // duplicated rtg address string to parse/trash
247         size_t  buf_size;                                       // nng needs var pointer not just size?
248         int             i;
249         long    blabber = 0;                            // time of last blabber so we don't flood if rtg goes bad
250         int             cstate = -1;                            // connection state to rtg
251         int             state;                                          // processing state of some nng function
252         char*   tokens[128];
253         char    wbuf[128];
254         int             ntoks;
255         int             vfd = -1;                                       // verbose file des if we have one
256         int             vlevel = 0;                                     // how chatty we should be 0== no nattering allowed
257         char*   eptr;
258         int             epfd = -1;                                      // fd for epoll so we can multi-task
259         struct  epoll_event events[1];          // list of events to give to epoll; we only have one we care about
260         struct  epoll_event epe;                        // event definition for event to listen to
261         int             count_delay = 30;                       // number of seconds between writing count info; initially every 30s
262         int             bump_freq = 0;                          // time at which we will bump count frequency to every 5 minutes
263         int             flags = 0;
264         int             rt_req_freq = DEF_RTREQ_FREQ;   // request frequency (sec) when wanting a new table
265         int             nxt_rt_req = 0;                                 // time of next request
266
267
268         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
269                 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: context passed in was nil\n" );
270                 return NULL;
271         }
272
273         if( (ctx->ephash = rmr_sym_alloc( RT_SIZE )) == NULL ) {                // master hash table for endpoints (each rt will reference this)
274                 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: unable to allocate an endpoint hash table\n" );
275                 return NULL;
276         }
277
278         if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
279                 vfd = open( eptr, O_RDONLY );
280                 vlevel = refresh_vlevel( vfd );
281         }
282
283         if( (eptr = getenv( ENV_RTREQ_FREA )) != NULL ) {
284                 rt_req_freq = atoi( eptr );
285                 if( rt_req_freq < 1 || rt_req_freq > 300 ) {
286                         rt_req_freq = DEF_RTREQ_FREQ;
287                         rmr_vlog( RMR_VL_WARN, "rmr_rtc: RT request frequency (%d) out of range (1-300), using default", DEF_RTREQ_FREQ );
288                 }
289         }
290         rmr_vlog( RMR_VL_INFO, "rmr_rtc: RT request frequency set to: %d seconds", rt_req_freq );
291
292         ctx->flags |= CFL_NO_RTACK;                             // don't ack when reading from a file
293         read_static_rt( ctx, vlevel );                  // seed the route table if one provided
294         ctx->flags &= ~CFL_NO_RTACK;
295
296
297         my_port = getenv( ENV_CTL_PORT );                               // default port to listen on (likely 4561)
298         if( my_port == NULL || ! *my_port ) {                   // if undefined, then go with default
299                 my_port = DEF_CTL_PORT;
300                 daddr = DEF_CTL_PORT;                                           // backwards compat; if ctl port not hard defined, default is to listen
301         } else {
302                 daddr = DEF_RTG_WK_ADDR;                                        // if ctl port is defined, then default changes to connecting to well known RM addr
303         }
304
305         if( (rtg_addr = getenv( ENV_RTG_ADDR )) == NULL || ! *rtg_addr ) {              // undefined, use default set above
306                 rtg_addr = daddr;
307         }
308
309         daddr = strdup( rtg_addr );                                                                     // dup to destroy during parse
310
311         ntoks = uta_tokenise( daddr, tokens, 120, ':' );                        // should be host:ip of rt mgr (could be port only which we assume is old listen port)
312         switch( ntoks ) {
313                 case 0:                                                                 // should not happen, but prevent accidents and allow default to ignore additional tokens
314                         break;
315
316                 case 1:
317                         my_port = tokens[0];                    // just port -- assume backlevel environment where we just listen
318                         flags |= RTCFL_HAVE_UPDATE;             // prevent sending update reqests
319                         break;
320
321                 default:
322                         if( strcmp( tokens[0], "tcp" ) == 0 ) {                 // old school nng tcp:xxxx so we listen on xxx
323                                 flags |= RTCFL_HAVE_UPDATE;                                     // and signal not to try to request an update
324                                 my_port = tokens[1];
325                         } else {
326                                 // rtg_addr points at rt mgr address and my port set from env or default stands as is
327                         }
328                         break;
329         }
330
331         if( (pvt_cx = init( my_port, MAX_RTC_BUF, FL_NOTHREAD )) == NULL ) {                            // open a private context (no RT listener!)
332                 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: unable to initialise listen port for RTG (pvt_cx)\n" );
333
334                 while( TRUE ) {                                                                                         // no listen port, just dump counts now and then
335                         sleep( count_delay );
336                         rt_epcounts( ctx->rtable, ctx->my_name );
337                 }
338
339                 return NULL;
340         }
341
342         ctx->rtg_whid = -1;
343
344         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rtc thread is running and listening; listening for rtg conns on %s\n", my_port );
345
346         bump_freq = time( NULL ) + 300;                         // after 5 minutes we decrease the count frequency
347         blabber = 0;
348         while( 1 ) {                                                                                                            // until the cows return, pigs fly, or somesuch event likely not to happen
349                 while( msg == NULL || msg->len <= 0 ) {                                                 // until we actually have something from the other side
350                         if( (flags & RTCFL_HAVE_UPDATE) == 0 && time( NULL ) >= nxt_rt_req ) {                  // no route table updated from rt mgr; request one
351                                 if( ctx->rtg_whid < 0 ) {
352                                         ctx->rtg_whid = rmr_wh_open( pvt_cx, rtg_addr );
353                                 }
354                                 send_update_req( pvt_cx, ctx );
355                                 nxt_rt_req = time( NULL ) + rt_req_freq;
356                         }
357
358                         msg = rmr_torcv_msg( pvt_cx, msg, 1000 );
359
360                         if( time( NULL ) > blabber  ) {
361                                 vlevel = refresh_vlevel( vfd );
362                                 if( vlevel >= 0 ) {                                                                             // allow it to be forced off with -n in verbose file
363                                         blabber = time( NULL ) + count_delay;                           // set next time to blabber, then do so
364                                         if( blabber > bump_freq ) {
365                                                 count_delay = 300;
366                                         }
367                                         rt_epcounts( ctx->rtable, ctx->my_name );
368                                 }
369                         }
370
371                         if( ctx->shutdown != 0 ) {
372                                 break;                                                  // mostly for unit test, but allows a forced stop
373                         }
374                 }
375
376                 vlevel = refresh_vlevel( vfd );                 // ensure it's fresh when we get a message
377
378                 if( msg != NULL && msg->len > 0 ) {
379                         rtc_parse_msg( ctx, pvt_cx, msg, vlevel, &flags );
380                 }
381
382                 if( ctx->shutdown ) {           // mostly for testing, but allows user app to close us down if rmr_*() function sets this
383                         return NULL;
384                 }
385
386         }
387
388         return NULL;    // unreachable, but some compilers don't see that and complain.
389 }
390
391 #ifndef SI95_BUILD
392 // this is nng specific inas much as we allow raw (non-RMR) messages
393
394 /*
395         NOTE:   This is the original rtc code when we supported "raw" nano/nng messages
396                         from the route manger.  It is deprecated in favour of managing all RM-RMR
397                         communications via an RMR session.
398
399                         The rtc() function above is the new and preferred function regardless
400                         of transport.
401
402         -----------------------------------------------------------------------------------
403         Route Table Collector
404         A side thread which opens a socket and subscribes to a routing table generator.
405         It may do other things along the way (latency measurements?).
406
407         The pointer is a pointer to the context.
408
409         Listens for records from the route table generation publisher, expecting
410         one of the following, newline terminated, ASCII records:
411                 rte|msg-type||]name:port,name:port,...;name:port,...                    // route table entry with one or more groups of endpoints
412                 new|start                                                               // start of new table
413                 new|end                                                                 // end of new table; complete
414
415                 Name must be a host name which can be looked up via gethostbyname() (DNS).
416
417                 Multiple endpoints (name:port) may be given separated by a comma; an endpoint is selected using round robin
418                         for each message of the type that is sent.
419
420                 Multiple endpoint groups can be given as a comma separated list of endpoints, separated by semicolons:
421                                 group1n1:port,group1n2:port,group1n3:port;group2n1:port,group2n2:port
422
423                 If multiple groups are given, when send() is called for the cooresponding message type,
424                 the message will be sent to one endpoint in each group.
425
426                 msg-type is the numeric message type (e.g. 102). If it is given as n,name then it is assumed
427                 that the entry applies only to the instance running with the hostname 'name.'
428
429         Buffers received from the route table generator can contain multiple newline terminated
430         records, but each buffer must be less than 4K in length, and the last record in a
431         buffer may NOT be split across buffers.
432
433         Other chores:
434         In addition to the primary task of getting, vetting, and installing a new route table, or
435         updates to the existing table, this thread will periodically cause the send counts for each
436         endpoint known to be written to standard error. The frequency is once every 180 seconds, and
437         more frequently if verbose mode (see ENV_VERBOSE_FILE) is > 0.
438 */
439 static void* raw_rtc( void* vctx ) {
440         uta_ctx_t*      ctx;                                    // context user has -- where we pin the route table
441         uta_ctx_t*      pvt_cx;                                 // private context for session with rtg
442         rmr_mbuf_t*     msg = NULL;                             // message from rtg
443         char*   payload;                                        // payload in the message
444         size_t  mlen;
445         char*   port;                                           // a port number we listen/connect to
446         char*   fport;                                          // pointer to the real buffer to free
447         size_t  buf_size;                                       // nng needs var pointer not just size?
448         char*   nextr;                                          // pointer at next record in the message
449         char*   curr;                                           // current record
450         int             i;
451         long    blabber = 0;                            // time of last blabber so we don't flood if rtg goes bad
452         int             cstate = -1;                            // connection state to rtg
453         int             state;                                          // processing state of some nng function
454         char*   tokens[128];
455         char    wbuf[128];
456         char*   pbuf = NULL;
457         int             pbuf_size = 0;                          // number allocated in pbuf
458         int             ntoks;
459         int             raw_interface = 1;                      // rtg is using raw NNG/Nano not RMr to send updates
460         int             vfd = -1;                                       // verbose file des if we have one
461         int             vlevel = 0;                                     // how chatty we should be 0== no nattering allowed
462         char*   eptr;
463         int             epfd = -1;                                      // fd for epoll so we can multi-task
464         struct  epoll_event events[1];          // list of events to give to epoll; we only have one we care about
465         struct  epoll_event epe;                        // event definition for event to listen to
466         int             rcv_fd = -1;                            // pollable file des from NNG to use for timeout
467         int             count_delay = 30;                       // number of seconds between writing count info; initially every 30s
468         int             bump_freq = 0;                          // time at which we will bump count frequency to every 5 minutes
469
470
471         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
472                 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: context passed in was nil\n" );
473                 return NULL;
474         }
475
476         if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
477                 vfd = open( eptr, O_RDONLY );
478                 vlevel = refresh_vlevel( vfd );
479         }
480
481         read_static_rt( ctx, vlevel );                                          // seed the route table if one provided
482
483         if( (port = getenv( ENV_RTG_PORT )) == NULL || ! *port ) {              // port we need to open to listen for RTG connections
484                 port = strdup( DEF_RTG_PORT );
485         } else {
486                 port = strdup( port );
487         }
488
489 /*
490         this test is now done in init and this function is started _only_ if the value was 1
491         if( (curr = getenv( ENV_RTG_RAW )) != NULL ) {
492                 raw_interface = atoi( curr ) > 0;                               // if > 0 we assume that rtg messages are NOT coming from an RMr based process
493         }
494 */
495
496         fport = port;           // must hold to free
497
498         ntoks = uta_tokenise( port, tokens, 120, ':' );                 // assume tcp:port, but it could be port or old style host:port
499         switch( ntoks ) {
500                 case 1:
501                                 port = tokens[0];                       // just the port
502                                 break;
503
504                 case 2:
505                                 port = tokens[1];                       // tcp:port or :port
506                                 break;
507
508                 default:
509                                 port = DEF_RTG_PORT;            // this shouldn't happen, but parnioia is good
510                                 break;
511         }
512
513         if( (pvt_cx = init( port, MAX_RTG_MSG_SZ, FL_NOTHREAD )) == NULL ) {                            // open a private context
514                 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: unable to initialise listen port for RTG (pvt_cx)\n" );
515
516                 while( TRUE ) {                                                                                         // no listen port, just dump counts now and then
517                         sleep( count_delay );
518                         rt_epcounts( ctx->rtable, ctx->my_name );
519                 }
520
521                 free( fport );                                  // parinoid free and return
522                 return NULL;
523         }
524
525         if( (rcv_fd = rmr_get_rcvfd( pvt_cx )) >= 0 ) {            // get the epoll fd for the rtg socket
526                 if( rcv_fd < 0 ) {
527                         rmr_vlog( RMR_VL_WARN, "cannot get epoll fd for rtg session; stats will generate only after update from rt manager\n" );
528                 } else {
529                         if( (epfd = epoll_create1( 0 )) < 0 ) {
530                                 rmr_vlog( RMR_VL_WARN, "stats will generate only after rt manager update; unable to create epoll fd for rtg session: %s\n", strerror( errno ) );
531                                 rcv_fd = -1;
532                         } else {
533                                 epe.events = EPOLLIN;
534                                 epe.data.fd = rcv_fd;
535
536                                 if( epoll_ctl( epfd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 )  {
537                                         rmr_vlog( RMR_VL_WARN, "stats will generate only after rt manager update; unable to init epoll_ctl: %s\n", strerror( errno ) );
538                                         rcv_fd = -1;
539                                 }
540                         }
541                 }
542         }
543
544         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rtc thread is running and listening; listening for rtg conns on %s\n", port );
545         free( fport );
546
547         // future:  if we need to register with the rtg, then build a message and send it through a wormhole here
548
549         bump_freq = time( NULL ) + 300;                         // after 5 minutes we decrease the count frequency
550         blabber = 0;
551         while( 1 ) {                    // until the cows return, pigs fly, or somesuch event likely not to happen
552                 while( msg == NULL || msg->len <= 0 ) {                                                 // until we actually have something from the other side
553                         if( rcv_fd < 0 || epoll_wait( epfd, events, 1, 1000 ) > 0 )  {  // skip epoll if init failed, else block for max 1 sec
554                                 if( raw_interface ) {
555                                         msg = (rmr_mbuf_t *) rcv_payload( pvt_cx, msg );                // receive from non-RMr sender
556                                 } else {
557                                         msg = rmr_rcv_msg( pvt_cx, msg );               // receive from an RMr sender
558                                 }
559                         } else {                                                                                                        // no msg, do extra tasks
560                                 if( msg != NULL ) {                                                                             // if we were working with a message; ensure no len
561                                         msg->len = 0;
562                                         msg->state = RMR_ERR_TIMEOUT;
563                                 }
564                         }
565
566                         if( time( NULL ) > blabber  ) {
567                                 vlevel = refresh_vlevel( vfd );
568                                 if( vlevel >= 0 ) {                                                                             // allow it to be forced off with -n in verbose file
569                                         blabber = time( NULL ) + count_delay;                           // set next time to blabber, then do so
570                                         if( blabber > bump_freq ) {
571                                                 count_delay = 300;
572                                         }
573                                         rt_epcounts( ctx->rtable, ctx->my_name );
574                                 }
575                         }
576                 }
577
578                 vlevel = refresh_vlevel( vfd );                 // ensure it's fresh when we get a message
579
580                 if( msg != NULL && msg->len > 0 ) {
581                         payload = msg->payload;
582                         mlen = msg->len;                                        // usable bytes in the payload
583                         if( vlevel > 1 ) {
584                                 rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: received rt message; %d bytes (%s)\n", (int) mlen, msg->payload );
585                         } else {
586                                 if( DEBUG > 1 || (vlevel > 0) ) rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: received rt message; %d bytes\n", (int) mlen );
587                         }
588
589                         if( pbuf_size <= mlen ) {
590                                 if( pbuf ) {
591                                         free( pbuf );
592                                 }
593                                 if( mlen < 512 ) {
594                                         pbuf_size = 512;
595                                 } else {
596                                         pbuf_size = mlen * 2;
597                                 }
598                                 pbuf = (char *) malloc( sizeof( char ) * pbuf_size );
599                         }
600                         memcpy( pbuf, payload, mlen );
601                         pbuf[mlen] = 0;                                                                         // don't depend on sender making this a legit string
602
603                         curr = pbuf;
604                         while( curr ) {                                                         // loop over each record in the buffer
605                                 nextr = strchr( curr, '\n' );                   // allow multiple newline records, find end of current and mark
606
607                                 if( nextr ) {
608                                         *(nextr++) = 0;
609                                 }
610
611                                 if( vlevel > 1 ) {
612                                         rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: processing (%s)\n", curr );
613                                 }
614                                 if( raw_interface ) {
615                                         parse_rt_rec( ctx, NULL, curr, vlevel, NULL );          // nil pvt to parser as we can't ack messages
616                                 } else {
617                                         parse_rt_rec( ctx, pvt_cx, curr, vlevel, msg );         // parse record and add to in progress table
618                                 }
619
620                                 curr = nextr;
621                         }
622
623                         if( ctx->shutdown ) {           // mostly for testing, but allows user app to close us down if rmr_*() function sets this
624                                 break;
625                         }
626
627                         msg->len = 0;                           // force back into the listen loop
628                 }
629         }
630
631         return NULL;    // unreachable, but some compilers don't see that and complain.
632 }
633 #endif
634
635
636 #endif