Ensure RT incremental update not applied early
[ric-plt/lib/rmr.git] / src / rmr / common / src / rtc_static.c
index 9aca092..10bc2a7 100644 (file)
@@ -1,8 +1,8 @@
 // : vi ts=4 sw=4 noet :
 /*
 ==================================================================================
-       Copyright (c) 2019-2020 Nokia
-       Copyright (c) 2018-2020 AT&T Intellectual Property.
+       Copyright (c) 2019-2021 Nokia
+       Copyright (c) 2018-2021 AT&T Intellectual Property.
 
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
 
 // ------------------------------------------------------------------------------------------------
 
+/*
+       Opens the vlevel control file if needed and reads the vlevel from it.
+       The file is rewound if already open so that external updates are captured.
+       The current level is returnd; 0 on error.
+
+       The environment variable (ENV_VERBOSE_FILE) is used to supply the file to
+       open and read. If missing, we will try /tmp/rmr.v.  We will try to open the file
+       on each call if not alrady open; this allows the value to be supplied after
+       start which helps debugging.
+
+       If close_file is true, then we will close the open vfd and return 0;
+*/
+extern int refresh_vlevel( int close_file ) {
+       static int vfd = -1;
+
+       char*   eptr;
+       char    wbuf[128];                      // read buffer; MUST be 11 or greater
+       int             vlevel = 0;
+
+       if( close_file ) {
+               if( vfd >= 0 ) {
+                       close( vfd );
+                       vfd = -1;
+               }
+               return 0;
+       }
+
+       if( vfd < 0 ) {                         // attempt to find/open on all calls if not open
+               if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
+                       vfd = open( eptr, O_RDONLY );
+               } else {
+                       vfd = open( "/tmp/rmr.v", O_RDONLY );
+               }
+               if( vfd < 0 ) {
+                       return 0;
+               }
+       }
+
+       memset( wbuf, 0, sizeof( char ) * 11 );                 // ensure what we read will be nil terminated
+       if( lseek( vfd, 0, SEEK_SET ) == 0 && read( vfd, wbuf, 10 ) > 0 ) {
+               vlevel = atoi( wbuf );
+       }
+
+       return vlevel;
+}
+
 /*
        Loop forever (assuming we're running in a pthread reading the static table
        every minute or so.
 static void* rtc_file( void* vctx ) {
        uta_ctx_t*      ctx;                                    // context user has -- where we pin the route table
        char*   eptr;
-       int             vfd = -1;                                       // verbose file des if we have one
        int             vlevel = 0;                                     // how chatty we should be 0== no nattering allowed
        char    wbuf[256];
 
@@ -69,42 +114,22 @@ static void* rtc_file( void* vctx ) {
                return NULL;
        }
 
-       if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
-               vfd = open( eptr, O_RDONLY );
-       }
-
        ctx->flags |= CFL_NO_RTACK;                             // no attempt to ack when reading from a file
        while( 1 ) {
-               if( vfd >= 0 ) {
-                       wbuf[0] = 0;
-                       lseek( vfd, 0, 0 );
-                       if( read( vfd, wbuf, 10 ) > 0 ) {
-                               vlevel = atoi( wbuf );
-                       }
-               }
-
-               read_static_rt( ctx, vlevel );                                          // seed the route table if one provided
+               vlevel = refresh_vlevel( 0 );
+               read_static_rt( ctx, vlevel );                                          // refresh from the file
 
                if( ctx->shutdown != 0 ) {                                                      // allow for graceful termination and unit testing
+                       refresh_vlevel( 1 );                                                            // close the verbose file if open
                        return NULL;
                }
-               sleep( 60 );
-       }
-}
-
-static int refresh_vlevel( int vfd ) {
-       int vlevel = 0;
-       char    rbuf[128];
 
-       if( vfd >= 0 ) {                                        // if file is open, read current value
-               rbuf[0] = 0;
-               lseek( vfd, 0, 0 );
-               if( read( vfd, rbuf, 10 ) > 0 ) {
-                       vlevel = atoi( rbuf );
+               if( ctx->rtable_ready ) {
+                       sleep( 60 );
+               } else {
+                       sleep( 1 );                                                                             // check every second until we have a good one
                }
        }
-
-       return vlevel;
 }
 
 /*
@@ -166,7 +191,7 @@ static void rtc_parse_msg( uta_ctx_t *ctx, uta_ctx_t* pvt_cx, rmr_mbuf_t* msg, i
                                }
 
                                if( vlevel > 1 ) {
-                                       rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc: processing (%s)\n", curr );
+                                       rmr_vlog_force( RMR_VL_DEBUG, "rmr_rtc_parse_msg: snarf_fd=%d processing (%s)\n", ctx ? ctx->snarf_rt_fd : -99, curr );
                                }
                                parse_rt_rec( ctx, pvt_cx, curr, vlevel, msg );         // parse record and add to in progress table; ack using rts to msg
 
@@ -254,7 +279,6 @@ static void* rtc( void* vctx ) {
        char*   tokens[128];
        char    wbuf[128];
        int             ntoks;
-       int             vfd = -1;                                       // verbose file des if we have one
        int             vlevel = 0;                                     // how chatty we should be 0== no nattering allowed
        char*   eptr;
        int             epfd = -1;                                      // fd for epoll so we can multi-task
@@ -272,21 +296,13 @@ static void* rtc( void* vctx ) {
                return NULL;
        }
 
-       if( (ctx->ephash = rmr_sym_alloc( RT_SIZE )) == NULL ) {                // master hash table for endpoints (each rt will reference this)
-               rmr_vlog( RMR_VL_CRIT, "rmr_rtc: internal mishap: unable to allocate an endpoint hash table\n" );
-               return NULL;
-       }
-
-       if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
-               vfd = open( eptr, O_RDONLY );
-               vlevel = refresh_vlevel( vfd );
-       }
+       vlevel = refresh_vlevel( 0 );
 
        if( (eptr = getenv( ENV_RTREQ_FREA )) != NULL ) {
                rt_req_freq = atoi( eptr );
                if( rt_req_freq < 1 || rt_req_freq > 300 ) {
                        rt_req_freq = DEF_RTREQ_FREQ;
-                       rmr_vlog( RMR_VL_WARN, "rmr_rtc: RT request frequency (%d) out of range (1-300), using default", DEF_RTREQ_FREQ );
+                       rmr_vlog( RMR_VL_WARN, "rmr_rtc: RT request frequency (%d) out of range (1-300), using default (%d)\n", rt_req_freq, DEF_RTREQ_FREQ );
                }
        }
        rmr_vlog( RMR_VL_INFO, "rmr_rtc: RT request frequency set to: %d seconds", rt_req_freq );
@@ -294,6 +310,7 @@ static void* rtc( void* vctx ) {
        ctx->flags |= CFL_NO_RTACK;                             // don't ack when reading from a file
        read_static_rt( ctx, vlevel );                  // seed the route table if one provided
        ctx->flags &= ~CFL_NO_RTACK;
+       ctx->flags &= ~CFL_FULLRT;                              // even though rmr-ready goes true, the seed doesn't count as a full RT from route generator
 
 
        my_port = getenv( ENV_CTL_PORT );                               // default port to listen on (likely 4561)
@@ -343,6 +360,8 @@ static void* rtc( void* vctx ) {
 
        ctx->rtg_whid = -1;
 
+       cycle_snarfed_rt( ctx );                                // cause the nrt to be opened
+
        if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "rtc thread is running and listening; listening for rtg conns on %s\n", my_port );
 
        bump_freq = time( NULL ) + 300;                         // after 5 minutes we decrease the count frequency
@@ -360,12 +379,12 @@ static void* rtc( void* vctx ) {
                        msg = rmr_torcv_msg( pvt_cx, msg, 1000 );
 
                        if( time( NULL ) > blabber  ) {
-                               vlevel = refresh_vlevel( vfd );
+                               vlevel = refresh_vlevel( 0 );
+                               blabber = time( NULL ) + count_delay;                           // set next time to blabber, then do so
+                               if( blabber > bump_freq ) {
+                                       count_delay = 300;
+                               }
                                if( vlevel >= 0 ) {                                                                             // allow it to be forced off with -n in verbose file
-                                       blabber = time( NULL ) + count_delay;                           // set next time to blabber, then do so
-                                       if( blabber > bump_freq ) {
-                                               count_delay = 300;
-                                       }
                                        rt_epcounts( ctx->rtable, ctx->my_name );
                                }
                        }
@@ -373,9 +392,12 @@ static void* rtc( void* vctx ) {
                        if( ctx->shutdown != 0 ) {
                                break;                                                  // mostly for unit test, but allows a forced stop
                        }
+
+                                                                                               // extra housekeeping chores can be added here...
+                       alarm_if_drops( ctx, pvt_cx );          // send an alarm if message are dropping, clear if we set one and thtings are better
                }
 
-               vlevel = refresh_vlevel( vfd );                 // ensure it's fresh when we get a message
+               vlevel = refresh_vlevel( 0 );                   // ensure it's fresh when we get a message
 
                if( msg != NULL && msg->len > 0 ) {
                        rtc_parse_msg( ctx, pvt_cx, msg, vlevel, &flags );
@@ -459,7 +481,6 @@ static void* raw_rtc( void* vctx ) {
        int             pbuf_size = 0;                          // number allocated in pbuf
        int             ntoks;
        int             raw_interface = 1;                      // rtg is using raw NNG/Nano not RMr to send updates
-       int             vfd = -1;                                       // verbose file des if we have one
        int             vlevel = 0;                                     // how chatty we should be 0== no nattering allowed
        char*   eptr;
        int             epfd = -1;                                      // fd for epoll so we can multi-task
@@ -475,11 +496,7 @@ static void* raw_rtc( void* vctx ) {
                return NULL;
        }
 
-       if( (eptr = getenv( ENV_VERBOSE_FILE )) != NULL ) {
-               vfd = open( eptr, O_RDONLY );
-               vlevel = refresh_vlevel( vfd );
-       }
-
+       vlevel = refresh_vlevel( 0 );
        read_static_rt( ctx, vlevel );                                          // seed the route table if one provided
 
        if( (port = getenv( ENV_RTG_PORT )) == NULL || ! *port ) {              // port we need to open to listen for RTG connections
@@ -488,13 +505,6 @@ static void* raw_rtc( void* vctx ) {
                port = strdup( port );
        }
 
-/*
-       this test is now done in init and this function is started _only_ if the value was 1
-       if( (curr = getenv( ENV_RTG_RAW )) != NULL ) {
-               raw_interface = atoi( curr ) > 0;                               // if > 0 we assume that rtg messages are NOT coming from an RMr based process
-       }
-*/
-
        fport = port;           // must hold to free
 
        ntoks = uta_tokenise( port, tokens, 120, ':' );                 // assume tcp:port, but it could be port or old style host:port
@@ -566,7 +576,7 @@ static void* raw_rtc( void* vctx ) {
                        }
 
                        if( time( NULL ) > blabber  ) {
-                               vlevel = refresh_vlevel( vfd );
+                               vlevel = refresh_vlevel( 0 );
                                if( vlevel >= 0 ) {                                                                             // allow it to be forced off with -n in verbose file
                                        blabber = time( NULL ) + count_delay;                           // set next time to blabber, then do so
                                        if( blabber > bump_freq ) {
@@ -575,9 +585,11 @@ static void* raw_rtc( void* vctx ) {
                                        rt_epcounts( ctx->rtable, ctx->my_name );
                                }
                        }
+
+                       alarm_if_drops( ctx );                          // send an alarm if message are dropping, clear if we set one and thtings are better
                }
 
-               vlevel = refresh_vlevel( vfd );                 // ensure it's fresh when we get a message
+               vlevel = refresh_vlevel( 0 );                   // ensure it's fresh when we get a message
 
                if( msg != NULL && msg->len > 0 ) {
                        payload = msg->payload;