25253422cbe66f497a9d6489c0b20dc87971a64d
[ric-plt/lib/rmr.git] / src / rmr / common / src / rtc_static.c
1 // : vi ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019-2020 Nokia
5         Copyright (c) 2018-2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rt_collector.c
23         Abstract:       The route table collector is started as a separate pthread and
24                                 is responsible for listening for route table updates from a
25                                 route manager or route table generator process.
26
27         Author:         E. Scott Daniels
28         Date:           29 November 2018 (extracted to common 13 March 2019)
29 */
30
31 #ifndef _rt_collector_c
32 #define _rt_collector_c
33
34 #include <stdio.h>
35 #include <stdlib.h>
36 #include <netdb.h>
37 #include <errno.h>
38 #include <string.h>
39 #include <errno.h>
40 #include <fcntl.h>
41 #include <sys/types.h>
42 #include <sys/stat.h>
43 #include <unistd.h>
44
45 #include <RIC_message_types.h>          // needed for RMR/Rt Mgr msg types
46
47 // ---- local constants ------------------
48                                                                                 // flags
49 #define RTCFL_HAVE_UPDATE       0x01            // an update from RM was received
50
51 #define MAX_RTC_BUF     5 * 1024                        // max buffer size we'll expect is 4k, add some fudge room
52
53 // ------------------------------------------------------------------------------------------------
54
55 /*
56         Opens the vlevel control file if needed and reads the vlevel from it.
57         The file is rewound if already open so that external updates are captured.
58         The current level is returnd; 0 on error.
59
60         The environment variable (ENV_VERBOSE_FILE) is used to supply the file to
61         open and read. If missing, we will try /tmp/rmr.v.  We will try to open the file
62         on each call if not alrady open; this allows the value to be supplied after
63         start which helps debugging.
64
65         If close_file is true, then we will close the open vfd and return 0;
66 */
67 extern int refresh_vlevel( int close_file ) {
68         static int vfd = -1;
69
70         char*   eptr;
71         char    wbuf[128];                      // read buffer; MUST be 11 or greater
72         int             vlevel = 0;
73
74         if( close_file ) {
75                 if( vfd >= 0 ) {
76                         close( vfd );
77                         vfd = -1;
78                 }
79                 return 0;
80         }
81
82         if( vfd < 0 ) {                         // attempt to find/open on all calls if not open
83                 if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
84                         vfd = open( eptr, O_RDONLY );
85                 } else {
86                         vfd = open( "/tmp/rmr.v", O_RDONLY );
87                 }
88                 if( vfd < 0 ) {
89                         return 0;
90                 }
91         }
92
93         memset( wbuf, 0, sizeof( char ) * 11 );                 // ensure what we read will be nil terminated
94         if( lseek( vfd, 0, SEEK_SET ) == 0 && read( vfd, wbuf, 10 ) > 0 ) {
95                 vlevel = atoi( wbuf );
96         }
97
98         return vlevel;
99 }
100
101 /*
102         Loop forever (assuming we're running in a pthread reading the static table
103         every minute or so.
104 */
105 static void* rtc_file( void* vctx ) {
106         uta_ctx_t*      ctx;                                    // context user has -- where we pin the route table
107         char*   eptr;
108         int             vlevel = 0;                                     // how chatty we should be 0== no nattering allowed
109         char    wbuf[256];
110
111
112         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
113                 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: context passed in was nil\n" );
114                 return NULL;
115         }
116
117         ctx->flags |= CFL_NO_RTACK;                             // no attempt to ack when reading from a file
118         while( 1 ) {
119                 vlevel = refresh_vlevel( 0 );
120                 read_static_rt( ctx, vlevel );                                          // seed the route table if one provided
121
122                 if( ctx->shutdown != 0 ) {                                                      // allow for graceful termination and unit testing
123                         refresh_vlevel( 1 );                                                            // close the verbose file if open
124                         return NULL;
125                 }
126                 sleep( 60 );
127         }
128 }
129
130 /*
131         Rtc_parse_msg parses a single message from the route manager. We allow multiple, newline terminated,
132         records in each message; it is required that the last record in the message be complete (we do not
133         reconstruct records split over multiple messages).  For each record, we call the record parser
134         to parse and add the information to the table being built.
135
136         This function was broken from the main rtc() function in order to be able to unit test it. Without
137         this as a standalone funciton, it was impossible to simulate a message arriving on the RTC's private
138         context.
139
140         To reduce malloc/free cycles, we allocate a static work buffer and expand it when needed; in other
141         words, this is not thread safe but it shouldn't need to be.
142 */
143 static void rtc_parse_msg( uta_ctx_t *ctx, uta_ctx_t* pvt_cx, rmr_mbuf_t* msg, int vlevel,  int* flags ) {
144         static  unsigned char* pbuf = NULL;
145         static  int pbuf_size = 0;
146
147         unsigned char* payload;
148         unsigned char* curr;
149         unsigned char* nextr;
150         int mlen;
151
152         payload = msg->payload;
153         mlen = msg->len;                                        // usable bytes in the payload
154
155         if( DEBUG > 1 || (vlevel > 0) ) rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: received rt message type=%d len=%d\n", msg->mtype, (int) mlen );
156         switch( msg->mtype ) {
157                 case RMRRM_TABLE_DATA:
158                         if( (*flags & RTCFL_HAVE_UPDATE) == 0 ) {
159                                 *flags |= RTCFL_HAVE_UPDATE;
160                                 rmr_vlog( RMR_VL_INFO, "message flow from route manager starts\n" );
161                         }
162
163                         if( pbuf_size <= mlen ) {
164                                 if( pbuf ) {
165                                         free( pbuf );
166                                 }
167                                 if( mlen < 512 ) {
168                                         pbuf_size = 1024;
169                                 } else {
170                                         pbuf_size = mlen * 2;
171                                 }
172                                 pbuf = (char *) malloc( sizeof( char ) * pbuf_size );
173                         }
174                         memcpy( pbuf, payload, mlen );
175                         pbuf[mlen] = 0;                                                                         // don't depend on sender making this a legit string
176                         if( vlevel > 1 ) {
177                                 rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: rt message: (%s)\n", pbuf );
178                         }
179
180                         curr = pbuf;
181                         while( curr ) {                                                                         // loop over each record in the buffer
182                                 nextr = strchr( (char *) curr, '\n' );                  // allow multiple newline records, find end of current and mark
183
184                                 if( nextr ) {
185                                         *(nextr++) = 0;
186                                 }
187
188                                 if( vlevel > 1 ) {
189                                         rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc_parse_msg: processing (%s)\n", curr );
190                                 }
191                                 parse_rt_rec( ctx, pvt_cx, curr, vlevel, msg );         // parse record and add to in progress table; ack using rts to msg
192
193                                 curr = nextr;
194                         }
195
196                         msg->len = 0;                           // force back into the listen loop
197                         break;
198
199                 default:
200                         rmr_vlog( RMR_VL_WARN, "rmr_rtc: invalid message type=%d len=%d\n", msg->mtype, (int) msg->len );
201                         break;
202         }
203 }
204
205 /*
206         Route Table Collector
207         A side thread which either attempts to connect and request a table
208         from the Route Manager, or opens a port and listens for Route Manager
209         to push table updates.
210
211         It may do other things along the way (latency measurements, alarms,
212         respond to RMR pings, etc.).
213
214         The behaviour with respect to listening for Route Manager updates vs
215         the initiation of the connection and sending a request depends on the
216         value of the ENV_RTG_ADDR (RMR_RTG_SVC)  environment variable. If
217         host:port, or IP:port, is given, then we assume that we make the connection
218         and send a request for the table (request mode).  If the variable is just
219         a port, then we assume Route Manager will connect and push updates (original
220         method).
221
222         If the variable is not defined, the default behaviour, in order to be
223         backwards compatable, depends on the presence of the ENV_CTL_PORT
224         (RMR_CTL_PORT) variable (new with the support for requesting a table).
225
226
227         ENV_CTL_PORT    ENV_RTG_ADDR    Behaviour
228         unset                   unset                   Open default CTL port (DEF_CTL_PORT) and
229                                                                         wait for Rt Mgr to push tables
230
231         set                             unset                   Use the default Rt Mgr wellknown addr
232                                                                         and port (DEF_RTG_WK_ADDR) to connect
233                                                                         and request a table. The control port
234                                                                         used is the value set by ENV_CTL_PORT.
235
236         unset                   set                             As described above. The default control
237                                                                         port (DEF_CTL_PORT) is used.
238
239         When we are running in request mode, then we will send the RMR message
240         RMRRM_REFRESH to this address (wormhole) as a request for the route manager
241         to send a new table. We will attempt to connect and send requests until
242         we have a table. Calls to rmr_ready() will report FALSE until a table is
243         loaded _unless_ a seed table was given.
244
245         Route table information is expected to arrive on RMR messages with type
246         RMRRM_TABLE_DATA.  There is NOT a specific message type for each possible
247         table record, so the payload is as it appears in the seed file or as
248         delivered in old versions.  It may take several RMRRM_TABLE_DATA messages
249         to completely supply a new table or table update. See the header for parse_rt_rec
250         in common for a description of possible message contents.
251
252         Buffers received from the route table generator can contain multiple newline terminated
253         records, but each buffer must be less than 4K in length, and the last record in a
254         buffer may NOT be split across buffers.
255
256         Other chores:
257         In addition to the primary task of getting, vetting, and installing a new route table, or
258         updates to the existing table, this thread will periodically cause the send counts for each
259         endpoint known to be written to standard error. The frequency is once every 180 seconds, and
260         more frequently if verbose mode (see ENV_VERBOSE_FILE) is > 0.
261 */
262 static void* rtc( void* vctx ) {
263         uta_ctx_t*      ctx;                                    // context user has -- where we pin the route table
264         uta_ctx_t*      pvt_cx;                                 // private context for session with rtg
265         rmr_mbuf_t*     msg = NULL;                             // message from rtg
266         char*   my_port;                                        // the port number that we will listen on (4561 has been the default for this)
267         char*   rtg_addr;                                       // host:port address of route table generator (route manager)
268         char*   daddr;                                          // duplicated rtg address string to parse/trash
269         size_t  buf_size;                                       // nng needs var pointer not just size?
270         int             i;
271         long    blabber = 0;                            // time of last blabber so we don't flood if rtg goes bad
272         int             cstate = -1;                            // connection state to rtg
273         int             state;                                          // processing state of some nng function
274         char*   tokens[128];
275         char    wbuf[128];
276         int             ntoks;
277         int             vlevel = 0;                                     // how chatty we should be 0== no nattering allowed
278         char*   eptr;
279         int             epfd = -1;                                      // fd for epoll so we can multi-task
280         struct  epoll_event events[1];          // list of events to give to epoll; we only have one we care about
281         struct  epoll_event epe;                        // event definition for event to listen to
282         int             count_delay = 30;                       // number of seconds between writing count info; initially every 30s
283         int             bump_freq = 0;                          // time at which we will bump count frequency to every 5 minutes
284         int             flags = 0;
285         int             rt_req_freq = DEF_RTREQ_FREQ;   // request frequency (sec) when wanting a new table
286         int             nxt_rt_req = 0;                                 // time of next request
287
288
289         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
290                 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: context passed in was nil\n" );
291                 return NULL;
292         }
293
294         vlevel = refresh_vlevel( 0 );
295
296         if( (eptr = getenv( ENV_RTREQ_FREA )) != NULL ) {
297                 rt_req_freq = atoi( eptr );
298                 if( rt_req_freq < 1 || rt_req_freq > 300 ) {
299                         rt_req_freq = DEF_RTREQ_FREQ;
300                         rmr_vlog( RMR_VL_WARN, "rmr_rtc: RT request frequency (%d) out of range (1-300), using default", DEF_RTREQ_FREQ );
301                 }
302         }
303         rmr_vlog( RMR_VL_INFO, "rmr_rtc: RT request frequency set to: %d seconds", rt_req_freq );
304
305         ctx->flags |= CFL_NO_RTACK;                             // don't ack when reading from a file
306         read_static_rt( ctx, vlevel );                  // seed the route table if one provided
307         ctx->flags &= ~CFL_NO_RTACK;
308
309
310         my_port = getenv( ENV_CTL_PORT );                               // default port to listen on (likely 4561)
311         if( my_port == NULL || ! *my_port ) {                   // if undefined, then go with default
312                 my_port = DEF_CTL_PORT;
313                 daddr = DEF_CTL_PORT;                                           // backwards compat; if ctl port not hard defined, default is to listen
314         } else {
315                 daddr = DEF_RTG_WK_ADDR;                                        // if ctl port is defined, then default changes to connecting to well known RM addr
316         }
317
318         if( (rtg_addr = getenv( ENV_RTG_ADDR )) == NULL || ! *rtg_addr ) {              // undefined, use default set above
319                 rtg_addr = daddr;
320         }
321
322         daddr = strdup( rtg_addr );                                                                     // dup to destroy during parse
323
324         ntoks = uta_tokenise( daddr, tokens, 120, ':' );                        // should be host:ip of rt mgr (could be port only which we assume is old listen port)
325         switch( ntoks ) {
326                 case 0:                                                                 // should not happen, but prevent accidents and allow default to ignore additional tokens
327                         break;
328
329                 case 1:
330                         my_port = tokens[0];                    // just port -- assume backlevel environment where we just listen
331                         flags |= RTCFL_HAVE_UPDATE;             // prevent sending update reqests
332                         break;
333
334                 default:
335                         if( strcmp( tokens[0], "tcp" ) == 0 ) {                 // old school nng tcp:xxxx so we listen on xxx
336                                 flags |= RTCFL_HAVE_UPDATE;                                     // and signal not to try to request an update
337                                 my_port = tokens[1];
338                         } else {
339                                 // rtg_addr points at rt mgr address and my port set from env or default stands as is
340                         }
341                         break;
342         }
343
344         if( (pvt_cx = init( my_port, MAX_RTC_BUF, FL_NOTHREAD )) == NULL ) {                            // open a private context (no RT listener!)
345                 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: unable to initialise listen port for RTG (pvt_cx)\n" );
346
347                 while( TRUE ) {                                                                                         // no listen port, just dump counts now and then
348                         sleep( count_delay );
349                         rt_epcounts( ctx->rtable, ctx->my_name );
350                 }
351
352                 return NULL;
353         }
354
355         ctx->rtg_whid = -1;
356
357         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rtc thread is running and listening; listening for rtg conns on %s\n", my_port );
358
359         bump_freq = time( NULL ) + 300;                         // after 5 minutes we decrease the count frequency
360         blabber = 0;
361         while( 1 ) {                                                                                                            // until the cows return, pigs fly, or somesuch event likely not to happen
362                 while( msg == NULL || msg->len <= 0 ) {                                                 // until we actually have something from the other side
363                         if( (flags & RTCFL_HAVE_UPDATE) == 0 && time( NULL ) >= nxt_rt_req ) {                  // no route table updated from rt mgr; request one
364                                 if( ctx->rtg_whid < 0 ) {
365                                         ctx->rtg_whid = rmr_wh_open( pvt_cx, rtg_addr );
366                                 }
367                                 send_update_req( pvt_cx, ctx );
368                                 nxt_rt_req = time( NULL ) + rt_req_freq;
369                         }
370
371                         msg = rmr_torcv_msg( pvt_cx, msg, 1000 );
372
373                         if( time( NULL ) > blabber  ) {
374                                 vlevel = refresh_vlevel( 0 );
375                                 blabber = time( NULL ) + count_delay;                           // set next time to blabber, then do so
376                                 if( blabber > bump_freq ) {
377                                         count_delay = 300;
378                                 }
379                                 if( vlevel >= 0 ) {                                                                             // allow it to be forced off with -n in verbose file
380                                         rt_epcounts( ctx->rtable, ctx->my_name );
381                                 }
382                         }
383
384                         if( ctx->shutdown != 0 ) {
385                                 break;                                                  // mostly for unit test, but allows a forced stop
386                         }
387                 }
388
389                 vlevel = refresh_vlevel( 0 );                   // ensure it's fresh when we get a message
390
391                 if( msg != NULL && msg->len > 0 ) {
392                         rtc_parse_msg( ctx, pvt_cx, msg, vlevel, &flags );
393                 }
394
395                 if( ctx->shutdown ) {           // mostly for testing, but allows user app to close us down if rmr_*() function sets this
396                         return NULL;
397                 }
398
399         }
400
401         return NULL;    // unreachable, but some compilers don't see that and complain.
402 }
403
404 #ifndef SI95_BUILD
405 // this is nng specific inas much as we allow raw (non-RMR) messages
406
407 /*
408         NOTE:   This is the original rtc code when we supported "raw" nano/nng messages
409                         from the route manger.  It is deprecated in favour of managing all RM-RMR
410                         communications via an RMR session.
411
412                         The rtc() function above is the new and preferred function regardless
413                         of transport.
414
415         -----------------------------------------------------------------------------------
416         Route Table Collector
417         A side thread which opens a socket and subscribes to a routing table generator.
418         It may do other things along the way (latency measurements?).
419
420         The pointer is a pointer to the context.
421
422         Listens for records from the route table generation publisher, expecting
423         one of the following, newline terminated, ASCII records:
424                 rte|msg-type||]name:port,name:port,...;name:port,...                    // route table entry with one or more groups of endpoints
425                 new|start                                                               // start of new table
426                 new|end                                                                 // end of new table; complete
427
428                 Name must be a host name which can be looked up via gethostbyname() (DNS).
429
430                 Multiple endpoints (name:port) may be given separated by a comma; an endpoint is selected using round robin
431                         for each message of the type that is sent.
432
433                 Multiple endpoint groups can be given as a comma separated list of endpoints, separated by semicolons:
434                                 group1n1:port,group1n2:port,group1n3:port;group2n1:port,group2n2:port
435
436                 If multiple groups are given, when send() is called for the cooresponding message type,
437                 the message will be sent to one endpoint in each group.
438
439                 msg-type is the numeric message type (e.g. 102). If it is given as n,name then it is assumed
440                 that the entry applies only to the instance running with the hostname 'name.'
441
442         Buffers received from the route table generator can contain multiple newline terminated
443         records, but each buffer must be less than 4K in length, and the last record in a
444         buffer may NOT be split across buffers.
445
446         Other chores:
447         In addition to the primary task of getting, vetting, and installing a new route table, or
448         updates to the existing table, this thread will periodically cause the send counts for each
449         endpoint known to be written to standard error. The frequency is once every 180 seconds, and
450         more frequently if verbose mode (see ENV_VERBOSE_FILE) is > 0.
451 */
452 static void* raw_rtc( void* vctx ) {
453         uta_ctx_t*      ctx;                                    // context user has -- where we pin the route table
454         uta_ctx_t*      pvt_cx;                                 // private context for session with rtg
455         rmr_mbuf_t*     msg = NULL;                             // message from rtg
456         char*   payload;                                        // payload in the message
457         size_t  mlen;
458         char*   port;                                           // a port number we listen/connect to
459         char*   fport;                                          // pointer to the real buffer to free
460         size_t  buf_size;                                       // nng needs var pointer not just size?
461         char*   nextr;                                          // pointer at next record in the message
462         char*   curr;                                           // current record
463         int             i;
464         long    blabber = 0;                            // time of last blabber so we don't flood if rtg goes bad
465         int             cstate = -1;                            // connection state to rtg
466         int             state;                                          // processing state of some nng function
467         char*   tokens[128];
468         char    wbuf[128];
469         char*   pbuf = NULL;
470         int             pbuf_size = 0;                          // number allocated in pbuf
471         int             ntoks;
472         int             raw_interface = 1;                      // rtg is using raw NNG/Nano not RMr to send updates
473         int             vlevel = 0;                                     // how chatty we should be 0== no nattering allowed
474         char*   eptr;
475         int             epfd = -1;                                      // fd for epoll so we can multi-task
476         struct  epoll_event events[1];          // list of events to give to epoll; we only have one we care about
477         struct  epoll_event epe;                        // event definition for event to listen to
478         int             rcv_fd = -1;                            // pollable file des from NNG to use for timeout
479         int             count_delay = 30;                       // number of seconds between writing count info; initially every 30s
480         int             bump_freq = 0;                          // time at which we will bump count frequency to every 5 minutes
481
482
483         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
484                 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: context passed in was nil\n" );
485                 return NULL;
486         }
487
488         vlevel = refresh_vlevel( 0 );
489         read_static_rt( ctx, vlevel );                                          // seed the route table if one provided
490
491         if( (port = getenv( ENV_RTG_PORT )) == NULL || ! *port ) {              // port we need to open to listen for RTG connections
492                 port = strdup( DEF_RTG_PORT );
493         } else {
494                 port = strdup( port );
495         }
496
497         fport = port;           // must hold to free
498
499         ntoks = uta_tokenise( port, tokens, 120, ':' );                 // assume tcp:port, but it could be port or old style host:port
500         switch( ntoks ) {
501                 case 1:
502                                 port = tokens[0];                       // just the port
503                                 break;
504
505                 case 2:
506                                 port = tokens[1];                       // tcp:port or :port
507                                 break;
508
509                 default:
510                                 port = DEF_RTG_PORT;            // this shouldn't happen, but parnioia is good
511                                 break;
512         }
513
514         if( (pvt_cx = init( port, MAX_RTG_MSG_SZ, FL_NOTHREAD )) == NULL ) {                            // open a private context
515                 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: unable to initialise listen port for RTG (pvt_cx)\n" );
516
517                 while( TRUE ) {                                                                                         // no listen port, just dump counts now and then
518                         sleep( count_delay );
519                         rt_epcounts( ctx->rtable, ctx->my_name );
520                 }
521
522                 free( fport );                                  // parinoid free and return
523                 return NULL;
524         }
525
526         if( (rcv_fd = rmr_get_rcvfd( pvt_cx )) >= 0 ) {            // get the epoll fd for the rtg socket
527                 if( rcv_fd < 0 ) {
528                         rmr_vlog( RMR_VL_WARN, "cannot get epoll fd for rtg session; stats will generate only after update from rt manager\n" );
529                 } else {
530                         if( (epfd = epoll_create1( 0 )) < 0 ) {
531                                 rmr_vlog( RMR_VL_WARN, "stats will generate only after rt manager update; unable to create epoll fd for rtg session: %s\n", strerror( errno ) );
532                                 rcv_fd = -1;
533                         } else {
534                                 epe.events = EPOLLIN;
535                                 epe.data.fd = rcv_fd;
536
537                                 if( epoll_ctl( epfd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 )  {
538                                         rmr_vlog( RMR_VL_WARN, "stats will generate only after rt manager update; unable to init epoll_ctl: %s\n", strerror( errno ) );
539                                         rcv_fd = -1;
540                                 }
541                         }
542                 }
543         }
544
545         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rtc thread is running and listening; listening for rtg conns on %s\n", port );
546         free( fport );
547
548         // future:  if we need to register with the rtg, then build a message and send it through a wormhole here
549
550         bump_freq = time( NULL ) + 300;                         // after 5 minutes we decrease the count frequency
551         blabber = 0;
552         while( 1 ) {                    // until the cows return, pigs fly, or somesuch event likely not to happen
553                 while( msg == NULL || msg->len <= 0 ) {                                                 // until we actually have something from the other side
554                         if( rcv_fd < 0 || epoll_wait( epfd, events, 1, 1000 ) > 0 )  {  // skip epoll if init failed, else block for max 1 sec
555                                 if( raw_interface ) {
556                                         msg = (rmr_mbuf_t *) rcv_payload( pvt_cx, msg );                // receive from non-RMr sender
557                                 } else {
558                                         msg = rmr_rcv_msg( pvt_cx, msg );               // receive from an RMr sender
559                                 }
560                         } else {                                                                                                        // no msg, do extra tasks
561                                 if( msg != NULL ) {                                                                             // if we were working with a message; ensure no len
562                                         msg->len = 0;
563                                         msg->state = RMR_ERR_TIMEOUT;
564                                 }
565                         }
566
567                         if( time( NULL ) > blabber  ) {
568                                 vlevel = refresh_vlevel( 0 );
569                                 if( vlevel >= 0 ) {                                                                             // allow it to be forced off with -n in verbose file
570                                         blabber = time( NULL ) + count_delay;                           // set next time to blabber, then do so
571                                         if( blabber > bump_freq ) {
572                                                 count_delay = 300;
573                                         }
574                                         rt_epcounts( ctx->rtable, ctx->my_name );
575                                 }
576                         }
577                 }
578
579                 vlevel = refresh_vlevel( 0 );                   // ensure it's fresh when we get a message
580
581                 if( msg != NULL && msg->len > 0 ) {
582                         payload = msg->payload;
583                         mlen = msg->len;                                        // usable bytes in the payload
584                         if( vlevel > 1 ) {
585                                 rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: received rt message; %d bytes (%s)\n", (int) mlen, msg->payload );
586                         } else {
587                                 if( DEBUG > 1 || (vlevel > 0) ) rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: received rt message; %d bytes\n", (int) mlen );
588                         }
589
590                         if( pbuf_size <= mlen ) {
591                                 if( pbuf ) {
592                                         free( pbuf );
593                                 }
594                                 if( mlen < 512 ) {
595                                         pbuf_size = 512;
596                                 } else {
597                                         pbuf_size = mlen * 2;
598                                 }
599                                 pbuf = (char *) malloc( sizeof( char ) * pbuf_size );
600                         }
601                         memcpy( pbuf, payload, mlen );
602                         pbuf[mlen] = 0;                                                                         // don't depend on sender making this a legit string
603
604                         curr = pbuf;
605                         while( curr ) {                                                         // loop over each record in the buffer
606                                 nextr = strchr( curr, '\n' );                   // allow multiple newline records, find end of current and mark
607
608                                 if( nextr ) {
609                                         *(nextr++) = 0;
610                                 }
611
612                                 if( vlevel > 1 ) {
613                                         rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: processing (%s)\n", curr );
614                                 }
615                                 if( raw_interface ) {
616                                         parse_rt_rec( ctx, NULL, curr, vlevel, NULL );          // nil pvt to parser as we can't ack messages
617                                 } else {
618                                         parse_rt_rec( ctx, pvt_cx, curr, vlevel, msg );         // parse record and add to in progress table
619                                 }
620
621                                 curr = nextr;
622                         }
623
624                         if( ctx->shutdown ) {           // mostly for testing, but allows user app to close us down if rmr_*() function sets this
625                                 break;
626                         }
627
628                         msg->len = 0;                           // force back into the listen loop
629                 }
630         }
631
632         return NULL;    // unreachable, but some compilers don't see that and complain.
633 }
634 #endif
635
636
637 #endif