Ensure RT incremental update not applied early
[ric-plt/lib/rmr.git] / src / rmr / common / src / rtc_static.c
index 58fc940..10bc2a7 100644 (file)
@@ -1,8 +1,8 @@
 // : vi ts=4 sw=4 noet :
 /*
 ==================================================================================
-       Copyright (c) 2019 Nokia
-       Copyright (c) 2018-2019 AT&T Intellectual Property.
+       Copyright (c) 2019-2021 Nokia
+       Copyright (c) 2018-2021 AT&T Intellectual Property.
 
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
 #include <sys/stat.h>
 #include <unistd.h>
 
-static int refresh_vlevel( int vfd ) {
-       int vlevel = 0;
-       char    rbuf[128];
+#include <RIC_message_types.h>         // needed for RMR/Rt Mgr msg types
 
-       if( vfd >= 0 ) {                                        // if file is open, read current value
-               rbuf[0] = 0;
-               lseek( vfd, 0, 0 );
-               read( vfd, rbuf, 10 );
-               vlevel = atoi( rbuf );
+// ---- local constants ------------------
+                                                                               // flags
+#define RTCFL_HAVE_UPDATE      0x01            // an update from RM was received
+
+#define MAX_RTC_BUF    5 * 1024                        // max buffer size we'll expect is 4k, add some fudge room
+
+// ------------------------------------------------------------------------------------------------
+
+/*
+       Opens the vlevel control file if needed and reads the vlevel from it.
+       The file is rewound if already open so that external updates are captured.
+       The current level is returnd; 0 on error.
+
+       The environment variable (ENV_VERBOSE_FILE) is used to supply the file to
+       open and read. If missing, we will try /tmp/rmr.v.  We will try to open the file
+       on each call if not alrady open; this allows the value to be supplied after
+       start which helps debugging.
+
+       If close_file is true, then we will close the open vfd and return 0;
+*/
+extern int refresh_vlevel( int close_file ) {
+       static int vfd = -1;
+
+       char*   eptr;
+       char    wbuf[128];                      // read buffer; MUST be 11 or greater
+       int             vlevel = 0;
+
+       if( close_file ) {
+               if( vfd >= 0 ) {
+                       close( vfd );
+                       vfd = -1;
+               }
+               return 0;
+       }
+
+       if( vfd < 0 ) {                         // attempt to find/open on all calls if not open
+               if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
+                       vfd = open( eptr, O_RDONLY );
+               } else {
+                       vfd = open( "/tmp/rmr.v", O_RDONLY );
+               }
+               if( vfd < 0 ) {
+                       return 0;
+               }
+       }
+
+       memset( wbuf, 0, sizeof( char ) * 11 );                 // ensure what we read will be nil terminated
+       if( lseek( vfd, 0, SEEK_SET ) == 0 && read( vfd, wbuf, 10 ) > 0 ) {
+               vlevel = atoi( wbuf );
        }
 
        return vlevel;
 }
 
 /*
+       Loop forever (assuming we're running in a pthread reading the static table
+       every minute or so.
+*/
+static void* rtc_file( void* vctx ) {
+       uta_ctx_t*      ctx;                                    // context user has -- where we pin the route table
+       char*   eptr;
+       int             vlevel = 0;                                     // how chatty we should be 0== no nattering allowed
+       char    wbuf[256];
+
+
+       if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+               rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: context passed in was nil\n" );
+               return NULL;
+       }
+
+       ctx->flags |= CFL_NO_RTACK;                             // no attempt to ack when reading from a file
+       while( 1 ) {
+               vlevel = refresh_vlevel( 0 );
+               read_static_rt( ctx, vlevel );                                          // refresh from the file
+
+               if( ctx->shutdown != 0 ) {                                                      // allow for graceful termination and unit testing
+                       refresh_vlevel( 1 );                                                            // close the verbose file if open
+                       return NULL;
+               }
+
+               if( ctx->rtable_ready ) {
+                       sleep( 60 );
+               } else {
+                       sleep( 1 );                                                                             // check every second until we have a good one
+               }
+       }
+}
+
+/*
+       Rtc_parse_msg parses a single message from the route manager. We allow multiple, newline terminated,
+       records in each message; it is required that the last record in the message be complete (we do not
+       reconstruct records split over multiple messages).  For each record, we call the record parser
+       to parse and add the information to the table being built.
+
+       This function was broken from the main rtc() function in order to be able to unit test it. Without
+       this as a standalone funciton, it was impossible to simulate a message arriving on the RTC's private
+       context.
+
+       To reduce malloc/free cycles, we allocate a static work buffer and expand it when needed; in other
+       words, this is not thread safe but it shouldn't need to be.
+*/
+static void rtc_parse_msg( uta_ctx_t *ctx, uta_ctx_t* pvt_cx, rmr_mbuf_t* msg, int vlevel,  int* flags ) {
+       static  unsigned char* pbuf = NULL;
+       static  int pbuf_size = 0;
+
+       unsigned char* payload;
+       unsigned char* curr;
+       unsigned char* nextr;
+       int mlen;
+
+       payload = msg->payload;
+       mlen = msg->len;                                        // usable bytes in the payload
+
+       if( DEBUG > 1 || (vlevel > 0) ) rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: received rt message type=%d len=%d\n", msg->mtype, (int) mlen );
+       switch( msg->mtype ) {
+               case RMRRM_TABLE_DATA:
+                       if( (*flags & RTCFL_HAVE_UPDATE) == 0 ) {
+                               *flags |= RTCFL_HAVE_UPDATE;
+                               rmr_vlog( RMR_VL_INFO, "message flow from route manager starts\n" );
+                       }
+
+                       if( pbuf_size <= mlen ) {
+                               if( pbuf ) {
+                                       free( pbuf );
+                               }
+                               if( mlen < 512 ) {
+                                       pbuf_size = 1024;
+                               } else {
+                                       pbuf_size = mlen * 2;
+                               }
+                               pbuf = (char *) malloc( sizeof( char ) * pbuf_size );
+                       }
+                       memcpy( pbuf, payload, mlen );
+                       pbuf[mlen] = 0;                                                                         // don't depend on sender making this a legit string
+                       if( vlevel > 1 ) {
+                               rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: rt message: (%s)\n", pbuf );
+                       }
+
+                       curr = pbuf;
+                       while( curr ) {                                                                         // loop over each record in the buffer
+                               nextr = strchr( (char *) curr, '\n' );                  // allow multiple newline records, find end of current and mark
+
+                               if( nextr ) {
+                                       *(nextr++) = 0;
+                               }
+
+                               if( vlevel > 1 ) {
+                                       rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc_parse_msg: snarf_fd=%d processing (%s)\n", ctx ? ctx->snarf_rt_fd : -99, curr );
+                               }
+                               parse_rt_rec( ctx, pvt_cx, curr, vlevel, msg );         // parse record and add to in progress table; ack using rts to msg
+
+                               curr = nextr;
+                       }
+
+                       msg->len = 0;                           // force back into the listen loop
+                       break;
+
+               default:
+                       rmr_vlog( RMR_VL_WARN, "rmr_rtc: invalid message type=%d len=%d\n", msg->mtype, (int) msg->len );
+                       break;
+       }
+}
+
+/*
+       Route Table Collector
+       A side thread which either attempts to connect and request a table
+       from the Route Manager, or opens a port and listens for Route Manager
+       to push table updates.
+
+       It may do other things along the way (latency measurements, alarms,
+       respond to RMR pings, etc.).
+
+       The behaviour with respect to listening for Route Manager updates vs
+       the initiation of the connection and sending a request depends on the
+       value of the ENV_RTG_ADDR (RMR_RTG_SVC)  environment variable. If
+       host:port, or IP:port, is given, then we assume that we make the connection
+       and send a request for the table (request mode).  If the variable is just
+       a port, then we assume Route Manager will connect and push updates (original
+       method).
+
+       If the variable is not defined, the default behaviour, in order to be
+       backwards compatable, depends on the presence of the ENV_CTL_PORT
+       (RMR_CTL_PORT) variable (new with the support for requesting a table).
+
+
+       ENV_CTL_PORT    ENV_RTG_ADDR    Behaviour
+       unset                   unset                   Open default CTL port (DEF_CTL_PORT) and
+                                                                       wait for Rt Mgr to push tables
+
+       set                             unset                   Use the default Rt Mgr wellknown addr
+                                                                       and port (DEF_RTG_WK_ADDR) to connect
+                                                                       and request a table. The control port
+                                                                       used is the value set by ENV_CTL_PORT.
+
+       unset                   set                             As described above. The default control
+                                                                       port (DEF_CTL_PORT) is used.
+
+       When we are running in request mode, then we will send the RMR message
+       RMRRM_REFRESH to this address (wormhole) as a request for the route manager
+       to send a new table. We will attempt to connect and send requests until
+       we have a table. Calls to rmr_ready() will report FALSE until a table is
+       loaded _unless_ a seed table was given.
+
+       Route table information is expected to arrive on RMR messages with type
+       RMRRM_TABLE_DATA.  There is NOT a specific message type for each possible
+       table record, so the payload is as it appears in the seed file or as
+       delivered in old versions.  It may take several RMRRM_TABLE_DATA messages
+       to completely supply a new table or table update. See the header for parse_rt_rec
+       in common for a description of possible message contents.
+
+       Buffers received from the route table generator can contain multiple newline terminated
+       records, but each buffer must be less than 4K in length, and the last record in a
+       buffer may NOT be split across buffers.
+
+       Other chores:
+       In addition to the primary task of getting, vetting, and installing a new route table, or
+       updates to the existing table, this thread will periodically cause the send counts for each
+       endpoint known to be written to standard error. The frequency is once every 180 seconds, and
+       more frequently if verbose mode (see ENV_VERBOSE_FILE) is > 0.
+*/
+static void* rtc( void* vctx ) {
+       uta_ctx_t*      ctx;                                    // context user has -- where we pin the route table
+       uta_ctx_t*      pvt_cx;                                 // private context for session with rtg
+       rmr_mbuf_t*     msg = NULL;                             // message from rtg
+       char*   my_port;                                        // the port number that we will listen on (4561 has been the default for this)
+       char*   rtg_addr;                                       // host:port address of route table generator (route manager)
+       char*   daddr;                                          // duplicated rtg address string to parse/trash
+       size_t  buf_size;                                       // nng needs var pointer not just size?
+       int             i;
+       long    blabber = 0;                            // time of last blabber so we don't flood if rtg goes bad
+       int             cstate = -1;                            // connection state to rtg
+       int             state;                                          // processing state of some nng function
+       char*   tokens[128];
+       char    wbuf[128];
+       int             ntoks;
+       int             vlevel = 0;                                     // how chatty we should be 0== no nattering allowed
+       char*   eptr;
+       int             epfd = -1;                                      // fd for epoll so we can multi-task
+       struct  epoll_event events[1];          // list of events to give to epoll; we only have one we care about
+       struct  epoll_event epe;                        // event definition for event to listen to
+       int             count_delay = 30;                       // number of seconds between writing count info; initially every 30s
+       int             bump_freq = 0;                          // time at which we will bump count frequency to every 5 minutes
+       int             flags = 0;
+       int             rt_req_freq = DEF_RTREQ_FREQ;   // request frequency (sec) when wanting a new table
+       int             nxt_rt_req = 0;                                 // time of next request
+
+
+       if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
+               rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: context passed in was nil\n" );
+               return NULL;
+       }
+
+       vlevel = refresh_vlevel( 0 );
+
+       if( (eptr = getenv( ENV_RTREQ_FREA )) != NULL ) {
+               rt_req_freq = atoi( eptr );
+               if( rt_req_freq < 1 || rt_req_freq > 300 ) {
+                       rt_req_freq = DEF_RTREQ_FREQ;
+                       rmr_vlog( RMR_VL_WARN, "rmr_rtc: RT request frequency (%d) out of range (1-300), using default (%d)\n", rt_req_freq, DEF_RTREQ_FREQ );
+               }
+       }
+       rmr_vlog( RMR_VL_INFO, "rmr_rtc: RT request frequency set to: %d seconds", rt_req_freq );
+
+       ctx->flags |= CFL_NO_RTACK;                             // don't ack when reading from a file
+       read_static_rt( ctx, vlevel );                  // seed the route table if one provided
+       ctx->flags &= ~CFL_NO_RTACK;
+       ctx->flags &= ~CFL_FULLRT;                              // even though rmr-ready goes true, the seed doesn't count as a full RT from route generator
+
+
+       my_port = getenv( ENV_CTL_PORT );                               // default port to listen on (likely 4561)
+       if( my_port == NULL || ! *my_port ) {                   // if undefined, then go with default
+               my_port = DEF_CTL_PORT;
+               daddr = DEF_CTL_PORT;                                           // backwards compat; if ctl port not hard defined, default is to listen
+       } else {
+               daddr = DEF_RTG_WK_ADDR;                                        // if ctl port is defined, then default changes to connecting to well known RM addr
+       }
+
+       if( (rtg_addr = getenv( ENV_RTG_ADDR )) == NULL || ! *rtg_addr ) {              // undefined, use default set above
+               rtg_addr = daddr;
+       }
+
+       daddr = strdup( rtg_addr );                                                                     // dup to destroy during parse
+
+       ntoks = uta_tokenise( daddr, tokens, 120, ':' );                        // should be host:ip of rt mgr (could be port only which we assume is old listen port)
+       switch( ntoks ) {
+               case 0:                                                                 // should not happen, but prevent accidents and allow default to ignore additional tokens
+                       break;
+
+               case 1:
+                       my_port = tokens[0];                    // just port -- assume backlevel environment where we just listen
+                       flags |= RTCFL_HAVE_UPDATE;             // prevent sending update reqests
+                       break;
+
+               default:
+                       if( strcmp( tokens[0], "tcp" ) == 0 ) {                 // old school nng tcp:xxxx so we listen on xxx
+                               flags |= RTCFL_HAVE_UPDATE;                                     // and signal not to try to request an update
+                               my_port = tokens[1];
+                       } else {
+                               // rtg_addr points at rt mgr address and my port set from env or default stands as is
+                       }
+                       break;
+       }
+
+       if( (pvt_cx = init( my_port, MAX_RTC_BUF, FL_NOTHREAD )) == NULL ) {                            // open a private context (no RT listener!)
+               rmr_vlog( RMR_VL_CRIT, "rmr_rtc: unable to initialise listen port for RTG (pvt_cx)\n" );
+
+               while( TRUE ) {                                                                                         // no listen port, just dump counts now and then
+                       sleep( count_delay );
+                       rt_epcounts( ctx->rtable, ctx->my_name );
+               }
+
+               return NULL;
+       }
+
+       ctx->rtg_whid = -1;
+
+       cycle_snarfed_rt( ctx );                                // cause the nrt to be opened
+
+       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rtc thread is running and listening; listening for rtg conns on %s\n", my_port );
+
+       bump_freq = time( NULL ) + 300;                         // after 5 minutes we decrease the count frequency
+       blabber = 0;
+       while( 1 ) {                                                                                                            // until the cows return, pigs fly, or somesuch event likely not to happen
+               while( msg == NULL || msg->len <= 0 ) {                                                 // until we actually have something from the other side
+                       if( (flags & RTCFL_HAVE_UPDATE) == 0 && time( NULL ) >= nxt_rt_req ) {                  // no route table updated from rt mgr; request one
+                               if( ctx->rtg_whid < 0 ) {
+                                       ctx->rtg_whid = rmr_wh_open( pvt_cx, rtg_addr );
+                               }
+                               send_update_req( pvt_cx, ctx );
+                               nxt_rt_req = time( NULL ) + rt_req_freq;
+                       }
+
+                       msg = rmr_torcv_msg( pvt_cx, msg, 1000 );
+
+                       if( time( NULL ) > blabber  ) {
+                               vlevel = refresh_vlevel( 0 );
+                               blabber = time( NULL ) + count_delay;                           // set next time to blabber, then do so
+                               if( blabber > bump_freq ) {
+                                       count_delay = 300;
+                               }
+                               if( vlevel >= 0 ) {                                                                             // allow it to be forced off with -n in verbose file
+                                       rt_epcounts( ctx->rtable, ctx->my_name );
+                               }
+                       }
+
+                       if( ctx->shutdown != 0 ) {
+                               break;                                                  // mostly for unit test, but allows a forced stop
+                       }
+
+                                                                                               // extra housekeeping chores can be added here...
+                       alarm_if_drops( ctx, pvt_cx );          // send an alarm if message are dropping, clear if we set one and thtings are better
+               }
+
+               vlevel = refresh_vlevel( 0 );                   // ensure it's fresh when we get a message
+
+               if( msg != NULL && msg->len > 0 ) {
+                       rtc_parse_msg( ctx, pvt_cx, msg, vlevel, &flags );
+               }
+
+               if( ctx->shutdown ) {           // mostly for testing, but allows user app to close us down if rmr_*() function sets this
+                       return NULL;
+               }
+
+       }
+
+       return NULL;    // unreachable, but some compilers don't see that and complain.
+}
+
+#ifndef SI95_BUILD
+// this is nng specific inas much as we allow raw (non-RMR) messages
+
+/*
+       NOTE:   This is the original rtc code when we supported "raw" nano/nng messages
+                       from the route manger.  It is deprecated in favour of managing all RM-RMR
+                       communications via an RMR session.
+
+                       The rtc() function above is the new and preferred function regardless
+                       of transport.
+
+       -----------------------------------------------------------------------------------
        Route Table Collector
        A side thread which opens a socket and subscribes to a routing table generator.
        It may do other things along the way (latency measurements?).
@@ -93,19 +460,18 @@ static int refresh_vlevel( int vfd ) {
        endpoint known to be written to standard error. The frequency is once every 180 seconds, and
        more frequently if verbose mode (see ENV_VERBOSE_FILE) is > 0.
 */
