Ensure RT incremental update not applied early
[ric-plt/lib/rmr.git] / src / rmr / common / src / rtc_static.c
index 45a277a..10bc2a7 100644 (file)
@@ -1,8 +1,8 @@
 // : vi ts=4 sw=4 noet :
 /*
 ==================================================================================
-       Copyright (c) 2019 Nokia
-       Copyright (c) 2018-2019 AT&T Intellectual Property.
+       Copyright (c) 2019-2021 Nokia
+       Copyright (c) 2018-2021 AT&T Intellectual Property.
 
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
 
 // ------------------------------------------------------------------------------------------------
 
+/*
+       Opens the vlevel control file if needed and reads the vlevel from it.
+       The file is rewound if already open so that external updates are captured.
+       The current level is returnd; 0 on error.
+
+       The environment variable (ENV_VERBOSE_FILE) is used to supply the file to
+       open and read. If missing, we will try /tmp/rmr.v.  We will try to open the file
+       on each call if not alrady open; this allows the value to be supplied after
+       start which helps debugging.
+
+       If close_file is true, then we will close the open vfd and return 0;
+*/
+extern int refresh_vlevel( int close_file ) {
+       static int vfd = -1;
+
+       char*   eptr;
+       char    wbuf[128];                      // read buffer; MUST be 11 or greater
+       int             vlevel = 0;
+
+       if( close_file ) {
+               if( vfd >= 0 ) {
+                       close( vfd );
+                       vfd = -1;
+               }
+               return 0;
+       }
+
+       if( vfd < 0 ) {                         // attempt to find/open on all calls if not open
+               if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
+                       vfd = open( eptr, O_RDONLY );
+               } else {
+                       vfd = open( "/tmp/rmr.v", O_RDONLY );
+               }
+               if( vfd < 0 ) {
+                       return 0;
+               }
+       }
+
+       memset( wbuf, 0, sizeof( char ) * 11 );                 // ensure what we read will be nil terminated
+       if( lseek( vfd, 0, SEEK_SET ) == 0 && read( vfd, wbuf, 10 ) > 0 ) {
+               vlevel = atoi( wbuf );
+       }
+
+       return vlevel;
+}
+
 /*
        Loop forever (assuming we're running in a pthread reading the static table
        every minute or so.
 static void* rtc_file( void* vctx ) {
        uta_ctx_t*      ctx;                                    // context user has -- where we pin the route table
        char*   eptr;
-       int             vfd = -1;                                       // verbose file des if we have one
        int             vlevel = 0;                                     // how chatty we should be 0== no nattering allowed
        char    wbuf[256];
 
@@ -69,37 +114,97 @@ static void* rtc_file( void* vctx ) {
                return NULL;
        }
 
-       if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
-               vfd = open( eptr, O_RDONLY );
-       }
-
        ctx->flags |= CFL_NO_RTACK;                             // no attempt to ack when reading from a file
        while( 1 ) {
-               if( vfd >= 0 ) {
-                       wbuf[0] = 0;
-                       lseek( vfd, 0, 0 );
-                       read( vfd, wbuf, 10 );
-                       vlevel = atoi( wbuf );
-               }
+               vlevel = refresh_vlevel( 0 );
+               read_static_rt( ctx, vlevel );                                          // refresh from the file
 
-               read_static_rt( ctx, vlevel );                                          // seed the route table if one provided
+               if( ctx->shutdown != 0 ) {                                                      // allow for graceful termination and unit testing
+                       refresh_vlevel( 1 );                                                            // close the verbose file if open
+                       return NULL;
+               }
 
-               sleep( 60 );
+               if( ctx->rtable_ready ) {
+                       sleep( 60 );
+               } else {
+                       sleep( 1 );                                                                             // check every second until we have a good one
+               }
        }
 }
 
-static int refresh_vlevel( int vfd ) {
-       int vlevel = 0;
-       char    rbuf[128];
+/*
+       Rtc_parse_msg parses a single message from the route manager. We allow multiple, newline terminated,
+       records in each message; it is required that the last record in the message be complete (we do not
+       reconstruct records split over multiple messages).  For each record, we call the record parser
+       to parse and add the information to the table being built.
 
-       if( vfd >= 0 ) {                                        // if file is open, read current value
-               rbuf[0] = 0;
-               lseek( vfd, 0, 0 );
-               read( vfd, rbuf, 10 );
-               vlevel = atoi( rbuf );
-       }
+       This function was broken from the main rtc() function in order to be able to unit test it. Without
+       this as a standalone funciton, it was impossible to simulate a message arriving on the RTC's private
+       context.
 
-       return vlevel;
+       To reduce malloc/free cycles, we allocate a static work buffer and expand it when needed; in other
+       words, this is not thread safe but it shouldn't need to be.
+*/
+static void rtc_parse_msg( uta_ctx_t *ctx, uta_ctx_t* pvt_cx, rmr_mbuf_t* msg, int vlevel,  int* flags ) {
+       static  unsigned char* pbuf = NULL;
+       static  int pbuf_size = 0;
+
+       unsigned char* payload;
+       unsigned char* curr;
+       unsigned char* nextr;
+       int mlen;
+
+       payload = msg->payload;
+       mlen = msg->len;                                        // usable bytes in the payload
+
+       if( DEBUG > 1 || (vlevel > 0) ) rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: received rt message type=%d len=%d\n", msg->mtype, (int) mlen );
+       switch( msg->mtype ) {
+               case RMRRM_TABLE_DATA:
+                       if( (*flags & RTCFL_HAVE_UPDATE) == 0 ) {
+                               *flags |= RTCFL_HAVE_UPDATE;
+                               rmr_vlog( RMR_VL_INFO, "message flow from route manager starts\n" );
+                       }
+
+                       if( pbuf_size <= mlen ) {
+                               if( pbuf ) {
+                                       free( pbuf );
+                               }
+                               if( mlen < 512 ) {
+                                       pbuf_size = 1024;
+                               } else {
+                                       pbuf_size = mlen * 2;
+                               }
+                               pbuf = (char *) malloc( sizeof( char ) * pbuf_size );
+                       }
+                       memcpy( pbuf, payload, mlen );
+                       pbuf[mlen] = 0;                                                                         // don't depend on sender making this a legit string
+                       if( vlevel > 1 ) {
+                               rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: rt message: (%s)\n", pbuf );
+                       }
+
+                       curr = pbuf;
+                       while( curr ) {                                                                         // loop over each record in the buffer
+                               nextr = strchr( (char *) curr, '\n' );                  // allow multiple newline records, find end of current and mark
+
+                               if( nextr ) {
+                                       *(nextr++) = 0;
+                               }
+
+                               if( vlevel > 1 ) {
+                                       rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc_parse_msg: snarf_fd=%d processing (%s)\n", ctx ? ctx->snarf_rt_fd : -99, curr );
+                               }
+                               parse_rt_rec( ctx, pvt_cx, curr, vlevel, msg );         // parse record and add to in progress table; ack using rts to msg
+
+                               curr = nextr;
+                       }
+
+                       msg->len = 0;                           // force back into the listen loop
+                       break;
+
+               default:
+                       rmr_vlog( RMR_VL_WARN, "rmr_rtc: invalid message type=%d len=%d\n", msg->mtype, (int) msg->len );
+                       break;
+       }
 }
 
 /*
@@ -108,22 +213,22 @@ static int refresh_vlevel( int vfd ) {
        from the Route Manager, or opens a port and listens for Route Manager
        to push table updates.
 
-       It may do other things along the way (latency measurements, alarms, 
+       It may do other things along the way (latency measurements, alarms,
        respond to RMR pings, etc.).
 
        The behaviour with respect to listening for Route Manager updates vs
        the initiation of the connection and sending a request depends on the
        value of the ENV_RTG_ADDR (RMR_RTG_SVC)  environment variable. If
        host:port, or IP:port, is given, then we assume that we make the connection
-       and send a request for the table (request mode).  If the variable is just 
-       a port, then we assume Route Manager will connect and push updates (original 
+       and send a request for the table (request mode).  If the variable is just
+       a port, then we assume Route Manager will connect and push updates (original
        method).
 
        If the variable is not defined, the default behaviour, in order to be
-       backwards compatable, depends on the presence of the ENV_CTL_PORT 
+       backwards compatable, depends on the presence of the ENV_CTL_PORT
        (RMR_CTL_PORT) variable (new with the support for requesting a table).
 
-       
+
        ENV_CTL_PORT    ENV_RTG_ADDR    Behaviour
        unset                   unset                   Open default CTL port (DEF_CTL_PORT) and
                                                                        wait for Rt Mgr to push tables
@@ -134,15 +239,15 @@ static int refresh_vlevel( int vfd ) {
                                                                        used is the value set by ENV_CTL_PORT.
 
        unset                   set                             As described above. The default control
-                                                                       port (DEF_CTL_PORT) is used. 
+                                                                       port (DEF_CTL_PORT) is used.
 
-       When we are running in request mode, then we will send the RMR message 
-       RMRRM_REFRESH to this address (wormhole) as a request for the route manager 
+       When we are running in request mode, then we will send the RMR message
+       RMRRM_REFRESH to this address (wormhole) as a request for the route manager
        to send a new table. We will attempt to connect and send requests until
        we have a table. Calls to rmr_ready() will report FALSE until a table is
        loaded _unless_ a seed table was given.
 
-       Route table information is expected to arrive on RMR messages with type 
+       Route table information is expected to arrive on RMR messages with type
        RMRRM_TABLE_DATA.  There is NOT a specific message type for each possible
        table record, so the payload is as it appears in the seed file or as
        delivered in old versions.  It may take several RMRRM_TABLE_DATA messages
@@ -163,24 +268,17 @@ static void* rtc( void* vctx ) {
        uta_ctx_t*      ctx;                                    // context user has -- where we pin the route table
        uta_ctx_t*      pvt_cx;                                 // private context for session with rtg
        rmr_mbuf_t*     msg = NULL;                             // message from rtg
-       char*   payload;                                        // payload in the message
-       size_t  mlen;
        char*   my_port;                                        // the port number that we will listen on (4561 has been the default for this)
        char*   rtg_addr;                                       // host:port address of route table generator (route manager)
        char*   daddr;                                          // duplicated rtg address string to parse/trash
        size_t  buf_size;                                       // nng needs var pointer not just size?
-       char*   nextr;                                          // pointer at next record in the message
-       char*   curr;                                           // current record
-       int     i;
+       int             i;
        long    blabber = 0;                            // time of last blabber so we don't flood if rtg goes bad
        int             cstate = -1;                            // connection state to rtg
        int             state;                                          // processing state of some nng function
        char*   tokens[128];
        char    wbuf[128];
-       char*   pbuf = NULL;
-       int             pbuf_size = 0;                          // number allocated in pbuf
        int             ntoks;
-       int             vfd = -1;                                       // verbose file des if we have one
        int             vlevel = 0;                                     // how chatty we should be 0== no nattering allowed
        char*   eptr;
        int             epfd = -1;                                      // fd for epoll so we can multi-task
@@ -189,6 +287,8 @@ static void* rtc( void* vctx ) {
        int             count_delay = 30;                       // number of seconds between writing count info; initially every 30s
        int             bump_freq = 0;                          // time at which we will bump count frequency to every 5 minutes
        int             flags = 0;
+       int             rt_req_freq = DEF_RTREQ_FREQ;   // request frequency (sec) when wanting a new table
+       int             nxt_rt_req = 0;                                 // time of next request
 
 
        if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
@@ -196,19 +296,26 @@ static void* rtc( void* vctx ) {
                return NULL;
        }
 
-       if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
-               vfd = open( eptr, O_RDONLY );
-               vlevel = refresh_vlevel( vfd );
+       vlevel = refresh_vlevel( 0 );
+
+       if( (eptr = getenv( ENV_RTREQ_FREA )) != NULL ) {
+               rt_req_freq = atoi( eptr );
+               if( rt_req_freq < 1 || rt_req_freq > 300 ) {
+                       rt_req_freq = DEF_RTREQ_FREQ;
+                       rmr_vlog( RMR_VL_WARN, "rmr_rtc: RT request frequency (%d) out of range (1-300), using default (%d)\n", rt_req_freq, DEF_RTREQ_FREQ );
+               }
        }
+       rmr_vlog( RMR_VL_INFO, "rmr_rtc: RT request frequency set to: %d seconds", rt_req_freq );
 
-       ctx->flags |= CFL_NO_RTACK;                     // don't ack when reading from a file
+       ctx->flags |= CFL_NO_RTACK;                             // don't ack when reading from a file
        read_static_rt( ctx, vlevel );                  // seed the route table if one provided
        ctx->flags &= ~CFL_NO_RTACK;
+       ctx->flags &= ~CFL_FULLRT;                              // even though rmr-ready goes true, the seed doesn't count as a full RT from route generator
 
 
        my_port = getenv( ENV_CTL_PORT );                               // default port to listen on (likely 4561)
        if( my_port == NULL || ! *my_port ) {                   // if undefined, then go with default
-               my_port = DEF_CTL_PORT; 
+               my_port = DEF_CTL_PORT;
                daddr = DEF_CTL_PORT;                                           // backwards compat; if ctl port not hard defined, default is to listen
        } else {
                daddr = DEF_RTG_WK_ADDR;                                        // if ctl port is defined, then default changes to connecting to well known RM addr
@@ -235,7 +342,7 @@ static void* rtc( void* vctx ) {
                                flags |= RTCFL_HAVE_UPDATE;                                     // and signal not to try to request an update
                                my_port = tokens[1];
                        } else {
-                               // rtg_addr points at rt mgr address and my port set from env or default stands as is   
+                               // rtg_addr points at rt mgr address and my port set from env or default stands as is
                        }
                        break;
        }
@@ -253,88 +360,47 @@ static void* rtc( void* vctx ) {
 
        ctx->rtg_whid = -1;
 
+       cycle_snarfed_rt( ctx );                                // cause the nrt to be opened
+
        if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rtc thread is running and listening; listening for rtg conns on %s\n", my_port );
 
        bump_freq = time( NULL ) + 300;                         // after 5 minutes we decrease the count frequency
        blabber = 0;
        while( 1 ) {                                                                                                            // until the cows return, pigs fly, or somesuch event likely not to happen
                while( msg == NULL || msg->len <= 0 ) {                                                 // until we actually have something from the other side
-                       if( (flags & RTCFL_HAVE_UPDATE) == 0 ) {                                                // no route table updated from rt mgr; request one
+                       if( (flags & RTCFL_HAVE_UPDATE) == 0 && time( NULL ) >= nxt_rt_req ) {                  // no route table updated from rt mgr; request one
                                if( ctx->rtg_whid < 0 ) {
                                        ctx->rtg_whid = rmr_wh_open( pvt_cx, rtg_addr );
                                }
                                send_update_req( pvt_cx, ctx );
+                               nxt_rt_req = time( NULL ) + rt_req_freq;
                        }
 
                        msg = rmr_torcv_msg( pvt_cx, msg, 1000 );
 
                        if( time( NULL ) > blabber  ) {
-                               vlevel = refresh_vlevel( vfd );
+                               vlevel = refresh_vlevel( 0 );
+                               blabber = time( NULL ) + count_delay;                           // set next time to blabber, then do so
+                               if( blabber > bump_freq ) {
+                                       count_delay = 300;
+                               }
                                if( vlevel >= 0 ) {                                                                             // allow it to be forced off with -n in verbose file
-                                       blabber = time( NULL ) + count_delay;                           // set next time to blabber, then do so
-                                       if( blabber > bump_freq ) {
-                                               count_delay = 300;
-                                       }
                                        rt_epcounts( ctx->rtable, ctx->my_name );
                                }
                        }
-               }
-
-               vlevel = refresh_vlevel( vfd );                 // ensure it's fresh when we get a message
 
-               if( msg != NULL && msg->len > 0 ) {
-                       payload = msg->payload;
-                       mlen = msg->len;                                        // usable bytes in the payload
-                       if( vlevel > 1 ) {
-                               rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: received rt message type=%d len=%d bytes (%s)\n", msg->mtype, (int) mlen, msg->payload );
-                       } else {
-                               if( DEBUG > 1 || (vlevel > 0) ) rmr_vlog( RMR_VL_DEBUG, "rmr_rtc: received rt message type=%d len=%d\n", msg->mtype, (int) mlen );
+                       if( ctx->shutdown != 0 ) {
+                               break;                                                  // mostly for unit test, but allows a forced stop
                        }
 
-                       switch( msg->mtype ) {
-                               case RMRRM_TABLE_DATA:
-                                       if( (flags & RTCFL_HAVE_UPDATE) == 0 ) {
-                                               flags |= RTCFL_HAVE_UPDATE;
-                                               rmr_vlog( RMR_VL_INFO, "message flow from route manager starts\n" );
-                                       }
-
-                                       if( pbuf_size <= mlen ) {
-                                               if( pbuf ) {
-                                                       free( pbuf );
-                                               }
-                                               if( mlen < 512 ) {
-                                                       pbuf_size = 512;
-                                               } else {
-                                                       pbuf_size = mlen * 2;
-                                               }
-                                               pbuf = (char *) malloc( sizeof( char ) * pbuf_size );
-                                       }
-                                       memcpy( pbuf, payload, mlen );
-                                       pbuf[mlen] = 0;                                                                         // don't depend on sender making this a legit string
-
-                                       curr = pbuf;
-                                       while( curr ) {                                                         // loop over each record in the buffer
-                                               nextr = strchr( curr, '\n' );                   // allow multiple newline records, find end of current and mark
-
-                                               if( nextr ) {
-                                                       *(nextr++) = 0;
-                                               }
-
-                                               if( vlevel > 1 ) {
-                                                       rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: processing (%s)\n", curr );
-                                               }
-                                               parse_rt_rec( ctx, pvt_cx, curr, vlevel );              // parse record and add to in progress table
-
-                                               curr = nextr;
-                                       }
+                                                                                               // extra housekeeping chores can be added here...
+                       alarm_if_drops( ctx, pvt_cx );          // send an alarm if message are dropping, clear if we set one and thtings are better
+               }
 
-                                       msg->len = 0;                           // force back into the listen loop
-                                       break;
+               vlevel = refresh_vlevel( 0 );                   // ensure it's fresh when we get a message
 
-                               default:
-                                       rmr_vlog( RMR_VL_WARN, "rmr_rtc: invalid message type=%d len=%d\n", msg->mtype, (int) msg->len );
-                                       break;
-                       }
+               if( msg != NULL && msg->len > 0 ) {
+                       rtc_parse_msg( ctx, pvt_cx, msg, vlevel, &flags );
                }
 
                if( ctx->shutdown ) {           // mostly for testing, but allows user app to close us down if rmr_*() function sets this
@@ -347,7 +413,7 @@ static void* rtc( void* vctx ) {
 }
 
 #ifndef SI95_BUILD
-// this is nng specific inas much as we allow raw (non-RMR) messages 
+// this is nng specific inas much as we allow raw (non-RMR) messages
 
 /*
        NOTE:   This is the original rtc code when we supported "raw" nano/nng messages
@@ -405,7 +471,7 @@ static void* raw_rtc( void* vctx ) {
        size_t  buf_size;                                       // nng needs var pointer not just size?
        char*   nextr;                                          // pointer at next record in the message
        char*   curr;                                           // current record
-       int     i;
+       int             i;
        long    blabber = 0;                            // time of last blabber so we don't flood if rtg goes bad
        int             cstate = -1;                            // connection state to rtg
        int             state;                                          // processing state of some nng function
@@ -415,7 +481,6 @@ static void* raw_rtc( void* vctx ) {
        int             pbuf_size = 0;                          // number allocated in pbuf
        int             ntoks;
        int             raw_interface = 1;                      // rtg is using raw NNG/Nano not RMr to send updates
-       int             vfd = -1;                                       // verbose file des if we have one
        int             vlevel = 0;                                     // how chatty we should be 0== no nattering allowed
        char*   eptr;
        int             epfd = -1;                                      // fd for epoll so we can multi-task
@@ -431,11 +496,7 @@ static void* raw_rtc( void* vctx ) {
                return NULL;
        }
 
-       if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
-               vfd = open( eptr, O_RDONLY );
-               vlevel = refresh_vlevel( vfd );
-       }                
-
+       vlevel = refresh_vlevel( 0 );
        read_static_rt( ctx, vlevel );                                          // seed the route table if one provided
 
        if( (port = getenv( ENV_RTG_PORT )) == NULL || ! *port ) {              // port we need to open to listen for RTG connections
@@ -444,13 +505,6 @@ static void* raw_rtc( void* vctx ) {
                port = strdup( port );
        }
 
-/*
-       this test is now done in init and this function is started _only_ if the value was 1
-       if( (curr = getenv( ENV_RTG_RAW )) != NULL ) {
-               raw_interface = atoi( curr ) > 0;                               // if > 0 we assume that rtg messages are NOT coming from an RMr based process
-       }
-*/
-
        fport = port;           // must hold to free
 
        ntoks = uta_tokenise( port, tokens, 120, ':' );                 // assume tcp:port, but it could be port or old style host:port
