Fix route table clone core dump
[ric-plt/lib/rmr.git] / src / rmr / common / src / rtc_static.c
1 // : vi ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019-2020 Nokia
5         Copyright (c) 2018-2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rt_collector.c
23         Abstract:       The route table collector is started as a separate pthread and
24                                 is responsible for listening for route table updates from a
25                                 route manager or route table generator process.
26
27         Author:         E. Scott Daniels
28         Date:           29 November 2018 (extracted to common 13 March 2019)
29 */
30
31 #ifndef _rt_collector_c
32 #define _rt_collector_c
33
34 #include <stdio.h>
35 #include <stdlib.h>
36 #include <netdb.h>
37 #include <errno.h>
38 #include <string.h>
39 #include <errno.h>
40 #include <fcntl.h>
41 #include <sys/types.h>
42 #include <sys/stat.h>
43 #include <unistd.h>
44
45 #include <RIC_message_types.h>          // needed for RMR/Rt Mgr msg types
46
47 // ---- local constants ------------------
48                                                                                 // flags
49 #define RTCFL_HAVE_UPDATE       0x01            // an update from RM was received
50
51 #define MAX_RTC_BUF     5 * 1024                        // max buffer size we'll expect is 4k, add some fudge room
52
53 // ------------------------------------------------------------------------------------------------
54
55 /*
56         Loop forever (assuming we're running in a pthread reading the static table
57         every minute or so.
58 */
59 static void* rtc_file( void* vctx ) {
60         uta_ctx_t*      ctx;                                    // context user has -- where we pin the route table
61         char*   eptr;
62         int             vfd = -1;                                       // verbose file des if we have one
63         int             vlevel = 0;                                     // how chatty we should be 0== no nattering allowed
64         char    wbuf[256];
65
66
67         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
68                 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: context passed in was nil\n" );
69                 return NULL;
70         }
71
72         if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
73                 vfd = open( eptr, O_RDONLY );
74         }
75
76         ctx->flags |= CFL_NO_RTACK;                             // no attempt to ack when reading from a file
77         while( 1 ) {
78                 if( vfd >= 0 ) {
79                         memset( wbuf, 0, sizeof( char ) * 11 );
80                         if( lseek( vfd, 0, SEEK_SET ) == 0 && read( vfd, wbuf, 10 ) > 0 ) {
81                                 vlevel = atoi( wbuf );
82                         }
83                 }
84
85                 read_static_rt( ctx, vlevel );                                          // seed the route table if one provided
86
87                 if( ctx->shutdown != 0 ) {                                                      // allow for graceful termination and unit testing
88                         if( vfd >= 0 ) {
89                                 close( vfd );
90                         }
91                         return NULL;
92                 }
93                 sleep( 60 );
94         }
95 }
96
97 static int refresh_vlevel( int vfd ) {
98         int vlevel = 0;
99         char    rbuf[128];
100
101         if( vfd >= 0 ) {                                        // if file is open, read current value
102                 rbuf[0] = 0;
103                 memset( rbuf, 0, sizeof( char ) * 11 );
104                 if( lseek( vfd, 0, SEEK_SET ) == 0 && read( vfd, rbuf, 10 ) > 0 ) {
105                         vlevel = atoi( rbuf );
106                 }
107         }
108
109         return vlevel;
110 }
111
112 /*
113         Rtc_parse_msg parses a single message from the route manager. We allow multiple, newline terminated,
114         records in each message; it is required that the last record in the message be complete (we do not
115         reconstruct records split over multiple messages).  For each record, we call the record parser
116         to parse and add the information to the table being built.
117
118         This function was broken from the main rtc() function in order to be able to unit test it. Without
119         this as a standalone funciton, it was impossible to simulate a message arriving on the RTC's private
120         context.
121
122         To reduce malloc/free cycles, we allocate a static work buffer and expand it when needed; in other
123         words, this is not thread safe but it shouldn't need to be.
124 */
125 static void rtc_parse_msg( uta_ctx_t *ctx, uta_ctx_t* pvt_cx, rmr_mbuf_t* msg, int vlevel,  int* flags ) {
126         static  unsigned char* pbuf = NULL;
127         static  int pbuf_size = 0;
128
129         unsigned char* payload;
130         unsigned char* curr;
131         unsigned char* nextr;
132         int mlen;
133
134         payload = msg->payload;
135         mlen = msg->len;                                        // usable bytes in the payload
136
137         if( DEBUG > 1 || (vlevel > 0) ) rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: received rt message type=%d len=%d\n", msg->mtype, (int) mlen );
138         switch( msg->mtype ) {
139                 case RMRRM_TABLE_DATA:
140                         if( (*flags & RTCFL_HAVE_UPDATE) == 0 ) {
141                                 *flags |= RTCFL_HAVE_UPDATE;
142                                 rmr_vlog( RMR_VL_INFO, "message flow from route manager starts\n" );
143                         }
144
145                         if( pbuf_size <= mlen ) {
146                                 if( pbuf ) {
147                                         free( pbuf );
148                                 }
149                                 if( mlen < 512 ) {
150                                         pbuf_size = 1024;
151                                 } else {
152                                         pbuf_size = mlen * 2;
153                                 }
154                                 pbuf = (char *) malloc( sizeof( char ) * pbuf_size );
155                         }
156                         memcpy( pbuf, payload, mlen );
157                         pbuf[mlen] = 0;                                                                         // don't depend on sender making this a legit string
158                         if( vlevel > 1 ) {
159                                 rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: rt message: (%s)\n", pbuf );
160                         }
161
162                         curr = pbuf;
163                         while( curr ) {                                                                         // loop over each record in the buffer
164                                 nextr = strchr( (char *) curr, '\n' );                  // allow multiple newline records, find end of current and mark
165
166                                 if( nextr ) {
167                                         *(nextr++) = 0;
168                                 }
169
170                                 if( vlevel > 1 ) {
171                                         rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc_parse_msg: processing (%s)\n", curr );
172                                 }
173                                 parse_rt_rec( ctx, pvt_cx, curr, vlevel, msg );         // parse record and add to in progress table; ack using rts to msg
174
175                                 curr = nextr;
176                         }
177
178                         msg->len = 0;                           // force back into the listen loop
179                         break;
180
181                 default:
182                         rmr_vlog( RMR_VL_WARN, "rmr_rtc: invalid message type=%d len=%d\n", msg->mtype, (int) msg->len );
183                         break;
184         }
185 }
186
187 /*
188         Route Table Collector
189         A side thread which either attempts to connect and request a table
190         from the Route Manager, or opens a port and listens for Route Manager
191         to push table updates.
192
193         It may do other things along the way (latency measurements, alarms,
194         respond to RMR pings, etc.).
195
196         The behaviour with respect to listening for Route Manager updates vs
197         the initiation of the connection and sending a request depends on the
198         value of the ENV_RTG_ADDR (RMR_RTG_SVC)  environment variable. If
199         host:port, or IP:port, is given, then we assume that we make the connection
200         and send a request for the table (request mode).  If the variable is just
201         a port, then we assume Route Manager will connect and push updates (original
202         method).
203
204         If the variable is not defined, the default behaviour, in order to be
205         backwards compatable, depends on the presence of the ENV_CTL_PORT
206         (RMR_CTL_PORT) variable (new with the support for requesting a table).
207
208
209         ENV_CTL_PORT    ENV_RTG_ADDR    Behaviour
210         unset                   unset                   Open default CTL port (DEF_CTL_PORT) and
211                                                                         wait for Rt Mgr to push tables
212
213         set                             unset                   Use the default Rt Mgr wellknown addr
214                                                                         and port (DEF_RTG_WK_ADDR) to connect
215                                                                         and request a table. The control port
216                                                                         used is the value set by ENV_CTL_PORT.
217
218         unset                   set                             As described above. The default control
219                                                                         port (DEF_CTL_PORT) is used.
220
221         When we are running in request mode, then we will send the RMR message
222         RMRRM_REFRESH to this address (wormhole) as a request for the route manager
223         to send a new table. We will attempt to connect and send requests until
224         we have a table. Calls to rmr_ready() will report FALSE until a table is
225         loaded _unless_ a seed table was given.
226
227         Route table information is expected to arrive on RMR messages with type
228         RMRRM_TABLE_DATA.  There is NOT a specific message type for each possible
229         table record, so the payload is as it appears in the seed file or as
230         delivered in old versions.  It may take several RMRRM_TABLE_DATA messages
231         to completely supply a new table or table update. See the header for parse_rt_rec
232         in common for a description of possible message contents.
233
234         Buffers received from the route table generator can contain multiple newline terminated
235         records, but each buffer must be less than 4K in length, and the last record in a
236         buffer may NOT be split across buffers.
237
238         Other chores:
239         In addition to the primary task of getting, vetting, and installing a new route table, or
240         updates to the existing table, this thread will periodically cause the send counts for each
241         endpoint known to be written to standard error. The frequency is once every 180 seconds, and
242         more frequently if verbose mode (see ENV_VERBOSE_FILE) is > 0.
243 */
244 static void* rtc( void* vctx ) {
245         uta_ctx_t*      ctx;                                    // context user has -- where we pin the route table
246         uta_ctx_t*      pvt_cx;                                 // private context for session with rtg
247         rmr_mbuf_t*     msg = NULL;                             // message from rtg
248         char*   my_port;                                        // the port number that we will listen on (4561 has been the default for this)
249         char*   rtg_addr;                                       // host:port address of route table generator (route manager)
250         char*   daddr;                                          // duplicated rtg address string to parse/trash
251         size_t  buf_size;                                       // nng needs var pointer not just size?
252         int             i;
253         long    blabber = 0;                            // time of last blabber so we don't flood if rtg goes bad
254         int             cstate = -1;                            // connection state to rtg
255         int             state;                                          // processing state of some nng function
256         char*   tokens[128];
257         char    wbuf[128];
258         int             ntoks;
259         int             vfd = -1;                                       // verbose file des if we have one
260         int             vlevel = 0;                                     // how chatty we should be 0== no nattering allowed
261         char*   eptr;
262         int             epfd = -1;                                      // fd for epoll so we can multi-task
263         struct  epoll_event events[1];          // list of events to give to epoll; we only have one we care about
264         struct  epoll_event epe;                        // event definition for event to listen to
265         int             count_delay = 30;                       // number of seconds between writing count info; initially every 30s
266         int             bump_freq = 0;                          // time at which we will bump count frequency to every 5 minutes
267         int             flags = 0;
268         int             rt_req_freq = DEF_RTREQ_FREQ;   // request frequency (sec) when wanting a new table
269         int             nxt_rt_req = 0;                                 // time of next request
270
271
272         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
273                 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: context passed in was nil\n" );
274                 return NULL;
275         }
276
277         if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
278                 vfd = open( eptr, O_RDONLY );
279                 vlevel = refresh_vlevel( vfd );
280         }
281
282         if( (eptr = getenv( ENV_RTREQ_FREA )) != NULL ) {
283                 rt_req_freq = atoi( eptr );
284                 if( rt_req_freq < 1 || rt_req_freq > 300 ) {
285                         rt_req_freq = DEF_RTREQ_FREQ;
286                         rmr_vlog( RMR_VL_WARN, "rmr_rtc: RT request frequency (%d) out of range (1-300), using default", DEF_RTREQ_FREQ );
287                 }
288         }
289         rmr_vlog( RMR_VL_INFO, "rmr_rtc: RT request frequency set to: %d seconds", rt_req_freq );
290
291         ctx->flags |= CFL_NO_RTACK;                             // don't ack when reading from a file
292         read_static_rt( ctx, vlevel );                  // seed the route table if one provided
293         ctx->flags &= ~CFL_NO_RTACK;
294
295
296         my_port = getenv( ENV_CTL_PORT );                               // default port to listen on (likely 4561)
297         if( my_port == NULL || ! *my_port ) {                   // if undefined, then go with default
298                 my_port = DEF_CTL_PORT;
299                 daddr = DEF_CTL_PORT;                                           // backwards compat; if ctl port not hard defined, default is to listen
300         } else {
301                 daddr = DEF_RTG_WK_ADDR;                                        // if ctl port is defined, then default changes to connecting to well known RM addr
302         }
303
304         if( (rtg_addr = getenv( ENV_RTG_ADDR )) == NULL || ! *rtg_addr ) {              // undefined, use default set above
305                 rtg_addr = daddr;
306         }
307
308         daddr = strdup( rtg_addr );                                                                     // dup to destroy during parse
309
310         ntoks = uta_tokenise( daddr, tokens, 120, ':' );                        // should be host:ip of rt mgr (could be port only which we assume is old listen port)
311         switch( ntoks ) {
312                 case 0:                                                                 // should not happen, but prevent accidents and allow default to ignore additional tokens
313                         break;
314
315                 case 1:
316                         my_port = tokens[0];                    // just port -- assume backlevel environment where we just listen
317                         flags |= RTCFL_HAVE_UPDATE;             // prevent sending update reqests
318                         break;
319
320                 default:
321                         if( strcmp( tokens[0], "tcp" ) == 0 ) {                 // old school nng tcp:xxxx so we listen on xxx
322                                 flags |= RTCFL_HAVE_UPDATE;                                     // and signal not to try to request an update
323                                 my_port = tokens[1];
324                         } else {
325                                 // rtg_addr points at rt mgr address and my port set from env or default stands as is
326                         }
327                         break;
328         }
329
330         if( (pvt_cx = init( my_port, MAX_RTC_BUF, FL_NOTHREAD )) == NULL ) {                            // open a private context (no RT listener!)
331                 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: unable to initialise listen port for RTG (pvt_cx)\n" );
332
333                 while( TRUE ) {                                                                                         // no listen port, just dump counts now and then
334                         sleep( count_delay );
335                         rt_epcounts( ctx->rtable, ctx->my_name );
336                 }
337
338                 return NULL;
339         }
340
341         ctx->rtg_whid = -1;
342
343         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rtc thread is running and listening; listening for rtg conns on %s\n", my_port );
344
345         bump_freq = time( NULL ) + 300;                         // after 5 minutes we decrease the count frequency
346         blabber = 0;
347         while( 1 ) {                                                                                                            // until the cows return, pigs fly, or somesuch event likely not to happen
348                 while( msg == NULL || msg->len <= 0 ) {                                                 // until we actually have something from the other side
349                         if( (flags & RTCFL_HAVE_UPDATE) == 0 && time( NULL ) >= nxt_rt_req ) {                  // no route table updated from rt mgr; request one
350                                 if( ctx->rtg_whid < 0 ) {
351                                         ctx->rtg_whid = rmr_wh_open( pvt_cx, rtg_addr );
352                                 }
353                                 send_update_req( pvt_cx, ctx );
354                                 nxt_rt_req = time( NULL ) + rt_req_freq;
355                         }
356
357                         msg = rmr_torcv_msg( pvt_cx, msg, 1000 );
358
359                         if( time( NULL ) > blabber  ) {
360                                 vlevel = refresh_vlevel( vfd );
361                                 blabber = time( NULL ) + count_delay;                           // set next time to blabber, then do so
362                                 if( blabber > bump_freq ) {
363                                         count_delay = 300;
364                                 }
365                                 if( vlevel >= 0 ) {                                                                             // allow it to be forced off with -n in verbose file
366                                         rt_epcounts( ctx->rtable, ctx->my_name );
367                                 }
368                         }
369
370                         if( ctx->shutdown != 0 ) {
371                                 break;                                                  // mostly for unit test, but allows a forced stop
372                         }
373                 }
374
375                 vlevel = refresh_vlevel( vfd );                 // ensure it's fresh when we get a message
376
377                 if( msg != NULL && msg->len > 0 ) {
378                         rtc_parse_msg( ctx, pvt_cx, msg, vlevel, &flags );
379                 }
380
381                 if( ctx->shutdown ) {           // mostly for testing, but allows user app to close us down if rmr_*() function sets this
382                         return NULL;
383                 }
384
385         }
386
387         return NULL;    // unreachable, but some compilers don't see that and complain.
388 }
389
390 #ifndef SI95_BUILD
391 // this is nng specific inas much as we allow raw (non-RMR) messages
392
393 /*
394         NOTE:   This is the original rtc code when we supported "raw" nano/nng messages
395                         from the route manger.  It is deprecated in favour of managing all RM-RMR
396                         communications via an RMR session.
397
398                         The rtc() function above is the new and preferred function regardless
399                         of transport.
400
401         -----------------------------------------------------------------------------------
402         Route Table Collector
403         A side thread which opens a socket and subscribes to a routing table generator.
404         It may do other things along the way (latency measurements?).
405
406         The pointer is a pointer to the context.
407
408         Listens for records from the route table generation publisher, expecting
409         one of the following, newline terminated, ASCII records:
410                 rte|msg-type||]name:port,name:port,...;name:port,...                    // route table entry with one or more groups of endpoints
411                 new|start                                                               // start of new table
412                 new|end                                                                 // end of new table; complete
413
414                 Name must be a host name which can be looked up via gethostbyname() (DNS).
415
416                 Multiple endpoints (name:port) may be given separated by a comma; an endpoint is selected using round robin
417                         for each message of the type that is sent.
418
419                 Multiple endpoint groups can be given as a comma separated list of endpoints, separated by semicolons:
420                                 group1n1:port,group1n2:port,group1n3:port;group2n1:port,group2n2:port
421
422                 If multiple groups are given, when send() is called for the cooresponding message type,
423                 the message will be sent to one endpoint in each group.
424
425                 msg-type is the numeric message type (e.g. 102). If it is given as n,name then it is assumed
426                 that the entry applies only to the instance running with the hostname 'name.'
427
428         Buffers received from the route table generator can contain multiple newline terminated
429         records, but each buffer must be less than 4K in length, and the last record in a
430         buffer may NOT be split across buffers.
431
432         Other chores:
433         In addition to the primary task of getting, vetting, and installing a new route table, or
434         updates to the existing table, this thread will periodically cause the send counts for each
435         endpoint known to be written to standard error. The frequency is once every 180 seconds, and
436         more frequently if verbose mode (see ENV_VERBOSE_FILE) is > 0.
437 */
438 static void* raw_rtc( void* vctx ) {
439         uta_ctx_t*      ctx;                                    // context user has -- where we pin the route table
440         uta_ctx_t*      pvt_cx;                                 // private context for session with rtg
441         rmr_mbuf_t*     msg = NULL;                             // message from rtg
442         char*   payload;                                        // payload in the message
443         size_t  mlen;
444         char*   port;                                           // a port number we listen/connect to
445         char*   fport;                                          // pointer to the real buffer to free
446         size_t  buf_size;                                       // nng needs var pointer not just size?
447         char*   nextr;                                          // pointer at next record in the message
448         char*   curr;                                           // current record
449         int             i;
450         long    blabber = 0;                            // time of last blabber so we don't flood if rtg goes bad
451         int             cstate = -1;                            // connection state to rtg
452         int             state;                                          // processing state of some nng function
453         char*   tokens[128];
454         char    wbuf[128];
455         char*   pbuf = NULL;
456         int             pbuf_size = 0;                          // number allocated in pbuf
457         int             ntoks;
458         int             raw_interface = 1;                      // rtg is using raw NNG/Nano not RMr to send updates
459         int             vfd = -1;                                       // verbose file des if we have one
460         int             vlevel = 0;                                     // how chatty we should be 0== no nattering allowed
461         char*   eptr;
462         int             epfd = -1;                                      // fd for epoll so we can multi-task
463         struct  epoll_event events[1];          // list of events to give to epoll; we only have one we care about
464         struct  epoll_event epe;                        // event definition for event to listen to
465         int             rcv_fd = -1;                            // pollable file des from NNG to use for timeout
466         int             count_delay = 30;                       // number of seconds between writing count info; initially every 30s
467         int             bump_freq = 0;                          // time at which we will bump count frequency to every 5 minutes
468
469
470         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
471                 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: context passed in was nil\n" );
472                 return NULL;
473         }
474
475         if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
476                 vfd = open( eptr, O_RDONLY );
477                 vlevel = refresh_vlevel( vfd );
478         }
479
480         read_static_rt( ctx, vlevel );                                          // seed the route table if one provided
481
482         if( (port = getenv( ENV_RTG_PORT )) == NULL || ! *port ) {              // port we need to open to listen for RTG connections
483                 port = strdup( DEF_RTG_PORT );
484         } else {
485                 port = strdup( port );
486         }
487
488         fport = port;           // must hold to free
489
490         ntoks = uta_tokenise( port, tokens, 120, ':' );                 // assume tcp:port, but it could be port or old style host:port
491         switch( ntoks ) {
492                 case 1:
493                                 port = tokens[0];                       // just the port
494                                 break;
495
496                 case 2:
497                                 port = tokens[1];                       // tcp:port or :port
498                                 break;
499
500                 default:
501                                 port = DEF_RTG_PORT;            // this shouldn't happen, but parnioia is good
502                                 break;
503         }
504
505         if( (pvt_cx = init( port, MAX_RTG_MSG_SZ, FL_NOTHREAD )) == NULL ) {                            // open a private context
506                 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: unable to initialise listen port for RTG (pvt_cx)\n" );
507
508                 while( TRUE ) {                                                                                         // no listen port, just dump counts now and then
509                         sleep( count_delay );
510                         rt_epcounts( ctx->rtable, ctx->my_name );
511                 }
512
513                 free( fport );                                  // parinoid free and return
514                 return NULL;
515         }
516
517         if( (rcv_fd = rmr_get_rcvfd( pvt_cx )) >= 0 ) {            // get the epoll fd for the rtg socket
518                 if( rcv_fd < 0 ) {
519                         rmr_vlog( RMR_VL_WARN, "cannot get epoll fd for rtg session; stats will generate only after update from rt manager\n" );
520                 } else {
521                         if( (epfd = epoll_create1( 0 )) < 0 ) {
522                                 rmr_vlog( RMR_VL_WARN, "stats will generate only after rt manager update; unable to create epoll fd for rtg session: %s\n", strerror( errno ) );
523                                 rcv_fd = -1;
524                         } else {
525                                 epe.events = EPOLLIN;
526                                 epe.data.fd = rcv_fd;
527
528                                 if( epoll_ctl( epfd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 )  {
529                                         rmr_vlog( RMR_VL_WARN, "stats will generate only after rt manager update; unable to init epoll_ctl: %s\n", strerror( errno ) );
530                                         rcv_fd = -1;
531                                 }
532                         }
533                 }
534         }
535
536         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rtc thread is running and listening; listening for rtg conns on %s\n", port );
537         free( fport );
538
539         // future:  if we need to register with the rtg, then build a message and send it through a wormhole here
540
541         bump_freq = time( NULL ) + 300;                         // after 5 minutes we decrease the count frequency
542         blabber = 0;
543         while( 1 ) {                    // until the cows return, pigs fly, or somesuch event likely not to happen
544                 while( msg == NULL || msg->len <= 0 ) {                                                 // until we actually have something from the other side
545                         if( rcv_fd < 0 || epoll_wait( epfd, events, 1, 1000 ) > 0 )  {  // skip epoll if init failed, else block for max 1 sec
546                                 if( raw_interface ) {
547                                         msg = (rmr_mbuf_t *) rcv_payload( pvt_cx, msg );                // receive from non-RMr sender
548                                 } else {
549                                         msg = rmr_rcv_msg( pvt_cx, msg );               // receive from an RMr sender
550                                 }
551                         } else {                                                                                                        // no msg, do extra tasks
552                                 if( msg != NULL ) {                                                                             // if we were working with a message; ensure no len
553                                         msg->len = 0;
554                                         msg->state = RMR_ERR_TIMEOUT;
555                                 }
556                         }
557
558                         if( time( NULL ) > blabber  ) {
559                                 vlevel = refresh_vlevel( vfd );
560                                 if( vlevel >= 0 ) {                                                                             // allow it to be forced off with -n in verbose file
561                                         blabber = time( NULL ) + count_delay;                           // set next time to blabber, then do so
562                                         if( blabber > bump_freq ) {
563                                                 count_delay = 300;
564                                         }
565                                         rt_epcounts( ctx->rtable, ctx->my_name );
566                                 }
567                         }
568                 }
569
570                 vlevel = refresh_vlevel( vfd );                 // ensure it's fresh when we get a message
571
572                 if( msg != NULL && msg->len > 0 ) {
573                         payload = msg->payload;
574                         mlen = msg->len;                                        // usable bytes in the payload
575                         if( vlevel > 1 ) {
576                                 rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: received rt message; %d bytes (%s)\n", (int) mlen, msg->payload );
577                         } else {
578                                 if( DEBUG > 1 || (vlevel > 0) ) rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: received rt message; %d bytes\n", (int) mlen );
579                         }
580
581                         if( pbuf_size <= mlen ) {
582                                 if( pbuf ) {
583                                         free( pbuf );
584                                 }
585                                 if( mlen < 512 ) {
586                                         pbuf_size = 512;
587                                 } else {
588                                         pbuf_size = mlen * 2;
589                                 }
590                                 pbuf = (char *) malloc( sizeof( char ) * pbuf_size );
591                         }
592                         memcpy( pbuf, payload, mlen );
593                         pbuf[mlen] = 0;                                                                         // don't depend on sender making this a legit string
594
595                         curr = pbuf;
596                         while( curr ) {                                                         // loop over each record in the buffer
597                                 nextr = strchr( curr, '\n' );                   // allow multiple newline records, find end of current and mark
598
599                                 if( nextr ) {
600                                         *(nextr++) = 0;
601                                 }
602
603                                 if( vlevel > 1 ) {
604                                         rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: processing (%s)\n", curr );
605                                 }
606                                 if( raw_interface ) {
607                                         parse_rt_rec( ctx, NULL, curr, vlevel, NULL );          // nil pvt to parser as we can't ack messages
608                                 } else {
609                                         parse_rt_rec( ctx, pvt_cx, curr, vlevel, msg );         // parse record and add to in progress table
610                                 }
611
612                                 curr = nextr;
613                         }
614
615                         if( ctx->shutdown ) {           // mostly for testing, but allows user app to close us down if rmr_*() function sets this
616                                 break;
617                         }
618
619                         msg->len = 0;                           // force back into the listen loop
620                 }
621         }
622
623         return NULL;    // unreachable, but some compilers don't see that and complain.
624 }
625 #endif
626
627
628 #endif