-static void* rtc( void* vctx ) {
+static void* raw_rtc( void* vctx ) {
        uta_ctx_t*      ctx;                                    // context user has -- where we pin the route table
        uta_ctx_t*      pvt_cx;                                 // private context for session with rtg
        rmr_mbuf_t*     msg = NULL;                             // message from rtg
        char*   payload;                                        // payload in the message
        size_t  mlen;
-       size_t  clen;                                           // length to copy and mark
        char*   port;                                           // a port number we listen/connect to
        char*   fport;                                          // pointer to the real buffer to free
        size_t  buf_size;                                       // nng needs var pointer not just size?
        char*   nextr;                                          // pointer at next record in the message
        char*   curr;                                           // current record
-       int     i;
+       int             i;
        long    blabber = 0;                            // time of last blabber so we don't flood if rtg goes bad
        int             cstate = -1;                            // connection state to rtg
        int             state;                                          // processing state of some nng function
@@ -115,7 +481,6 @@ static void* rtc( void* vctx ) {
        int             pbuf_size = 0;                          // number allocated in pbuf
        int             ntoks;
        int             raw_interface = 1;                      // rtg is using raw NNG/Nano not RMr to send updates
-       int             vfd = -1;                                       // verbose file des if we have one
        int             vlevel = 0;                                     // how chatty we should be 0== no nattering allowed
        char*   eptr;
        int             epfd = -1;                                      // fd for epoll so we can multi-task
@@ -127,15 +492,11 @@ static void* rtc( void* vctx ) {
 
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
-               fprintf( stderr, "[CRI] rmr_rtc: internal mishap: context passed in was nil\n" );
+               rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: context passed in was nil\n" );
                return NULL;
        }
 
-       if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
-               vfd = open( eptr, O_RDONLY );
-               vlevel = refresh_vlevel( vfd );
-       }                
-
+       vlevel = refresh_vlevel( 0 );
        read_static_rt( ctx, vlevel );                                          // seed the route table if one provided
 
        if( (port = getenv( ENV_RTG_PORT )) == NULL || ! *port ) {              // port we need to open to listen for RTG connections
@@ -144,10 +505,6 @@ static void* rtc( void* vctx ) {
                port = strdup( port );
        }
 
-       if( (curr = getenv( ENV_RTG_RAW )) != NULL ) {
-               raw_interface = atoi( curr ) > 0;                               // if > 0 we assume that rtg messages are NOT coming from an RMr based process
-       }
-
        fport = port;           // must hold to free
 
        ntoks = uta_tokenise( port, tokens, 120, ':' );                 // assume tcp:port, but it could be port or old style host:port
@@ -166,37 +523,37 @@ static void* rtc( void* vctx ) {
        }
 
        if( (pvt_cx = init( port, MAX_RTG_MSG_SZ, FL_NOTHREAD )) == NULL ) {                            // open a private context
-               fprintf( stderr, "[CRI] rmr_rtc: unable to initialise listen port for RTG (pvt_cx)\n" );
+               rmr_vlog( RMR_VL_CRIT, "rmr_rtc: unable to initialise listen port for RTG (pvt_cx)\n" );
 
                while( TRUE ) {                                                                                         // no listen port, just dump counts now and then
                        sleep( count_delay );
                        rt_epcounts( ctx->rtable, ctx->my_name );
                }
-               
+
                free( fport );                                  // parinoid free and return
                return NULL;
        }
 
        if( (rcv_fd = rmr_get_rcvfd( pvt_cx )) >= 0 ) {            // get the epoll fd for the rtg socket
                if( rcv_fd < 0 ) {
-                       fprintf( stderr, "[WARN] cannot get epoll fd for rtg session; stats will generate only after update from rt manager\n" );
+                       rmr_vlog( RMR_VL_WARN, "cannot get epoll fd for rtg session; stats will generate only after update from rt manager\n" );
                } else {
                        if( (epfd = epoll_create1( 0 )) < 0 ) {
-                               fprintf( stderr, "[WARN] stats will generate only after rt manager update; unable to create epoll fd for rtg session: %s\n", strerror( errno ) );
+                               rmr_vlog( RMR_VL_WARN, "stats will generate only after rt manager update; unable to create epoll fd for rtg session: %s\n", strerror( errno ) );
                                rcv_fd = -1;
                        } else {
                                epe.events = EPOLLIN;
                                epe.data.fd = rcv_fd;
 
                                if( epoll_ctl( epfd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 )  {
-                                       fprintf( stderr, "[WARN] stats will generate only after rt manager update; unable to init epoll_ctl: %s\n", strerror( errno ) );
+                                       rmr_vlog( RMR_VL_WARN, "stats will generate only after rt manager update; unable to init epoll_ctl: %s\n", strerror( errno ) );
                                        rcv_fd = -1;
                                }
                        }
                }
        }
 
-       if( DEBUG ) fprintf( stderr, "[DBUG] rtc thread is running and listening; listening for rtg conns on %s\n", port );
+       if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rtc thread is running and listening; listening for rtg conns on %s\n", port );
        free( fport );
 
        // future:  if we need to register with the rtg, then build a message and send it through a wormhole here
@@ -219,7 +576,7 @@ static void* rtc( void* vctx ) {
                        }
 
                        if( time( NULL ) > blabber  ) {
-                               vlevel = refresh_vlevel( vfd );
+                               vlevel = refresh_vlevel( 0 );
                                if( vlevel >= 0 ) {                                                                             // allow it to be forced off with -n in verbose file
                                        blabber = time( NULL ) + count_delay;                           // set next time to blabber, then do so
                                        if( blabber > bump_freq ) {
@@ -228,17 +585,19 @@ static void* rtc( void* vctx ) {
                                        rt_epcounts( ctx->rtable, ctx->my_name );
                                }
                        }
+
+                       alarm_if_drops( ctx );                          // send an alarm if message are dropping, clear if we set one and thtings are better
                }
 
-               vlevel = refresh_vlevel( vfd );                 // ensure it's fresh when we get a message
+               vlevel = refresh_vlevel( 0 );                   // ensure it's fresh when we get a message
 
                if( msg != NULL && msg->len > 0 ) {
                        payload = msg->payload;
                        mlen = msg->len;                                        // usable bytes in the payload
                        if( vlevel > 1 ) {
-                               fprintf( stderr, "[DBUG] rmr_rtc: received rt message; %d bytes (%s)\n", (int) mlen, msg->payload );
+                               rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: received rt message; %d bytes (%s)\n", (int) mlen, msg->payload );
                        } else {
-                               if( DEBUG > 1 || (vlevel > 0) ) fprintf( stderr, "[DBUG] rmr_rtc: received rt message; %d bytes\n", (int) mlen );
+                               if( DEBUG > 1 || (vlevel > 0) ) rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: received rt message; %d bytes\n", (int) mlen );
                        }
 
                        if( pbuf_size <= mlen ) {
@@ -264,9 +623,13 @@ static void* rtc( void* vctx ) {
                                }
 
                                if( vlevel > 1 ) {
-                                       fprintf( stderr, "[DBUG] rmr_rtc: processing (%s)\n", curr );
+                                       rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: processing (%s)\n", curr );
+                               }
+                               if( raw_interface ) {
+                                       parse_rt_rec( ctx, NULL, curr, vlevel, NULL );          // nil pvt to parser as we can't ack messages
+                               } else {
+                                       parse_rt_rec( ctx, pvt_cx, curr, vlevel, msg );         // parse record and add to in progress table
                                }
-                               parse_rt_rec( ctx, curr, vlevel );              // parse record and add to in progress table
 
                                curr = nextr;
                        }
@@ -281,6 +644,7 @@ static void* rtc( void* vctx ) {
 
        return NULL;    // unreachable, but some compilers don't see that and complain.
 }
+#endif
 
 
 #endif