@@ -475,7 +529,7 @@ static void* raw_rtc( void* vctx ) {
                        sleep( count_delay );
                        rt_epcounts( ctx->rtable, ctx->my_name );
                }
-               
+
                free( fport );                                  // parinoid free and return
                return NULL;
        }
@@ -522,7 +576,7 @@ static void* raw_rtc( void* vctx ) {
                        }
 
                        if( time( NULL ) > blabber  ) {
-                               vlevel = refresh_vlevel( vfd );
+                               vlevel = refresh_vlevel( 0 );
                                if( vlevel >= 0 ) {                                                                             // allow it to be forced off with -n in verbose file
                                        blabber = time( NULL ) + count_delay;                           // set next time to blabber, then do so
                                        if( blabber > bump_freq ) {
@@ -531,9 +585,11 @@ static void* raw_rtc( void* vctx ) {
                                        rt_epcounts( ctx->rtable, ctx->my_name );
                                }
                        }
+
+                       alarm_if_drops( ctx );                          // send an alarm if message are dropping, clear if we set one and thtings are better
                }
 
-               vlevel = refresh_vlevel( vfd );                 // ensure it's fresh when we get a message
+               vlevel = refresh_vlevel( 0 );                   // ensure it's fresh when we get a message
 
                if( msg != NULL && msg->len > 0 ) {
                        payload = msg->payload;
@@ -570,9 +626,9 @@ static void* raw_rtc( void* vctx ) {
                                        rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: processing (%s)\n", curr );
                                }
                                if( raw_interface ) {
-                                       parse_rt_rec( ctx, NULL, curr, vlevel );                // nil pvt to parser as we can't ack messages
+                                       parse_rt_rec( ctx, NULL, curr, vlevel, NULL );          // nil pvt to parser as we can't ack messages
                                } else {
-                                       parse_rt_rec( ctx, pvt_cx, curr, vlevel );              // parse record and add to in progress table
+                                       parse_rt_rec( ctx, pvt_cx, curr, vlevel, msg );         // parse record and add to in progress table
                                }
 
                                curr = nextr;