More changes for scan corrections and unit test coverage
[ric-plt/lib/rmr.git] / src / rmr / common / src / rtc_static.c
1 // : vi ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019-2020 Nokia
5         Copyright (c) 2018-2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rt_collector.c
23         Abstract:       The route table collector is started as a separate pthread and
24                                 is responsible for listening for route table updates from a
25                                 route manager or route table generator process.
26
27         Author:         E. Scott Daniels
28         Date:           29 November 2018 (extracted to common 13 March 2019)
29 */
30
31 #ifndef _rt_collector_c
32 #define _rt_collector_c
33
34 #include <stdio.h>
35 #include <stdlib.h>
36 #include <netdb.h>
37 #include <errno.h>
38 #include <string.h>
39 #include <errno.h>
40 #include <fcntl.h>
41 #include <sys/types.h>
42 #include <sys/stat.h>
43 #include <unistd.h>
44
45 #include <RIC_message_types.h>          // needed for RMR/Rt Mgr msg types
46
47 // ---- local constants ------------------
48                                                                                 // flags
49 #define RTCFL_HAVE_UPDATE       0x01            // an update from RM was received
50
51 #define MAX_RTC_BUF     5 * 1024                        // max buffer size we'll expect is 4k, add some fudge room
52
53 // ------------------------------------------------------------------------------------------------
54
55 /*
56         Loop forever (assuming we're running in a pthread reading the static table
57         every minute or so.
58 */
59 static void* rtc_file( void* vctx ) {
60         uta_ctx_t*      ctx;                                    // context user has -- where we pin the route table
61         char*   eptr;
62         int             vfd = -1;                                       // verbose file des if we have one
63         int             vlevel = 0;                                     // how chatty we should be 0== no nattering allowed
64         char    wbuf[256];
65
66
67         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
68                 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: context passed in was nil\n" );
69                 return NULL;
70         }
71
72         if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
73                 vfd = open( eptr, O_RDONLY );
74         }
75
76         ctx->flags |= CFL_NO_RTACK;                             // no attempt to ack when reading from a file
77         while( 1 ) {
78                 if( vfd >= 0 ) {
79                         memset( wbuf, 0, sizeof( char ) * 11 );
80                         if( lseek( vfd, 0, SEEK_SET ) == 0 && read( vfd, wbuf, 10 ) > 0 ) {
81                                 vlevel = atoi( wbuf );
82                         }
83                 }
84
85                 read_static_rt( ctx, vlevel );                                          // seed the route table if one provided
86
87                 if( ctx->shutdown != 0 ) {                                                      // allow for graceful termination and unit testing
88                         if( vfd >= 0 ) {
89                                 close( vfd );
90                         }
91                         return NULL;
92                 }
93                 sleep( 60 );
94         }
95 }
96
97 static int refresh_vlevel( int vfd ) {
98         int vlevel = 0;
99         char    rbuf[128];
100
101         if( vfd >= 0 ) {                                        // if file is open, read current value
102                 rbuf[0] = 0;
103                 memset( rbuf, 0, sizeof( char ) * 11 );
104                 if( lseek( vfd, 0, SEEK_SET ) == 0 && read( vfd, rbuf, 10 ) > 0 ) {
105                         vlevel = atoi( rbuf );
106                 }
107         }
108
109         return vlevel;
110 }
111
112 /*
113         Rtc_parse_msg parses a single message from the route manager. We allow multiple, newline terminated,
114         records in each message; it is required that the last record in the message be complete (we do not
115         reconstruct records split over multiple messages).  For each record, we call the record parser
116         to parse and add the information to the table being built.
117
118         This function was broken from the main rtc() function in order to be able to unit test it. Without
119         this as a standalone funciton, it was impossible to simulate a message arriving on the RTC's private
120         context.
121
122         To reduce malloc/free cycles, we allocate a static work buffer and expand it when needed; in other
123         words, this is not thread safe but it shouldn't need to be.
124 */
125 static void rtc_parse_msg( uta_ctx_t *ctx, uta_ctx_t* pvt_cx, rmr_mbuf_t* msg, int vlevel,  int* flags ) {
126         static  unsigned char* pbuf = NULL;
127         static  int pbuf_size = 0;
128
129         unsigned char* payload;
130         unsigned char* curr;
131         unsigned char* nextr;
132         int mlen;
133
134         payload = msg->payload;
135         mlen = msg->len;                                        // usable bytes in the payload
136
137         if( DEBUG > 1 || (vlevel > 0) ) rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: received rt message type=%d len=%d\n", msg->mtype, (int) mlen );
138         switch( msg->mtype ) {
139                 case RMRRM_TABLE_DATA:
140                         if( (*flags & RTCFL_HAVE_UPDATE) == 0 ) {
141                                 *flags |= RTCFL_HAVE_UPDATE;
142                                 rmr_vlog( RMR_VL_INFO, "message flow from route manager starts\n" );
143                         }
144
145                         if( pbuf_size <= mlen ) {
146                                 if( pbuf ) {
147                                         free( pbuf );
148                                 }
149                                 if( mlen < 512 ) {
150                                         pbuf_size = 1024;
151                                 } else {
152                                         pbuf_size = mlen * 2;
153                                 }
154                                 pbuf = (char *) malloc( sizeof( char ) * pbuf_size );
155                         }
156                         memcpy( pbuf, payload, mlen );
157                         pbuf[mlen] = 0;                                                                         // don't depend on sender making this a legit string
158                         if( vlevel > 1 ) {
159                                 rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: rt message: (%s)\n", pbuf );
160                         }
161
162                         curr = pbuf;
163                         while( curr ) {                                                                         // loop over each record in the buffer
164                                 nextr = strchr( (char *) curr, '\n' );                  // allow multiple newline records, find end of current and mark
165
166                                 if( nextr ) {
167                                         *(nextr++) = 0;
168                                 }
169
170                                 if( vlevel > 1 ) {
171                                         rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: processing (%s)\n", curr );
172                                 }
173                                 parse_rt_rec( ctx, pvt_cx, curr, vlevel, msg );         // parse record and add to in progress table; ack using rts to msg
174
175                                 curr = nextr;
176                         }
177
178                         msg->len = 0;                           // force back into the listen loop
179                         break;
180
181                 default:
182                         rmr_vlog( RMR_VL_WARN, "rmr_rtc: invalid message type=%d len=%d\n", msg->mtype, (int) msg->len );
183                         break;
184         }
185 }
186
187 /*
188         Route Table Collector
189         A side thread which either attempts to connect and request a table
190         from the Route Manager, or opens a port and listens for Route Manager
191         to push table updates.
192
193         It may do other things along the way (latency measurements, alarms,
194         respond to RMR pings, etc.).
195
196         The behaviour with respect to listening for Route Manager updates vs
197         the initiation of the connection and sending a request depends on the
198         value of the ENV_RTG_ADDR (RMR_RTG_SVC)  environment variable. If
199         host:port, or IP:port, is given, then we assume that we make the connection
200         and send a request for the table (request mode).  If the variable is just
201         a port, then we assume Route Manager will connect and push updates (original
202         method).
203
204         If the variable is not defined, the default behaviour, in order to be
205         backwards compatable, depends on the presence of the ENV_CTL_PORT
206         (RMR_CTL_PORT) variable (new with the support for requesting a table).
207
208
209         ENV_CTL_PORT    ENV_RTG_ADDR    Behaviour
210         unset                   unset                   Open default CTL port (DEF_CTL_PORT) and
211                                                                         wait for Rt Mgr to push tables
212
213         set                             unset                   Use the default Rt Mgr wellknown addr
214                                                                         and port (DEF_RTG_WK_ADDR) to connect
215                                                                         and request a table. The control port
216                                                                         used is the value set by ENV_CTL_PORT.
217
218         unset                   set                             As described above. The default control
219                                                                         port (DEF_CTL_PORT) is used.
220
221         When we are running in request mode, then we will send the RMR message
222         RMRRM_REFRESH to this address (wormhole) as a request for the route manager
223         to send a new table. We will attempt to connect and send requests until
224         we have a table. Calls to rmr_ready() will report FALSE until a table is
225         loaded _unless_ a seed table was given.
226
227         Route table information is expected to arrive on RMR messages with type
228         RMRRM_TABLE_DATA.  There is NOT a specific message type for each possible
229         table record, so the payload is as it appears in the seed file or as
230         delivered in old versions.  It may take several RMRRM_TABLE_DATA messages
231         to completely supply a new table or table update. See the header for parse_rt_rec
232         in common for a description of possible message contents.
233
234         Buffers received from the route table generator can contain multiple newline terminated
235         records, but each buffer must be less than 4K in length, and the last record in a
236         buffer may NOT be split across buffers.
237
238         Other chores:
239         In addition to the primary task of getting, vetting, and installing a new route table, or
240         updates to the existing table, this thread will periodically cause the send counts for each
241         endpoint known to be written to standard error. The frequency is once every 180 seconds, and
242         more frequently if verbose mode (see ENV_VERBOSE_FILE) is > 0.
243 */
244 static void* rtc( void* vctx ) {
245         uta_ctx_t*      ctx;                                    // context user has -- where we pin the route table
246         uta_ctx_t*      pvt_cx;                                 // private context for session with rtg
247         rmr_mbuf_t*     msg = NULL;                             // message from rtg
248         char*   my_port;                                        // the port number that we will listen on (4561 has been the default for this)
249         char*   rtg_addr;                                       // host:port address of route table generator (route manager)
250         char*   daddr;                                          // duplicated rtg address string to parse/trash
251         size_t  buf_size;                                       // nng needs var pointer not just size?
252         int             i;
253         long    blabber = 0;                            // time of last blabber so we don't flood if rtg goes bad
254         int             cstate = -1;                            // connection state to rtg
255         int             state;                                          // processing state of some nng function
256         char*   tokens[128];
257         char    wbuf[128];
258         int             ntoks;
259         int             vfd = -1;                                       // verbose file des if we have one
260         int             vlevel = 0;                                     // how chatty we should be 0== no nattering allowed
261         char*   eptr;
262         int             epfd = -1;                                      // fd for epoll so we can multi-task
263         struct  epoll_event events[1];          // list of events to give to epoll; we only have one we care about
264         struct  epoll_event epe;                        // event definition for event to listen to
265         int             count_delay = 30;                       // number of seconds between writing count info; initially every 30s
266         int             bump_freq = 0;                          // time at which we will bump count frequency to every 5 minutes
267         int             flags = 0;
268         int             rt_req_freq = DEF_RTREQ_FREQ;   // request frequency (sec) when wanting a new table
269         int             nxt_rt_req = 0;                                 // time of next request
270
271
272         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
273                 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: context passed in was nil\n" );
274                 return NULL;
275         }
276
277         if( (ctx->ephash = rmr_sym_alloc( RT_SIZE )) == NULL ) {                // master hash table for endpoints (each rt will reference this)
278                 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: unable to allocate an endpoint hash table\n" );
279                 return NULL;
280         }
281
282         if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
283                 vfd = open( eptr, O_RDONLY );
284                 vlevel = refresh_vlevel( vfd );
285         }
286
287         if( (eptr = getenv( ENV_RTREQ_FREA )) != NULL ) {
288                 rt_req_freq = atoi( eptr );
289                 if( rt_req_freq < 1 || rt_req_freq > 300 ) {
290                         rt_req_freq = DEF_RTREQ_FREQ;
291                         rmr_vlog( RMR_VL_WARN, "rmr_rtc: RT request frequency (%d) out of range (1-300), using default", DEF_RTREQ_FREQ );
292                 }
293         }
294         rmr_vlog( RMR_VL_INFO, "rmr_rtc: RT request frequency set to: %d seconds", rt_req_freq );
295
296         ctx->flags |= CFL_NO_RTACK;                             // don't ack when reading from a file
297         read_static_rt( ctx, vlevel );                  // seed the route table if one provided
298         ctx->flags &= ~CFL_NO_RTACK;
299
300
301         my_port = getenv( ENV_CTL_PORT );                               // default port to listen on (likely 4561)
302         if( my_port == NULL || ! *my_port ) {                   // if undefined, then go with default
303                 my_port = DEF_CTL_PORT;
304                 daddr = DEF_CTL_PORT;                                           // backwards compat; if ctl port not hard defined, default is to listen
305         } else {
306                 daddr = DEF_RTG_WK_ADDR;                                        // if ctl port is defined, then default changes to connecting to well known RM addr
307         }
308
309         if( (rtg_addr = getenv( ENV_RTG_ADDR )) == NULL || ! *rtg_addr ) {              // undefined, use default set above
310                 rtg_addr = daddr;
311         }
312
313         daddr = strdup( rtg_addr );                                                                     // dup to destroy during parse
314
315         ntoks = uta_tokenise( daddr, tokens, 120, ':' );                        // should be host:ip of rt mgr (could be port only which we assume is old listen port)
316         switch( ntoks ) {
317                 case 0:                                                                 // should not happen, but prevent accidents and allow default to ignore additional tokens
318                         break;
319
320                 case 1:
321                         my_port = tokens[0];                    // just port -- assume backlevel environment where we just listen
322                         flags |= RTCFL_HAVE_UPDATE;             // prevent sending update reqests
323                         break;
324
325                 default:
326                         if( strcmp( tokens[0], "tcp" ) == 0 ) {                 // old school nng tcp:xxxx so we listen on xxx
327                                 flags |= RTCFL_HAVE_UPDATE;                                     // and signal not to try to request an update
328                                 my_port = tokens[1];
329                         } else {
330                                 // rtg_addr points at rt mgr address and my port set from env or default stands as is
331                         }
332                         break;
333         }
334
335         if( (pvt_cx = init( my_port, MAX_RTC_BUF, FL_NOTHREAD )) == NULL ) {                            // open a private context (no RT listener!)
336                 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: unable to initialise listen port for RTG (pvt_cx)\n" );
337
338                 while( TRUE ) {                                                                                         // no listen port, just dump counts now and then
339                         sleep( count_delay );
340                         rt_epcounts( ctx->rtable, ctx->my_name );
341                 }
342
343                 return NULL;
344         }
345
346         ctx->rtg_whid = -1;
347
348         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rtc thread is running and listening; listening for rtg conns on %s\n", my_port );
349
350         bump_freq = time( NULL ) + 300;                         // after 5 minutes we decrease the count frequency
351         blabber = 0;
352         while( 1 ) {                                                                                                            // until the cows return, pigs fly, or somesuch event likely not to happen
353                 while( msg == NULL || msg->len <= 0 ) {                                                 // until we actually have something from the other side
354                         if( (flags & RTCFL_HAVE_UPDATE) == 0 && time( NULL ) >= nxt_rt_req ) {                  // no route table updated from rt mgr; request one
355                                 if( ctx->rtg_whid < 0 ) {
356                                         ctx->rtg_whid = rmr_wh_open( pvt_cx, rtg_addr );
357                                 }
358                                 send_update_req( pvt_cx, ctx );
359                                 nxt_rt_req = time( NULL ) + rt_req_freq;
360                         }
361
362                         msg = rmr_torcv_msg( pvt_cx, msg, 1000 );
363
364                         if( time( NULL ) > blabber  ) {
365                                 vlevel = refresh_vlevel( vfd );
366                                 if( vlevel >= 0 ) {                                                                             // allow it to be forced off with -n in verbose file
367                                         blabber = time( NULL ) + count_delay;                           // set next time to blabber, then do so
368                                         if( blabber > bump_freq ) {
369                                                 count_delay = 300;
370                                         }
371                                         rt_epcounts( ctx->rtable, ctx->my_name );
372                                 }
373                         }
374
375                         if( ctx->shutdown != 0 ) {
376                                 break;                                                  // mostly for unit test, but allows a forced stop
377                         }
378                 }
379
380                 vlevel = refresh_vlevel( vfd );                 // ensure it's fresh when we get a message
381
382                 if( msg != NULL && msg->len > 0 ) {
383                         rtc_parse_msg( ctx, pvt_cx, msg, vlevel, &flags );
384                 }
385
386                 if( ctx->shutdown ) {           // mostly for testing, but allows user app to close us down if rmr_*() function sets this
387                         return NULL;
388                 }
389
390         }
391
392         return NULL;    // unreachable, but some compilers don't see that and complain.
393 }
394
395 #ifndef SI95_BUILD
396 // this is nng specific inas much as we allow raw (non-RMR) messages
397
398 /*
399         NOTE:   This is the original rtc code when we supported "raw" nano/nng messages
400                         from the route manger.  It is deprecated in favour of managing all RM-RMR
401                         communications via an RMR session.
402
403                         The rtc() function above is the new and preferred function regardless
404                         of transport.
405
406         -----------------------------------------------------------------------------------
407         Route Table Collector
408         A side thread which opens a socket and subscribes to a routing table generator.
409         It may do other things along the way (latency measurements?).
410
411         The pointer is a pointer to the context.
412
413         Listens for records from the route table generation publisher, expecting
414         one of the following, newline terminated, ASCII records:
415                 rte|msg-type||]name:port,name:port,...;name:port,...                    // route table entry with one or more groups of endpoints
416                 new|start                                                               // start of new table
417                 new|end                                                                 // end of new table; complete
418
419                 Name must be a host name which can be looked up via gethostbyname() (DNS).
420
421                 Multiple endpoints (name:port) may be given separated by a comma; an endpoint is selected using round robin
422                         for each message of the type that is sent.
423
424                 Multiple endpoint groups can be given as a comma separated list of endpoints, separated by semicolons:
425                                 group1n1:port,group1n2:port,group1n3:port;group2n1:port,group2n2:port
426
427                 If multiple groups are given, when send() is called for the cooresponding message type,
428                 the message will be sent to one endpoint in each group.
429
430                 msg-type is the numeric message type (e.g. 102). If it is given as n,name then it is assumed
431                 that the entry applies only to the instance running with the hostname 'name.'
432
433         Buffers received from the route table generator can contain multiple newline terminated
434         records, but each buffer must be less than 4K in length, and the last record in a
435         buffer may NOT be split across buffers.
436
437         Other chores:
438         In addition to the primary task of getting, vetting, and installing a new route table, or
439         updates to the existing table, this thread will periodically cause the send counts for each
440         endpoint known to be written to standard error. The frequency is once every 180 seconds, and
441         more frequently if verbose mode (see ENV_VERBOSE_FILE) is > 0.
442 */
443 static void* raw_rtc( void* vctx ) {
444         uta_ctx_t*      ctx;                                    // context user has -- where we pin the route table
445         uta_ctx_t*      pvt_cx;                                 // private context for session with rtg
446         rmr_mbuf_t*     msg = NULL;                             // message from rtg
447         char*   payload;                                        // payload in the message
448         size_t  mlen;
449         char*   port;                                           // a port number we listen/connect to
450         char*   fport;                                          // pointer to the real buffer to free
451         size_t  buf_size;                                       // nng needs var pointer not just size?
452         char*   nextr;                                          // pointer at next record in the message
453         char*   curr;                                           // current record
454         int             i;
455         long    blabber = 0;                            // time of last blabber so we don't flood if rtg goes bad
456         int             cstate = -1;                            // connection state to rtg
457         int             state;                                          // processing state of some nng function
458         char*   tokens[128];
459         char    wbuf[128];
460         char*   pbuf = NULL;
461         int             pbuf_size = 0;                          // number allocated in pbuf
462         int             ntoks;
463         int             raw_interface = 1;                      // rtg is using raw NNG/Nano not RMr to send updates
464         int             vfd = -1;                                       // verbose file des if we have one
465         int             vlevel = 0;                                     // how chatty we should be 0== no nattering allowed
466         char*   eptr;
467         int             epfd = -1;                                      // fd for epoll so we can multi-task
468         struct  epoll_event events[1];          // list of events to give to epoll; we only have one we care about
469         struct  epoll_event epe;                        // event definition for event to listen to
470         int             rcv_fd = -1;                            // pollable file des from NNG to use for timeout
471         int             count_delay = 30;                       // number of seconds between writing count info; initially every 30s
472         int             bump_freq = 0;                          // time at which we will bump count frequency to every 5 minutes
473
474
475         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
476                 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: context passed in was nil\n" );
477                 return NULL;
478         }
479
480         if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
481                 vfd = open( eptr, O_RDONLY );
482                 vlevel = refresh_vlevel( vfd );
483         }
484
485         read_static_rt( ctx, vlevel );                                          // seed the route table if one provided
486
487         if( (port = getenv( ENV_RTG_PORT )) == NULL || ! *port ) {              // port we need to open to listen for RTG connections
488                 port = strdup( DEF_RTG_PORT );
489         } else {
490                 port = strdup( port );
491         }
492
493 /*
494         this test is now done in init and this function is started _only_ if the value was 1
495         if( (curr = getenv( ENV_RTG_RAW )) != NULL ) {
496                 raw_interface = atoi( curr ) > 0;                               // if > 0 we assume that rtg messages are NOT coming from an RMr based process
497         }
498 */
499
500         fport = port;           // must hold to free
501
502         ntoks = uta_tokenise( port, tokens, 120, ':' );                 // assume tcp:port, but it could be port or old style host:port
503         switch( ntoks ) {
504                 case 1:
505                                 port = tokens[0];                       // just the port
506                                 break;
507
508                 case 2:
509                                 port = tokens[1];                       // tcp:port or :port
510                                 break;
511
512                 default:
513                                 port = DEF_RTG_PORT;            // this shouldn't happen, but parnioia is good
514                                 break;
515         }
516
517         if( (pvt_cx = init( port, MAX_RTG_MSG_SZ, FL_NOTHREAD )) == NULL ) {                            // open a private context
518                 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: unable to initialise listen port for RTG (pvt_cx)\n" );
519
520                 while( TRUE ) {                                                                                         // no listen port, just dump counts now and then
521                         sleep( count_delay );
522                         rt_epcounts( ctx->rtable, ctx->my_name );
523                 }
524
525                 free( fport );                                  // parinoid free and return
526                 return NULL;
527         }
528
529         if( (rcv_fd = rmr_get_rcvfd( pvt_cx )) >= 0 ) {            // get the epoll fd for the rtg socket
530                 if( rcv_fd < 0 ) {
531                         rmr_vlog( RMR_VL_WARN, "cannot get epoll fd for rtg session; stats will generate only after update from rt manager\n" );
532                 } else {
533                         if( (epfd = epoll_create1( 0 )) < 0 ) {
534                                 rmr_vlog( RMR_VL_WARN, "stats will generate only after rt manager update; unable to create epoll fd for rtg session: %s\n", strerror( errno ) );
535                                 rcv_fd = -1;
536                         } else {
537                                 epe.events = EPOLLIN;
538                                 epe.data.fd = rcv_fd;
539
540                                 if( epoll_ctl( epfd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 )  {
541                                         rmr_vlog( RMR_VL_WARN, "stats will generate only after rt manager update; unable to init epoll_ctl: %s\n", strerror( errno ) );
542                                         rcv_fd = -1;
543                                 }
544                         }
545                 }
546         }
547
548         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rtc thread is running and listening; listening for rtg conns on %s\n", port );
549         free( fport );
550
551         // future:  if we need to register with the rtg, then build a message and send it through a wormhole here
552
553         bump_freq = time( NULL ) + 300;                         // after 5 minutes we decrease the count frequency
554         blabber = 0;
555         while( 1 ) {                    // until the cows return, pigs fly, or somesuch event likely not to happen
556                 while( msg == NULL || msg->len <= 0 ) {                                                 // until we actually have something from the other side
557                         if( rcv_fd < 0 || epoll_wait( epfd, events, 1, 1000 ) > 0 )  {  // skip epoll if init failed, else block for max 1 sec
558                                 if( raw_interface ) {
559                                         msg = (rmr_mbuf_t *) rcv_payload( pvt_cx, msg );                // receive from non-RMr sender
560                                 } else {
561                                         msg = rmr_rcv_msg( pvt_cx, msg );               // receive from an RMr sender
562                                 }
563                         } else {                                                                                                        // no msg, do extra tasks
564                                 if( msg != NULL ) {                                                                             // if we were working with a message; ensure no len
565                                         msg->len = 0;
566                                         msg->state = RMR_ERR_TIMEOUT;
567                                 }
568                         }
569
570                         if( time( NULL ) > blabber  ) {
571                                 vlevel = refresh_vlevel( vfd );
572                                 if( vlevel >= 0 ) {                                                                             // allow it to be forced off with -n in verbose file
573                                         blabber = time( NULL ) + count_delay;                           // set next time to blabber, then do so
574                                         if( blabber > bump_freq ) {
575                                                 count_delay = 300;
576                                         }
577                                         rt_epcounts( ctx->rtable, ctx->my_name );
578                                 }
579                         }
580                 }
581
582                 vlevel = refresh_vlevel( vfd );                 // ensure it's fresh when we get a message
583
584                 if( msg != NULL && msg->len > 0 ) {
585                         payload = msg->payload;
586                         mlen = msg->len;                                        // usable bytes in the payload
587                         if( vlevel > 1 ) {
588                                 rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: received rt message; %d bytes (%s)\n", (int) mlen, msg->payload );
589                         } else {
590                                 if( DEBUG > 1 || (vlevel > 0) ) rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: received rt message; %d bytes\n", (int) mlen );
591                         }
592
593                         if( pbuf_size <= mlen ) {
594                                 if( pbuf ) {
595                                         free( pbuf );
596                                 }
597                                 if( mlen < 512 ) {
598                                         pbuf_size = 512;
599                                 } else {
600                                         pbuf_size = mlen * 2;
601                                 }
602                                 pbuf = (char *) malloc( sizeof( char ) * pbuf_size );
603                         }
604                         memcpy( pbuf, payload, mlen );
605                         pbuf[mlen] = 0;                                                                         // don't depend on sender making this a legit string
606
607                         curr = pbuf;
608                         while( curr ) {                                                         // loop over each record in the buffer
609                                 nextr = strchr( curr, '\n' );                   // allow multiple newline records, find end of current and mark
610
611                                 if( nextr ) {
612                                         *(nextr++) = 0;
613                                 }
614
615                                 if( vlevel > 1 ) {
616                                         rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: processing (%s)\n", curr );
617                                 }
618                                 if( raw_interface ) {
619                                         parse_rt_rec( ctx, NULL, curr, vlevel, NULL );          // nil pvt to parser as we can't ack messages
620                                 } else {
621                                         parse_rt_rec( ctx, pvt_cx, curr, vlevel, msg );         // parse record and add to in progress table
622                                 }
623
624                                 curr = nextr;
625                         }
626
627                         if( ctx->shutdown ) {           // mostly for testing, but allows user app to close us down if rmr_*() function sets this
628                                 break;
629                         }
630
631                         msg->len = 0;                           // force back into the listen loop
632                 }
633         }
634
635         return NULL;    // unreachable, but some compilers don't see that and complain.
636 }
637 #endif
638
639
640 #endif