aa829eff81dec1419089d8e571d62b76c4329974
[ric-plt/lib/rmr.git] / src / rmr / common / src / rtc_static.c
1 // : vi ts=4 sw=4 noet :
2 /*
3 ==================================================================================
4         Copyright (c) 2019-2020 Nokia
5         Copyright (c) 2018-2020 AT&T Intellectual Property.
6
7    Licensed under the Apache License, Version 2.0 (the "License");
8    you may not use this file except in compliance with the License.
9    You may obtain a copy of the License at
10
11            http://www.apache.org/licenses/LICENSE-2.0
12
13    Unless required by applicable law or agreed to in writing, software
14    distributed under the License is distributed on an "AS IS" BASIS,
15    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16    See the License for the specific language governing permissions and
17    limitations under the License.
18 ==================================================================================
19 */
20
21 /*
22         Mnemonic:       rt_collector.c
23         Abstract:       The route table collector is started as a separate pthread and
24                                 is responsible for listening for route table updates from a
25                                 route manager or route table generator process.
26
27         Author:         E. Scott Daniels
28         Date:           29 November 2018 (extracted to common 13 March 2019)
29 */
30
31 #ifndef _rt_collector_c
32 #define _rt_collector_c
33
34 #include <stdio.h>
35 #include <stdlib.h>
36 #include <netdb.h>
37 #include <errno.h>
38 #include <string.h>
39 #include <errno.h>
40 #include <fcntl.h>
41 #include <sys/types.h>
42 #include <sys/stat.h>
43 #include <unistd.h>
44
45 #include <RIC_message_types.h>          // needed for RMR/Rt Mgr msg types
46
47 // ---- local constants ------------------
48                                                                                 // flags
49 #define RTCFL_HAVE_UPDATE       0x01            // an update from RM was received
50
51 #define MAX_RTC_BUF     5 * 1024                        // max buffer size we'll expect is 4k, add some fudge room
52
53 // ------------------------------------------------------------------------------------------------
54
55 /*
56         Loop forever (assuming we're running in a pthread reading the static table
57         every minute or so.
58 */
59 static void* rtc_file( void* vctx ) {
60         uta_ctx_t*      ctx;                                    // context user has -- where we pin the route table
61         char*   eptr;
62         int             vfd = -1;                                       // verbose file des if we have one
63         int             vlevel = 0;                                     // how chatty we should be 0== no nattering allowed
64         char    wbuf[256];
65
66
67         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
68                 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: context passed in was nil\n" );
69                 return NULL;
70         }
71
72         if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
73                 vfd = open( eptr, O_RDONLY );
74         }
75
76         ctx->flags |= CFL_NO_RTACK;                             // no attempt to ack when reading from a file
77         while( 1 ) {
78                 if( vfd >= 0 ) {
79                         wbuf[0] = 0;
80                         lseek( vfd, 0, 0 );
81                         read( vfd, wbuf, 10 );
82                         vlevel = atoi( wbuf );
83                 }
84
85                 read_static_rt( ctx, vlevel );                                          // seed the route table if one provided
86
87                 sleep( 60 );
88         }
89 }
90
91 static int refresh_vlevel( int vfd ) {
92         int vlevel = 0;
93         char    rbuf[128];
94
95         if( vfd >= 0 ) {                                        // if file is open, read current value
96                 rbuf[0] = 0;
97                 lseek( vfd, 0, 0 );
98                 read( vfd, rbuf, 10 );
99                 vlevel = atoi( rbuf );
100         }
101
102         return vlevel;
103 }
104
105 /*
106         Route Table Collector
107         A side thread which either attempts to connect and request a table
108         from the Route Manager, or opens a port and listens for Route Manager
109         to push table updates.
110
111         It may do other things along the way (latency measurements, alarms, 
112         respond to RMR pings, etc.).
113
114         The behaviour with respect to listening for Route Manager updates vs
115         the initiation of the connection and sending a request depends on the
116         value of the ENV_RTG_ADDR (RMR_RTG_SVC)  environment variable. If
117         host:port, or IP:port, is given, then we assume that we make the connection
118         and send a request for the table (request mode).  If the variable is just 
119         a port, then we assume Route Manager will connect and push updates (original 
120         method).
121
122         If the variable is not defined, the default behaviour, in order to be
123         backwards compatable, depends on the presence of the ENV_CTL_PORT 
124         (RMR_CTL_PORT) variable (new with the support for requesting a table).
125
126         
127         ENV_CTL_PORT    ENV_RTG_ADDR    Behaviour
128         unset                   unset                   Open default CTL port (DEF_CTL_PORT) and
129                                                                         wait for Rt Mgr to push tables
130
131         set                             unset                   Use the default Rt Mgr wellknown addr
132                                                                         and port (DEF_RTG_WK_ADDR) to connect
133                                                                         and request a table. The control port
134                                                                         used is the value set by ENV_CTL_PORT.
135
136         unset                   set                             As described above. The default control
137                                                                         port (DEF_CTL_PORT) is used. 
138
139         When we are running in request mode, then we will send the RMR message 
140         RMRRM_REFRESH to this address (wormhole) as a request for the route manager 
141         to send a new table. We will attempt to connect and send requests until
142         we have a table. Calls to rmr_ready() will report FALSE until a table is
143         loaded _unless_ a seed table was given.
144
145         Route table information is expected to arrive on RMR messages with type 
146         RMRRM_TABLE_DATA.  There is NOT a specific message type for each possible
147         table record, so the payload is as it appears in the seed file or as
148         delivered in old versions.  It may take several RMRRM_TABLE_DATA messages
149         to completely supply a new table or table update. See the header for parse_rt_rec
150         in common for a description of possible message contents.
151
152         Buffers received from the route table generator can contain multiple newline terminated
153         records, but each buffer must be less than 4K in length, and the last record in a
154         buffer may NOT be split across buffers.
155
156         Other chores:
157         In addition to the primary task of getting, vetting, and installing a new route table, or
158         updates to the existing table, this thread will periodically cause the send counts for each
159         endpoint known to be written to standard error. The frequency is once every 180 seconds, and
160         more frequently if verbose mode (see ENV_VERBOSE_FILE) is > 0.
161 */
162 static void* rtc( void* vctx ) {
163         uta_ctx_t*      ctx;                                    // context user has -- where we pin the route table
164         uta_ctx_t*      pvt_cx;                                 // private context for session with rtg
165         rmr_mbuf_t*     msg = NULL;                             // message from rtg
166         char*   payload;                                        // payload in the message
167         size_t  mlen;
168         char*   my_port;                                        // the port number that we will listen on (4561 has been the default for this)
169         char*   rtg_addr;                                       // host:port address of route table generator (route manager)
170         char*   daddr;                                          // duplicated rtg address string to parse/trash
171         size_t  buf_size;                                       // nng needs var pointer not just size?
172         char*   nextr;                                          // pointer at next record in the message
173         char*   curr;                                           // current record
174         int     i;
175         long    blabber = 0;                            // time of last blabber so we don't flood if rtg goes bad
176         int             cstate = -1;                            // connection state to rtg
177         int             state;                                          // processing state of some nng function
178         char*   tokens[128];
179         char    wbuf[128];
180         char*   pbuf = NULL;
181         int             pbuf_size = 0;                          // number allocated in pbuf
182         int             ntoks;
183         int             vfd = -1;                                       // verbose file des if we have one
184         int             vlevel = 0;                                     // how chatty we should be 0== no nattering allowed
185         char*   eptr;
186         int             epfd = -1;                                      // fd for epoll so we can multi-task
187         struct  epoll_event events[1];          // list of events to give to epoll; we only have one we care about
188         struct  epoll_event epe;                        // event definition for event to listen to
189         int             count_delay = 30;                       // number of seconds between writing count info; initially every 30s
190         int             bump_freq = 0;                          // time at which we will bump count frequency to every 5 minutes
191         int             flags = 0;
192
193
194         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
195                 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: context passed in was nil\n" );
196                 return NULL;
197         }
198
199         if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
200                 vfd = open( eptr, O_RDONLY );
201                 vlevel = refresh_vlevel( vfd );
202         }
203
204         ctx->flags |= CFL_NO_RTACK;                     // don't ack when reading from a file
205         read_static_rt( ctx, vlevel );                  // seed the route table if one provided
206         ctx->flags &= ~CFL_NO_RTACK;
207
208
209         my_port = getenv( ENV_CTL_PORT );                               // default port to listen on (likely 4561)
210         if( my_port == NULL || ! *my_port ) {                   // if undefined, then go with default
211                 my_port = DEF_CTL_PORT; 
212                 daddr = DEF_CTL_PORT;                                           // backwards compat; if ctl port not hard defined, default is to listen
213         } else {
214                 daddr = DEF_RTG_WK_ADDR;                                        // if ctl port is defined, then default changes to connecting to well known RM addr
215         }
216
217         if( (rtg_addr = getenv( ENV_RTG_ADDR )) == NULL || ! *rtg_addr ) {              // undefined, use default set above
218                 rtg_addr = daddr;
219         }
220
221         daddr = strdup( rtg_addr );                                                                     // dup to destroy during parse
222
223         ntoks = uta_tokenise( daddr, tokens, 120, ':' );                        // should be host:ip of rt mgr (could be port only which we assume is old listen port)
224         switch( ntoks ) {
225                 case 0:                                                                 // should not happen, but prevent accidents and allow default to ignore additional tokens
226                         break;
227
228                 case 1:
229                         my_port = tokens[0];                    // just port -- assume backlevel environment where we just listen
230                         flags |= RTCFL_HAVE_UPDATE;             // prevent sending update reqests
231                         break;
232
233                 default:
234                         if( strcmp( tokens[0], "tcp" ) == 0 ) {                 // old school nng tcp:xxxx so we listen on xxx
235                                 flags |= RTCFL_HAVE_UPDATE;                                     // and signal not to try to request an update
236                                 my_port = tokens[1];
237                         } else {
238                                 // rtg_addr points at rt mgr address and my port set from env or default stands as is   
239                         }
240                         break;
241         }
242
243         if( (pvt_cx = init( my_port, MAX_RTC_BUF, FL_NOTHREAD )) == NULL ) {                            // open a private context (no RT listener!)
244                 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: unable to initialise listen port for RTG (pvt_cx)\n" );
245
246                 while( TRUE ) {                                                                                         // no listen port, just dump counts now and then
247                         sleep( count_delay );
248                         rt_epcounts( ctx->rtable, ctx->my_name );
249                 }
250
251                 return NULL;
252         }
253
254         ctx->rtg_whid = -1;
255
256         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rtc thread is running and listening; listening for rtg conns on %s\n", my_port );
257
258         bump_freq = time( NULL ) + 300;                         // after 5 minutes we decrease the count frequency
259         blabber = 0;
260         while( 1 ) {                                                                                                            // until the cows return, pigs fly, or somesuch event likely not to happen
261                 while( msg == NULL || msg->len <= 0 ) {                                                 // until we actually have something from the other side
262                         if( (flags & RTCFL_HAVE_UPDATE) == 0 ) {                                                // no route table updated from rt mgr; request one
263                                 if( ctx->rtg_whid < 0 ) {
264                                         ctx->rtg_whid = rmr_wh_open( pvt_cx, rtg_addr );
265                                 }
266                                 send_update_req( pvt_cx, ctx );
267                         }
268
269                         msg = rmr_torcv_msg( pvt_cx, msg, 1000 );
270
271                         if( time( NULL ) > blabber  ) {
272                                 vlevel = refresh_vlevel( vfd );
273                                 if( vlevel >= 0 ) {                                                                             // allow it to be forced off with -n in verbose file
274                                         blabber = time( NULL ) + count_delay;                           // set next time to blabber, then do so
275                                         if( blabber > bump_freq ) {
276                                                 count_delay = 300;
277                                         }
278                                         rt_epcounts( ctx->rtable, ctx->my_name );
279                                 }
280                         }
281                 }
282
283                 vlevel = refresh_vlevel( vfd );                 // ensure it's fresh when we get a message
284
285                 if( msg != NULL && msg->len > 0 ) {
286                         payload = msg->payload;
287                         mlen = msg->len;                                        // usable bytes in the payload
288                         if( vlevel > 1 ) {
289                                 rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: received rt message type=%d len=%d bytes (%s)\n", msg->mtype, (int) mlen, msg->payload );
290                         } else {
291                                 if( DEBUG > 1 || (vlevel > 0) ) rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: received rt message type=%d len=%d\n", msg->mtype, (int) mlen );
292                         }
293
294                         switch( msg->mtype ) {
295                                 case RMRRM_TABLE_DATA:
296                                         if( (flags & RTCFL_HAVE_UPDATE) == 0 ) {
297                                                 flags |= RTCFL_HAVE_UPDATE;
298                                                 rmr_vlog( RMR_VL_INFO, "message flow from route manager starts\n" );
299                                         }
300
301                                         if( pbuf_size <= mlen ) {
302                                                 if( pbuf ) {
303                                                         free( pbuf );
304                                                 }
305                                                 if( mlen < 512 ) {
306                                                         pbuf_size = 512;
307                                                 } else {
308                                                         pbuf_size = mlen * 2;
309                                                 }
310                                                 pbuf = (char *) malloc( sizeof( char ) * pbuf_size );
311                                         }
312                                         memcpy( pbuf, payload, mlen );
313                                         pbuf[mlen] = 0;                                                                         // don't depend on sender making this a legit string
314
315                                         curr = pbuf;
316                                         while( curr ) {                                                         // loop over each record in the buffer
317                                                 nextr = strchr( curr, '\n' );                   // allow multiple newline records, find end of current and mark
318
319                                                 if( nextr ) {
320                                                         *(nextr++) = 0;
321                                                 }
322
323                                                 if( vlevel > 1 ) {
324                                                         rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: processing (%s)\n", curr );
325                                                 }
326                                                 parse_rt_rec( ctx, pvt_cx, curr, vlevel, msg );         // parse record and add to in progress table; ack using rts to msg
327
328                                                 curr = nextr;
329                                         }
330
331                                         msg->len = 0;                           // force back into the listen loop
332                                         break;
333
334                                 default:
335                                         rmr_vlog( RMR_VL_WARN, "rmr_rtc: invalid message type=%d len=%d\n", msg->mtype, (int) msg->len );
336                                         break;
337                         }
338                 }
339
340                 if( ctx->shutdown ) {           // mostly for testing, but allows user app to close us down if rmr_*() function sets this
341                         return NULL;
342                 }
343
344         }
345
346         return NULL;    // unreachable, but some compilers don't see that and complain.
347 }
348
349 #ifndef SI95_BUILD
350 // this is nng specific inas much as we allow raw (non-RMR) messages 
351
352 /*
353         NOTE:   This is the original rtc code when we supported "raw" nano/nng messages
354                         from the route manger.  It is deprecated in favour of managing all RM-RMR
355                         communications via an RMR session.
356
357                         The rtc() function above is the new and preferred function regardless
358                         of transport.
359
360         -----------------------------------------------------------------------------------
361         Route Table Collector
362         A side thread which opens a socket and subscribes to a routing table generator.
363         It may do other things along the way (latency measurements?).
364
365         The pointer is a pointer to the context.
366
367         Listens for records from the route table generation publisher, expecting
368         one of the following, newline terminated, ASCII records:
369                 rte|msg-type||]name:port,name:port,...;name:port,...                    // route table entry with one or more groups of endpoints
370                 new|start                                                               // start of new table
371                 new|end                                                                 // end of new table; complete
372
373                 Name must be a host name which can be looked up via gethostbyname() (DNS).
374
375                 Multiple endpoints (name:port) may be given separated by a comma; an endpoint is selected using round robin
376                         for each message of the type that is sent.
377
378                 Multiple endpoint groups can be given as a comma separated list of endpoints, separated by semicolons:
379                                 group1n1:port,group1n2:port,group1n3:port;group2n1:port,group2n2:port
380
381                 If multiple groups are given, when send() is called for the cooresponding message type,
382                 the message will be sent to one endpoint in each group.
383
384                 msg-type is the numeric message type (e.g. 102). If it is given as n,name then it is assumed
385                 that the entry applies only to the instance running with the hostname 'name.'
386
387         Buffers received from the route table generator can contain multiple newline terminated
388         records, but each buffer must be less than 4K in length, and the last record in a
389         buffer may NOT be split across buffers.
390
391         Other chores:
392         In addition to the primary task of getting, vetting, and installing a new route table, or
393         updates to the existing table, this thread will periodically cause the send counts for each
394         endpoint known to be written to standard error. The frequency is once every 180 seconds, and
395         more frequently if verbose mode (see ENV_VERBOSE_FILE) is > 0.
396 */
397 static void* raw_rtc( void* vctx ) {
398         uta_ctx_t*      ctx;                                    // context user has -- where we pin the route table
399         uta_ctx_t*      pvt_cx;                                 // private context for session with rtg
400         rmr_mbuf_t*     msg = NULL;                             // message from rtg
401         char*   payload;                                        // payload in the message
402         size_t  mlen;
403         char*   port;                                           // a port number we listen/connect to
404         char*   fport;                                          // pointer to the real buffer to free
405         size_t  buf_size;                                       // nng needs var pointer not just size?
406         char*   nextr;                                          // pointer at next record in the message
407         char*   curr;                                           // current record
408         int     i;
409         long    blabber = 0;                            // time of last blabber so we don't flood if rtg goes bad
410         int             cstate = -1;                            // connection state to rtg
411         int             state;                                          // processing state of some nng function
412         char*   tokens[128];
413         char    wbuf[128];
414         char*   pbuf = NULL;
415         int             pbuf_size = 0;                          // number allocated in pbuf
416         int             ntoks;
417         int             raw_interface = 1;                      // rtg is using raw NNG/Nano not RMr to send updates
418         int             vfd = -1;                                       // verbose file des if we have one
419         int             vlevel = 0;                                     // how chatty we should be 0== no nattering allowed
420         char*   eptr;
421         int             epfd = -1;                                      // fd for epoll so we can multi-task
422         struct  epoll_event events[1];          // list of events to give to epoll; we only have one we care about
423         struct  epoll_event epe;                        // event definition for event to listen to
424         int             rcv_fd = -1;                            // pollable file des from NNG to use for timeout
425         int             count_delay = 30;                       // number of seconds between writing count info; initially every 30s
426         int             bump_freq = 0;                          // time at which we will bump count frequency to every 5 minutes
427
428
429         if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
430                 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: context passed in was nil\n" );
431                 return NULL;
432         }
433
434         if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
435                 vfd = open( eptr, O_RDONLY );
436                 vlevel = refresh_vlevel( vfd );
437         }                
438
439         read_static_rt( ctx, vlevel );                                          // seed the route table if one provided
440
441         if( (port = getenv( ENV_RTG_PORT )) == NULL || ! *port ) {              // port we need to open to listen for RTG connections
442                 port = strdup( DEF_RTG_PORT );
443         } else {
444                 port = strdup( port );
445         }
446
447 /*
448         this test is now done in init and this function is started _only_ if the value was 1
449         if( (curr = getenv( ENV_RTG_RAW )) != NULL ) {
450                 raw_interface = atoi( curr ) > 0;                               // if > 0 we assume that rtg messages are NOT coming from an RMr based process
451         }
452 */
453
454         fport = port;           // must hold to free
455
456         ntoks = uta_tokenise( port, tokens, 120, ':' );                 // assume tcp:port, but it could be port or old style host:port
457         switch( ntoks ) {
458                 case 1:
459                                 port = tokens[0];                       // just the port
460                                 break;
461
462                 case 2:
463                                 port = tokens[1];                       // tcp:port or :port
464                                 break;
465
466                 default:
467                                 port = DEF_RTG_PORT;            // this shouldn't happen, but parnioia is good
468                                 break;
469         }
470
471         if( (pvt_cx = init( port, MAX_RTG_MSG_SZ, FL_NOTHREAD )) == NULL ) {                            // open a private context
472                 rmr_vlog( RMR_VL_CRIT, "rmr_rtc: unable to initialise listen port for RTG (pvt_cx)\n" );
473
474                 while( TRUE ) {                                                                                         // no listen port, just dump counts now and then
475                         sleep( count_delay );
476                         rt_epcounts( ctx->rtable, ctx->my_name );
477                 }
478                 
479                 free( fport );                                  // parinoid free and return
480                 return NULL;
481         }
482
483         if( (rcv_fd = rmr_get_rcvfd( pvt_cx )) >= 0 ) {            // get the epoll fd for the rtg socket
484                 if( rcv_fd < 0 ) {
485                         rmr_vlog( RMR_VL_WARN, "cannot get epoll fd for rtg session; stats will generate only after update from rt manager\n" );
486                 } else {
487                         if( (epfd = epoll_create1( 0 )) < 0 ) {
488                                 rmr_vlog( RMR_VL_WARN, "stats will generate only after rt manager update; unable to create epoll fd for rtg session: %s\n", strerror( errno ) );
489                                 rcv_fd = -1;
490                         } else {
491                                 epe.events = EPOLLIN;
492                                 epe.data.fd = rcv_fd;
493
494                                 if( epoll_ctl( epfd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 )  {
495                                         rmr_vlog( RMR_VL_WARN, "stats will generate only after rt manager update; unable to init epoll_ctl: %s\n", strerror( errno ) );
496                                         rcv_fd = -1;
497                                 }
498                         }
499                 }
500         }
501
502         if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rtc thread is running and listening; listening for rtg conns on %s\n", port );
503         free( fport );
504
505         // future:  if we need to register with the rtg, then build a message and send it through a wormhole here
506
507         bump_freq = time( NULL ) + 300;                         // after 5 minutes we decrease the count frequency
508         blabber = 0;
509         while( 1 ) {                    // until the cows return, pigs fly, or somesuch event likely not to happen
510                 while( msg == NULL || msg->len <= 0 ) {                                                 // until we actually have something from the other side
511                         if( rcv_fd < 0 || epoll_wait( epfd, events, 1, 1000 ) > 0 )  {  // skip epoll if init failed, else block for max 1 sec
512                                 if( raw_interface ) {
513                                         msg = (rmr_mbuf_t *) rcv_payload( pvt_cx, msg );                // receive from non-RMr sender
514                                 } else {
515                                         msg = rmr_rcv_msg( pvt_cx, msg );               // receive from an RMr sender
516                                 }
517                         } else {                                                                                                        // no msg, do extra tasks
518                                 if( msg != NULL ) {                                                                             // if we were working with a message; ensure no len
519                                         msg->len = 0;
520                                         msg->state = RMR_ERR_TIMEOUT;
521                                 }
522                         }
523
524                         if( time( NULL ) > blabber  ) {
525                                 vlevel = refresh_vlevel( vfd );
526                                 if( vlevel >= 0 ) {                                                                             // allow it to be forced off with -n in verbose file
527                                         blabber = time( NULL ) + count_delay;                           // set next time to blabber, then do so
528                                         if( blabber > bump_freq ) {
529                                                 count_delay = 300;
530                                         }
531                                         rt_epcounts( ctx->rtable, ctx->my_name );
532                                 }
533                         }
534                 }
535
536                 vlevel = refresh_vlevel( vfd );                 // ensure it's fresh when we get a message
537
538                 if( msg != NULL && msg->len > 0 ) {
539                         payload = msg->payload;
540                         mlen = msg->len;                                        // usable bytes in the payload
541                         if( vlevel > 1 ) {
542                                 rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: received rt message; %d bytes (%s)\n", (int) mlen, msg->payload );
543                         } else {
544                                 if( DEBUG > 1 || (vlevel > 0) ) rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: received rt message; %d bytes\n", (int) mlen );
545                         }
546
547                         if( pbuf_size <= mlen ) {
548                                 if( pbuf ) {
549                                         free( pbuf );
550                                 }
551                                 if( mlen < 512 ) {
552                                         pbuf_size = 512;
553                                 } else {
554                                         pbuf_size = mlen * 2;
555                                 }
556                                 pbuf = (char *) malloc( sizeof( char ) * pbuf_size );
557                         }
558                         memcpy( pbuf, payload, mlen );
559                         pbuf[mlen] = 0;                                                                         // don't depend on sender making this a legit string
560
561                         curr = pbuf;
562                         while( curr ) {                                                         // loop over each record in the buffer
563                                 nextr = strchr( curr, '\n' );                   // allow multiple newline records, find end of current and mark
564
565                                 if( nextr ) {
566                                         *(nextr++) = 0;
567                                 }
568
569                                 if( vlevel > 1 ) {
570                                         rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: processing (%s)\n", curr );
571                                 }
572                                 if( raw_interface ) {
573                                         parse_rt_rec( ctx, NULL, curr, vlevel, NULL );          // nil pvt to parser as we can't ack messages
574                                 } else {
575                                         parse_rt_rec( ctx, pvt_cx, curr, vlevel, msg );         // parse record and add to in progress table
576                                 }
577
578                                 curr = nextr;
579                         }
580
581                         if( ctx->shutdown ) {           // mostly for testing, but allows user app to close us down if rmr_*() function sets this
582                                 break;
583                         }
584
585                         msg->len = 0;                           // force back into the listen loop
586                 }
587         }
588
589         return NULL;    // unreachable, but some compilers don't see that and complain.
590 }
591 #endif
592
593
594 #endif